国内最全IT社区平台 联系我们 | 收藏本站
华晨云阿里云优惠2
您当前位置:首页 > php框架 > 框架设计 > 细说linux IPC(九):posix消息队列

细说linux IPC(九):posix消息队列

来源:程序员人生   发布时间:2015-01-12 08:39:37 阅读次数:3808次
【版权声明:尊重原创,转载请保存出处:blog.csdn.net/shallnet 或 .../gentleliu,文章仅供学习交换,请勿用于商业用处】

        消息队列可以看做1系列消息组织成的链表,1个程序可以往这个链表添加消息,另外的程序可以从这个消息链表读走消息。
  • mq_open()函数打开或创建1个posix消息队列。
       #include <fcntl.h>           /* For O_* constants */
       #include <sys/stat.h>        /* For mode constants */
       #include <mqueue.h>
       mqd_t mq_open(const char *name, int oflag);
       mqd_t mq_open(const char *name, int oflag, mode_t mode,
                     struct mq_attr *attr);
       Link with -lrt.
参数name为posix IPC名字, 行将要被打开或创建的消息队列对象,为了便于移植,需要指定为“/name”的格式。
参数oflag必须要有O_RDONLY(只读)、标志O_RDWR(读写), O_WRONLY(只写)之1,除此以外还可以指定O_CREAT(没有该对象则创建)、O_EXCL(如果O_CREAT指定,但name不存在,就返回毛病),O_NONBLOCK(以非阻塞方式打开销息队列,在正常情况下mq_receive和mq_send函数会阻塞的地方,使用该标志打开的消息队列会返回EAGAIN毛病)。
当操作1个新队列时,使用O_CREAT标识,此时后面两个参数需要被指定,参数mode为指定权限位,attr指定新创建队列的属性。

  • mq_close()函数关闭消息队列。
       #include <mqueue.h>
       int mq_close(mqd_t mqdes);
       Link with -lrt.
关闭以后调用进程不在使用该描写符,但消息队列不会从系统中删除,进程终止时,会自动关闭已打开的消息队列,和调用mq_close1样。参数为mq_open()函数返回的值。

  • mq_unlink()函数从系统中删除某个消息队列。
    #include <mqueue.h> int mq_unlink(const char *name); Link with -lrt.

删除会马上产生,即便该队列的描写符援用计数依然大于0。参数为mq_open()函数第1个参数。

  • mq_setattr()函数和mq_getattr()函数分别设置和和获得消息队列属性。
       #include <mqueue.h>
       int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
       int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,  struct mq_attr *oldattr); 
       Link with -lrt.


参数mqdes为mq_open()函数返回的消息队列描写符。
参数attr、newattr、oldattr为消息队列属性结构体指针;
           struct mq_attr {
               long mq_flags;       /* Flags: 0 or O_NONBLOCK */
               long mq_maxmsg;      /* Max. # of messages on queue */
               long mq_msgsize;     /* Max. message size (bytes) */
               long mq_curmsgs;     /* # of messages currently in queue */
           };
参数mq_flags在mq_open时被初始化(oflag参数),其值为0 或 O_NONBLOCK。
参数mq_maxmsg和mq_msgsize在mq_open时在参数attr中初始化设置,mq_maxmsg是指队列的消息个数最大值;mq_msgsize为队列每一个消息的最大值。
参数mq_curmsgs为当前队列消息。

mq_getattr()函数把队列当前属性填入attr所指向的结构体。
mq_setattr()函数只能设置mq_flags属性,另外的域会被自动疏忽,mq_maxmsg和mq_msgsize的设置需要在mq_open当中来完成, 参数oldattr会和函数mq_getattr函数中参数attr相同的值

  • mq_send() 函数 和mq_receive()函数分别用于向消息队列放置和取走消息。
       #include <mqueue.h>
       int mq_send(mqd_t mqdes, const char *msg_ptr,   size_t msg_len, unsigned msg_prio); 
       ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,  size_t msg_len, unsigned *msg_prio); 
       Link with -lrt.
参数msg_ptr为指向消息的指针。
msg_len为消息长度,该值不能大于属性值中mq_msgsize的值。
msg_prio为优先级,消息在队列中将依照优先级大小顺序来排列消息。
如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那末将不会阻塞,而是返回EAGAIN毛病。如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那末将不会阻塞,而是返回EAGAIN毛病。


示例:

