消息队列

消息队列

消息队列就是一些消息的列表。用户可以从消息队列中添加消息和读取消息等。从这点看,消息队列具有一定的FIFO特性,但是它可以实现消息的随即查询,比FIFO具有更大的优势。同时这些消息又是存在于内核中的,有“队列ID”来标识

消息队列使用

函数说明

消息队列的实现包括打开消息队列、添加消息、读取消息和控制消息队列这四种操作:

  • 创建和打开消息队列使用msggget(),创建消息队列的数量会受到系统消息队列数量的限制
  • 添加消息使用msgsnd(),它把消息添加到已经打开的消息队列末尾
  • 读取消息使用msgrcv(),它把消息从消息队列中取走,与FIFO不同它可以指定取走的消息
  • 控制消息队列是以哦那个msgctl(),它可以完成多项功能

函数格式

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int msgget (key_t key, int msgflg);

key:消息队列的键值,多个进程可以通过它访问同一个消息队列,IPC_PRIVATE用于创建当前进程的私有消息队列

msgflg:权限标志位

成功返回消息队列ID,出错返回-1


1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int msgsnd (int msqid, const *msgp, size_t msgsz, int msgflg);

msgid:消息队列的队列ID

msgp:指向消息结构的指针


1
2
3
4
5
struct msgbuf
{
long mtype; //消息类型
char mtext[1]; //消息正文
}

msgsz:消息正文字节数

msgflg:0阻塞直到发送成功,IPC_NOWAIT消息无法立即发送则返回

成功返回0,出错返回1


1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int msgrcv (int msqid, const *msgp, size_t msgsz, long int msgtyp, int msgflg);

这个函数和msgsnd()差不多这里介绍两个不同的参数:

msgtyp:

  • 0 接收消息队列中的第一个消息
  • 大于0 接收第一个类型为msgtyp的消息
  • 小于0 接收第一个类型值不小于msgtyp绝对值且类型值又最小的消息

msgflg:

  • MSG_NOERROR 若返回的消息比msgsz字节多,则被截断到msgsz且不通知消息发送进程
  • IPC_NOWAIT 若没有相应类型的消息则立即返回
  • 0 阻塞直到接收一条相应类型的消息

成功返回0,出错返回-1


1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

int msgctl (int msqid, int cmd, struct msgid_ds *buf)

cmd:

  • IPC_STAT 读取消息队列的数据结构msgid_ds,并将其存储在buf指定的地址中
  • IPC_SET 设置消息队列的数据结构msgid_ds中的ipc_perm域值,这个值取自buff
  • IPC_RMID 删除消息队列

成功返回0,出错返回-1

实例

发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>


#define BUFFER_SIZE 512

struct message
{
long msg_type;
char msg_text[BUFFER_SIZE];
};

int main ()
{
int qid = 0;
key_t key;
struct message msg;

if ((key = ftok (".", 'a')) == -1)
{
perror ("ftok");
exit(1);
}

if ((qid = msgget (key, IPC_CREAT|0666)) == -1)
{
perror ("msgget");
exit (1);
}
printf ("Open queue %d\n", qid);

while (1)
{
printf ("Enter some message to the queue:");
if ((fgets (msg.msg_text, BUFFER_SIZE, stdin)) == NULL)
{
puts ("no message");
exit(1);
}

msg.msg_type = getpid ();

if ((msgsnd (qid, &msg, strlen (msg.msg_text), 0)) < 0)
{
perror ("message posted");
exit (1);
}

if (strncmp (msg.msg_text, "quit", 4) == 0)
{
break;
}
}

exit (0);
}

接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>


#define BUFFER_SIZE 512

struct message
{
long msg_type;
char msg_text[BUFFER_SIZE];
};

int main ()
{
int qid = 0;
key_t key;
struct message msg;

if ((key = ftok (".", 'a')) == -1)
{
perror ("ftok");
exit(1);
}

if ((qid = msgget (key, IPC_CREAT|0666)) == -1)
{
perror ("msgget");
exit (1);
}
printf ("Open queue %d\n", qid);

do
{
memset (msg.msg_text, 0, BUFFER_SIZE);

if ((msgrcv (qid, (void *)&msg, BUFFER_SIZE, 0, 0)) < 0)
{
perror ("msgrcv");
exit (1);
}

printf ("The message from process %ld : %s", msg.msg_type, msg.msg_text);
} while (strncmp (msg.msg_text, "quit", 4));

if ((msgctl (qid, IPC_RMID, NULL)) < 0)
{
perror ("msgctl");
exit (1);
}

exit (0);
}

消息队列
https://carl-5535.github.io/2021/05/31/Linux系统编程/消息队列/
作者
Carl Chen
发布于
2021年5月31日
许可协议