消息队列
消息队列就是一些消息的列表。用户可以从消息队列中添加消息和读取消息等。从这点看,消息队列具有一定的FIFO特性,但是它可以实现消息的随即查询,比FIFO具有更大的优势。同时这些消息又是存在于内核中的,有“队列ID”来标识
消息队列使用
函数说明
消息队列的实现包括打开消息队列、添加消息、读取消息和控制消息队列这四种操作:
- 创建和打开消息队列使用msggget(),创建消息队列的数量会受到系统消息队列数量的限制
- 添加消息使用msgsnd(),它把消息添加到已经打开的消息队列末尾
- 读取消息使用msgrcv(),它把消息从消息队列中取走,与FIFO不同它可以指定取走的消息
- 控制消息队列是以哦那个msgctl(),它可以完成多项功能
函数格式
1 2 3 4 5
| #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h>
int msgget (key_t key, int msgflg);
|
key:消息队列的键值,多个进程可以通过它访问同一个消息队列,IPC_PRIVATE用于创建当前进程的私有消息队列
msgflg:权限标志位
成功返回消息队列ID,出错返回-1
1 2 3 4 5
| #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h>
int msgsnd (int msqid, const *msgp, size_t msgsz, int msgflg);
|
msgid:消息队列的队列ID
msgp:指向消息结构的指针
1 2 3 4 5
| struct msgbuf { long mtype; char mtext[1]; }
|
msgsz:消息正文字节数
msgflg:0阻塞直到发送成功,IPC_NOWAIT消息无法立即发送则返回
成功返回0,出错返回1
1 2 3 4 5
| #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h>
int msgrcv (int msqid, const *msgp, size_t msgsz, long int msgtyp, int msgflg);
|
这个函数和msgsnd()差不多这里介绍两个不同的参数:
msgtyp:
- 0 接收消息队列中的第一个消息
- 大于0 接收第一个类型为msgtyp的消息
- 小于0 接收第一个类型值不小于msgtyp绝对值且类型值又最小的消息
msgflg:
- MSG_NOERROR 若返回的消息比msgsz字节多,则被截断到msgsz且不通知消息发送进程
- IPC_NOWAIT 若没有相应类型的消息则立即返回
- 0 阻塞直到接收一条相应类型的消息
成功返回0,出错返回-1
1 2 3 4 5
| #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h>
int msgctl (int msqid, int cmd, struct msgid_ds *buf)
|
cmd:
- IPC_STAT 读取消息队列的数据结构msgid_ds,并将其存储在buf指定的地址中
- IPC_SET 设置消息队列的数据结构msgid_ds中的ipc_perm域值,这个值取自buff
- IPC_RMID 删除消息队列
成功返回0,出错返回-1
实例
发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/wait.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h>
#define BUFFER_SIZE 512
struct message { long msg_type; char msg_text[BUFFER_SIZE]; };
int main () { int qid = 0; key_t key; struct message msg;
if ((key = ftok (".", 'a')) == -1) { perror ("ftok"); exit(1); }
if ((qid = msgget (key, IPC_CREAT|0666)) == -1) { perror ("msgget"); exit (1); } printf ("Open queue %d\n", qid);
while (1) { printf ("Enter some message to the queue:"); if ((fgets (msg.msg_text, BUFFER_SIZE, stdin)) == NULL) { puts ("no message"); exit(1); }
msg.msg_type = getpid ();
if ((msgsnd (qid, &msg, strlen (msg.msg_text), 0)) < 0) { perror ("message posted"); exit (1); }
if (strncmp (msg.msg_text, "quit", 4) == 0) { break; } }
exit (0); }
|
接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/wait.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h>
#define BUFFER_SIZE 512
struct message { long msg_type; char msg_text[BUFFER_SIZE]; };
int main () { int qid = 0; key_t key; struct message msg;
if ((key = ftok (".", 'a')) == -1) { perror ("ftok"); exit(1); }
if ((qid = msgget (key, IPC_CREAT|0666)) == -1) { perror ("msgget"); exit (1); } printf ("Open queue %d\n", qid);
do { memset (msg.msg_text, 0, BUFFER_SIZE);
if ((msgrcv (qid, (void *)&msg, BUFFER_SIZE, 0, 0)) < 0) { perror ("msgrcv"); exit (1); }
printf ("The message from process %ld : %s", msg.msg_type, msg.msg_text); } while (strncmp (msg.msg_text, "quit", 4));
if ((msgctl (qid, IPC_RMID, NULL)) < 0) { perror ("msgctl"); exit (1); }
exit (0); }
|