对于C++消息队列机制实现的思路构想

如果你不知道这是什么

 了解这个框架的方法笔者推荐是去看一下 Microsoft.CommunityToolkit.Mvvm 中 Messenger 通讯相关的内容,当读者了解并能使用 IRecipient,ObservableRecipient 进行消息通讯后再来读此篇文章。

 在这里简短的介绍一下 IRecipient 及其相关知识:

 ObservableRecipient 在 Microsoft.CommunityToolkit.Mvvm 中使用 默认的弱引用消息(WeakReferenceMessenger.Default) 进行通信,我们可以使用多种方法来注册消息,以下笔者选择为后文方便介绍的形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyViewModel : ObservableRecipient, IRecipient<MyMessage>
{
public MyViewModel()
{
IsActive = true;
}

public void Receive(MyMessage message)
{
// 处理消息
}

protected override void OnActivated()
{
Messenger.RegisterAll(this);
}

protected override void OnDeactivated()
{
Messenger.UnregisterAll(this);
}
}

 其中 IRecipient<TMessage> 接口的某版本的代码定义为:

1
2
3
4
5
public interface IRecipient<TMessage>
where TMessage : class
{
void Receive(TMessage message);
}

 发送消息的方式(此处的 MyMessage 为自定义类,通过构造传参或提前设置数据来为接收方传递数据):

1
WeakReferenceMessenger.Default.Send(new MyMessage());

 在继续阅读之前读者应该深刻理解到该消息机制在对象间通讯的便捷性。