服务进程:
int sln_ipc_mq_loop(void)
{
    mqd_t           mqd;
    struct mq_attr  setattr, attr;
    char            *recvbuf = NULL;
    unsigned int    prio;
    int             recvlen;

    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;

    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr); //创建消息队列并设置消息队列属性
    if ((mqd < 0) && (errno != EEXIST)) {
        fprintf(stderr, "mq_open: %s
", strerror(errno));
        return ⑴;
    }

    if ((mqd < 0) && (errno == EEXIST)) { // 消息队列存在则打开
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
        if (mqd < 0) {
            fprintf(stderr, "mq_open: %s
", strerror(errno));
            return ⑴;
        }
    }

    if (mq_getattr(mqd, &attr) < 0) { //获得消息队列属性
        fprintf(stderr, "mq_getattr: %s
", strerror(errno));
        return ⑴;
    }

    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld
",
            attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    recvbuf = malloc(attr.mq_msgsize); //为读取消息队列分配当前系统允许的每条消息的最大大小的内存空间
    if (NULL == recvbuf) {
        return ⑴;
    }

    for (;;) {
        recvlen = mq_receive(mqd, recvbuf, attr.mq_msgsize, &prio); //从消息队列中读取消息
        if (recvlen < 0) {
            fprintf(stderr, "mq_receive: %s
", strerror(errno));
            continue;
        }

        printf("recvive length: %d, prio: %d, recvbuf: %s
", recvlen, prio, recvbuf);
    }

    return 0;
}


客户进程:
int sln_ipc_mq_send(const char *sendbuf, int sendlen, int prio)
{
    mqd_t           mqd;

    mqd = mq_open(SLN_IPC_MQ_NAME, O_WRONLY); //客户进程打开销息队列
    if (mqd < 0) {
        fprintf(stderr, "mq_open: %s
", strerror(errno));
        return ⑴;
    }

    if (mq_send(mqd, sendbuf, sendlen, prio) < 0) { //客户进程网消息队列中添加1条消息
        fprintf(stderr, "mq_send: %s
", strerror(errno));
        return ⑴;
    }

    return 0;
}
        程序运行时,服务进程阻塞于mq_receive,客户进程每发1条消息队列,服务进程都会从mq_receive处返回,但不1定接收到的消息就是客户进程最近发送的那1条消息,由于客户进程往消息队列中添加消息时会依照优先级来排序,如果客户进程同时向消息队列添加多条消息,服务进程还未来得及读取,那末当服务进程开始读取的消息1定是优先级最高的那条消息,而不是客户进程最早发送的那1条消息。
我们将服务进程稍作修改来试1下:
int sln_ipc_mq_loop(void)
{
    mqd_t           mqd;
    struct mq_attr  setattr, attr;
    char            *recvbuf = NULL;
    unsigned int    prio;
    int             recvlen;
    memset(&setattr, 0, sizeof(setattr));
    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;
    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
    //mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, NULL);
    if ((mqd < 0) && (errno != EEXIST)) {
        fprintf(stderr, "mq_open: %s
", strerror(errno));
        return ⑴;
    }
    if ((mqd < 0) && (errno == EEXIST)) { // name is exist
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
        if (mqd < 0) {
            fprintf(stderr, "mq_open: %s
", strerror(errno));
            return ⑴;
        }
    }
    if (mq_getattr(mqd, &attr) < 0) {
        fprintf(stderr, "mq_getattr: %s
", strerror(errno));
        return ⑴;
    }
    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld
",
            attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
    recvbuf = malloc(attr.mq_msgsize);
    if (NULL == recvbuf) {
        return ⑴;
    }
    sleep(10); //此处等待10秒,此时客户进程1次性向消息队列加入多条消息
    for (;;) {
        if (mq_getattr(mqd, &attr) < 0) {
            fprintf(stderr, "mq_getattr: %s
", strerror(errno));
            return ⑴;
        }
        printf("msgsize: %ld, curmsgs: %ld
", attr.mq_msgsize, attr.mq_curmsgs);
        recvlen = mq_receive(mqd, recvbuf, attr.mq_msgsize, &prio);
        if (recvlen < 0) {
            fprintf(stderr, "mq_receive: %s
", strerror(errno));
            continue;
        }
        printf("recvive-> prio: %d, recvbuf: %s
", prio, recvbuf);
        sleep(1); //每秒处理1个消息
    }
    mq_close(mqd);
    return 0;
}
服务进程先运行,然后客户进程立即向消息队列加入12消息,每条消息优先级从1到12,,以后服务进程运行,程序运行以下:
# ./server
flags: 0, maxmsg: 10, msgsize: 1024, curmsgs: 0
msgsize: 1024, curmsgs: 10
recvive-> prio: 10, recvbuf: asdf
msgsize: 1024, curmsgs: 10
recvive-> prio: 11, recvbuf: 1234
msgsize: 1024, curmsgs: 10
recvive-> prio: 12, recvbuf: asdf
msgsize: 1024, curmsgs: 9
recvive-> prio: 9, recvbuf: 1234
msgsize: 1024, curmsgs: 8
recvive-> prio: 8, recvbuf: asdf
msgsize: 1024, curmsgs: 7
recvive-> prio: 7, recvbuf: 1234
msgsize: 1024, curmsgs: 6
recvive-> prio: 6, recvbuf: asdf
msgsize: 1024, curmsgs: 5
recvive-> prio: 5, recvbuf: 1234
msgsize: 1024, curmsgs: 4
recvive-> prio: 4, recvbuf: asdf
msgsize: 1024, curmsgs: 3
recvive-> prio: 3, recvbuf: 1234
msgsize: 1024, curmsgs: 2
recvive-> prio: 2, recvbuf: asdf
msgsize: 1024, curmsgs: 1
recvive-> prio: 1, recvbuf: 1234
msgsize: 1024, curmsgs: 0
        可以看到,系统允许最大消息数量是10条,当客户进程1次性加入12条消息时,客户进程在加入最后两条会阻塞在那里,直到服务进程取出消息以后,最后两天消息才能顺次加入到消息队列。并且服务进程取出消息时从优先级从高到低取出:10->11->12->9->8->... ->1
本节源码下载:
点击打开链接

生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠
程序员人生
------分隔线----------------------------
分享到:
------分隔线----------------------------
关闭
程序员人生