消息通知与处理机制

设计目的

当发送消息的线程任然有任务时,就可以使用此机制,来进行消息的通知。

此消息处理机制是开启了一个线程进行接受数据并处理,所以不影响发送消息的线程。

使用场景

  • 状态上报:客户端收到server的上报时,使用此机制开启线程处理上报,接受消息的线程继续接受不会阻塞
  • 其他模块的上报:如电池电量变化通知到我们sdk中的回调函数,此时是由负责电池电量部分创建线程执行的,利用此机制不会阻塞第三方模块的线程
  • 开启自定义的线程,达到条件后,可以借此机制运行自己的线程函数

机制设计

主要有两个类:CHandler和CMessage

  • CHandler:消息处理类,负责发送和接受消息,通过原子操作实现自己发给自己消息
  • CMessage:消息体,负责创建消息,消息中包含接受此消息的CHandler,方便其他对象向不同的CHandler发送消息

工具类也有两个:CNotify和CNotifyList

  • CNotify:包含CHandler和id,可以注册一条消息,以及通知消息。实现不同对象间的消息通知。
  • CNotifyList:CNotify的集合,可以通知多个CHandler对象

结构如下:

handler

使用方式

  1. 需要接收消息的对象继承CHandler,并调用线程启动函数start(),start()函数具体参考CThread
  2. 必须重写handleMessage(),可以选择重写MessageToString()方法,建议重写
  3. 发送消息的对象创建CNotify或CNotifyList对象,并接收需要通知的对象注册
  4. CHandler通过sendMessage()向自己发消息并处理,或者接收已经被注册的对象发送的消息

CHandler的实现

主要使用原子操作进行数据的同步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include "chandler.h"

CHandler::CHandler()
{

}

CHandler::~CHandler()
{
CMessage::obtain(this,C_MESSAGE_HANDLER_EXIT)->sendToHandler();

/*Wait for 50ms to make sure thread exit*/
mMutex.lock();
mCond.wait(&mMutex, 50);
mMutex.unlock();

stop();
join();
}

void CHandler::run()
{
CMessage *message = nullptr;
bool exit_flag = false;

while (!exit_flag)
{
mMutex.lock();
if (mMessageQueue.empty())
{
//wait new message
printf("wait new message\n");
mCond.wait(&mMutex);
}
message = mMessageQueue.front();
mMessageQueue.pop();
mMutex.unlock();

if (message != nullptr)
{
//Dispatch message
exit_flag = dispatchMessage(message);
delete message;
message = nullptr;
}
}

printf("[%s] prepare exit", getThreadName().c_str());
mMutex.lock();
while (!mMessageQueue.empty())
{
message = mMessageQueue.front();
mMessageQueue.pop();
delete message;
message = nullptr;
}
mCond.signal();
mMutex.unlock();
printf("[%s] exit\n", getThreadName().c_str());
}

bool CHandler::dispatchMessage(CMessage *message)
{
if (message != nullptr)
{
if (message->getMessageId() == C_MESSAGE_HANDLER_EXIT)
{
printf("[%s] <= [%s] message[C_MESSAGE_HANDLER_EXIT]\n", (message->getHandler())->getThreadName().c_str(), (message->getOriginName()).c_str());
return true;
}
printf("[%s] <= [%s] message[%s]\n", (message->getHandler())->getThreadName().c_str(), (message->getOriginName()).c_str(), messageToString(message).c_str());
//Processing messages
handleMessage(message);
}

return false;
}

CMessage *CHandler::obtainMessage(int message_id)
{
return CMessage::obtain(this, message_id);
}

void CHandler::sendMessage(CMessage *message)
{
if (message == nullptr)
{
printf("message is null\n");
return;
}

mMutex.lock();
mMessageQueue.push(message);
if (mMessageQueue.size() >= 1)
{
//recv new message
//notify running thread
mCond.signal();
}
mMutex.unlock();
}

std::string CHandler::messageToString(CMessage *message)
{
if (message == nullptr)
{
return "";
}

switch (message->getMessageId())
{
case C_MESSAGE_HANDLER_EXIT:
return "C_MESSAGE_HANDLER_EXIT";
default:
return "UNKNOWN";
}
}

CMessage的实现

CMessage比较简单,使用静态方法obtain生成对象,使用getArg获取参数

主要运用了模板函数,此处贴上他们的模板函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
template<typename T>
static CMessage *obtain(CHandler *handler, int message_id, std::shared_ptr<T> arg)
{
std::shared_ptr<void> p_arg = std::static_pointer_cast<void>(arg);
CMessage *message = new CMessage(handler, message_id, p_arg);
return message;
}

template <typename T>
std::shared_ptr<T> getArg()
{
return std::static_pointer_cast<T>(mArg);
}

CNotify的实现

CNotify的实现是保存CHandler对象的指针以及messageid,在需要通知时接收参数并创建CMessage对象再进行发送,set()方法就是简单的赋值,主要看通知方法:

1
2
3
4
5
6
7
8
template <typename T>
void notify(std::shared_ptr<T> arg)
{
if (mHandler != nullptr)
{
CMessage::obtain(mHandler, mMessageId,std::static_pointer_cast<void>(arg))->sendToHandler();
}
}

CNotifyList的实现

接收CHandler对象的指针以及messageid,创建CNotify并保存到队列中,通知时调用CNotify的通知方法就可以了,主要展示通知方法:

1
2
3
4
5
6
7
8
9
10
11
template<typename T>
void notify(std::shared_ptr<T> arg)
{
std::list<CNotify *>::iterator it;
mMutex.lock();
for (it = mNotifications.begin(); it != mNotifications.end(); ++it)
{
(*it)->notify(arg);
}
mMutex.unlock();
}

消息通知与处理机制
https://carl-5535.github.io/2022/11/26/CarlSDK/消息处理/
作者
Carl Chen
发布于
2022年11月26日
许可协议