消息机制系统的构成实现

  1. 消息

     在 Microsoft.CommunityToolkit.Mvvm 中 IRecipient 的消息识别方式是对存储的消息类型进行判断,WeakReferenceMessenger.Default 作为 IMessenger 的继承者需要通过参数来调用正确的接收者的 Receive 函数。在 C++ 中通过构造 消息基类(MessageBase)并添加共有标识符即可做到,如下为示例:

    1
    2
    3
    4
    class MessageBase {
    public:
    virtual std::string getType() const = 0;
    }

     此处的返回类型可以替换为 enum class 类型方便代码编写与更容易的判断消息类型是否一致。

     为对消息基类进行封装我们引入 消息类(Message) 作为其子类并用于真正的消息识别,以下为 Message 的实现示例:

    1
    2
    3
    4
    5
    6
    7
    tempalte <typename MessageType> 
    class Message : public MessageBase {
    public:
    virtual std::string getType() const {
    return MessageType::getStaticType();
    }
    }

     如果不想再封装一层的话可以直接使用 Message 作为基类,本文章不提及这样的相关实现思路与注意事项。

     定义一个具体消息的方式如下:

    1
    2
    3
    4
    5
    6
    7
    8
    class ASimpleMessage : public Message<ASimpleMessage> {
    public:
    static const std::string getStaticType() {
    return "ASimpleMessage";
    }

    int customParameter;
    }

     如果读者不知道上面的定义是什么意思,请阅读 奇异递归模板模式(Curiously Recurring Template Pattern, CRTP)的相关内容。Message<ASimpleMessage> 通过模板类型数据调用模板类的静态函数来获取子类信息。

  2. 消息的监听者

     在 如果你不知道这是什么 中笔者展示了一种 IRecipient 的继承使用方式:

    1
    public class MyViewModel : ObservableRecipient, IRecipient<MyMessage>

     这里的 IRecipient 可以看作对某个具体消息的监听者,监听者有义务实现对某个具体消息的处理方法。

     为构建封装良好的框架结果以及存储便利笔者这里对监听者的基类进行实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    class MessageListenerBase {
    public:
    MessageListenerBase() {...};

    virtual ~MessageListenerBase() {...};

    virtual std::string getType() const = 0;

    virtual void doHandleMessageBase(MessageBase* message) = 0;
    }

     在此类中我们进行一些监听者通用的操作,在下文中我们会介绍到,此处暂时略过。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    template<typename MessageType>
    class MessageListener : public MessageListenerBase {
    public:
    virtual std::string getType() const {
    return MessageType::getStaticType();
    }

    virtual void doHandleMessageBase(MessageBase* message) {
    handleMessage(dynamic_cast<MessageType*>(message));
    }

    virtual void handleMessage(MessageType* message) = 0;
    }

     MessageListener将作为用于实际继承的类,由被继承的类完成对 handleMessage 方法的重写。

     MessageListener将具体消息作为模板参数,拥有对消息类型的访问权,通过 MessageListenerBase 作为基类进行存储,例如我们可以将 MessageListener 在以下数据结构中存储:

    1
    std::vector<MessageListenerBase*> m_listeners;

     对消息类型的访问可以通过以下方式进行:

    1
    2
    3
    4
    for(MessageListenerBase* listener : m_listeners) {
    std::string type = listener->getType();
    ...
    }
  3. 消息处理管道

     消息处理的系统应为整个程序服务,这里我们设计为单例模式为消息系统服务。

     该单例模式的实现样例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    class MessageQueue {
    public:
    static std::shared_ptr<MessageQueue> getInstance() {
    if (!s_instance)
    {
    s_instance = std::shared_ptr<MessageQueue>(new MessageQueue());
    }
    return s_instance;
    };

    private:
    static std::shared_ptr<MessageQueue> s_instance;

    MessageQueue() {};
    MessageQueue(const MessageQueue&) = delete;
    void operator=(const MessageQueue&) = delete;
    }

    std::shared_ptr<MessageQueue> MessageQueue::s_instance;

     MessageQueue 将作为消息存储、回调消息处理函数的总负责类,其中需要对消息对象进行存储,注册及取消注册的操作定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    class MessageQueue {
    ...
    public:
    void registerListener(MessageListenerBase* listener) {
    std::lock_guard<std::mutex> lock(m_listenersMutex);
    m_listeners.push_back(listener);
    };

    void unregisterListener(MessageListenerBase* listener) {
    std::lock_guard<std::mutex> lock(m_listenersMutex);
    for (size_t i = 0; i < m_listeners.size(); i++)
    {
    if (m_listeners[i] == listener)
    {
    m_listeners.erase(m_listeners.begin() + i);
    return;
    }
    }
    };

    private:
    std::vector<MessageListenerBase*> m_listeners;
    mutable std::mutex m_listenersMutex;
    ...
    }

     对于接下来消息的处理笔者只给出实现思路以及部分实现。

     对消息的立即处理可以由下面给出的函数实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    void MessageQueue::sendMessage(std::shared_ptr<MessageBase> message) {
    std::lock_guard<std::mutex> lock(m_listenersMutex);

    for (m_currentListenerIndex = 0; m_currentListenerIndex < m_listeners.size();
    m_currentListenerIndex++)
    {
    MessageListenerBase* listener = m_listeners[m_currentListenerIndex];

    if (listener->getType() == message->getType())
    {
    // The listenersMutex gets unlocked so changes to listeners are possible while message handling.
    m_listenersMutex.unlock();
    listener->doHandleMessageBase(message.get());
    m_listenersMutex.lock();
    }
    }
    }

     对于消息的自动注册,可以通过对 MessageListenerBase 的构造函数添加以下实现代码,并在该监听者销毁时自动注销:

    1
    2
    3
    4
    5
    6
    7
    void MessageListenerBase::MessageListenerBase() {
    MessageQueue::getInstance()->registerListener(this);
    }

    void MessageListenerBase::~MessageListenerBase() {
    MessageQueue::getInstance()->unregisterListener(this);
    }

     现在我们有一个 ASimpleMessage 作为具体的消息类,一个 MessageQueue 用于全局单例处理消息的存储、对象注册等,以下我们添加一个监听者:

    1
    2
    3
    4
    5
    6
    class ASimpleListener : public MessageListener<ASimpleMessage> {
    public:
    virtual void handleMessage(MessageType* message) {
    ...//Handle Message
    }
    }

     因为继承至 MessageListenerBase 的原因,ASimpleListener 在构造时调用父类 MessageListenerBase 的构造函数自动在 MessageQueue 中注册处理,并在销毁时自动取消注册。

     在程序运行时调用 MessageQueue 单例的 sendMessage 函数即可发送并处理对应的消息。


我们其实做了什么

 在以上如此 ” 繁琐 “ 的实现中,我们仅仅是为了将正确的消息调度到正确的处理函数上。由于 C++ 本身没有反射功能,我们封装了消息类型并为之添加上能够识别彼此的标识信息(上文中为一个 std::string 类型字符串作为标识),并为每个需要监听消息的类实现了监听抽象的基类,监听本身的作用便是在消息管道中正确的调用监听者的回调处理函数(上文的 handleMessage 方法)。消息管道拥有所有监听者的指针引用,在有消息到来时遍历这些指针引用并找出匹配该消息的监听者,将消息作为参数传递到监听者的处理函数中回调,实际上为消息去找监听者。

 实现中比较常用的几个思想: 奇异递归模板模式(Curiously Recurring Template Pattern, CRTP)回调(Call Back)单例模式(Singleton Pattern)


以下SourceTrail中用到的 消息 机制框架可以直接进行使用:

SourceTrail Message Queue Zip