IPC-消息队列

共 5770字,需浏览 12分钟

 ·

2021-03-26 19:43






消息队列概念


    消息队列是IPC(进程间通信,inter process communication)中常用的一种方式,相比与其他的通信方式消息队列具有在短消息处理和消息类别上有突出的表现。在理解消息队列前,需要了解一点关于Linux内核的知识。

    如图所示,Linux/unix的体系架构可以抽像成三层结构:应用程序(用户程序,软件等),内核(用于操作底层硬件)以及硬件。所以用户编写的程序是不能够直接操纵底层硬件的,需要通过系统内核。因此这就可以解释为什么共享内存的方式是IPC最快的方式,因为共享内存没有在内核中,而其他的包括消息队列是在内核中开辟空间,所以在访问速度上直接访问用户内存比内核空间要快


1.1 消息队列结构

    消息队列虽然不能够进程大量数据通信,但是却有一个很明显的优点,那就是在同一个消息队列中可以包含不同类型的消息,接收端可以根据自己的情况接受相应的消息,那么消息队列是怎样来保证这样的特点的呢?先看一下它的结构:

    如图在消息队列其实就是一个链表,其中可以存在不同的type,每一个type可能会有多条消息,接受端根据自己需要的type,一次从链表中获取第一个,第二个消息。


1.2 如何使用消息队列

    消息队列的使用很简单,使用msgget( ), msgsnd( ),msgrcv( )以及msgctl( )就完全搞定。

key:是内核中消息队列的标识,一般使用ftok来生成。

flagmsgget(key, IPC_CREAT|0666); 创建由key指定的消息队列,操作权限为0666。

    IPC_CREAT:用来创建一个消息队列

    IPC_EXCL:查询消息队列是否存在,IPC_CREAT同时使用,存在则报错。

    IPC_NOWAIT:之后的消息队列操作都为非阻塞


msqid:msgget创建队列后返回的消息队列唯一的标识。

ptr:用于接受或发送消息的内容,其结构固定,

struct msgbuf {  long mtype;//消息类型  char mtext[1024];//消息数据};

nbytes:消息长度。

flag标志选项

  •  阻塞等待

  • IPC_NOWAIT   使操作不阻塞,没有消息返回-1,error设置为ENOMSG



消息队列的demo


消息队列如何使用?一个简单的例子:

发送端:

接受端:



消息队列的进阶


    在使用消息队列的时候,你是否会有这样的疑问:如果多个进程同时向一个消息队列里面同一个Type发送消息,那么是否会出现资源竞争同步的问题呢

    是否会出现上面图中的局面呢,也就是说多个进程同时往消息队列里面写,那么就会同时认为msg1是链表的末尾,于是都把自己的消息添加到msg1的后面,导致了上面的现象,在接收端取数据的时候就会出现问题了。

    正确答案是,系统并没有这么傻,对于消息队列而言,一定要保证单链表,在内核代码中其实是做了同步的,也就是说当多个进程同时写的话,内核里面也会一个一个的进行链表的连接。我们大可放心,且不像共享内存一样需要用户自己同步,在使用消息队列的时候用户不需要进行同步

    内核代码里面有一个ipc_lock_check()来进行同步,保证资源竞争。

// 先看msgsnd()函数,它通过系统调用接口界面,进入内核执行,代码如下:SYSCALL_DEFINE4(msgsnd, int, msqid, struct msgbuf __user *, msgp, size_t, msgsz,        int, msgflg){    long mtype;     if (get_user(mtype, &msgp->mtype))        return -EFAULT;    return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);}// 接下来看do_msgsnd()部分的代码,如下:long do_msgsnd(int msqid, long mtype, void __user *mtext,        size_t msgsz, int msgflg){    struct msg_queue *msq;    struct msg_msg *msg;    int err;    struct ipc_namespace *ns;     ns = current->nsproxy->ipc_ns;     if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)        return -EINVAL;    if (mtype < 1)        return -EINVAL;     msg = load_msg(mtext, msgsz);    if (IS_ERR(msg))        return PTR_ERR(msg);     msg->m_type = mtype;    msg->m_ts = msgsz;     msq = msg_lock_check(ns, msqid);    if (IS_ERR(msq)) {        err = PTR_ERR(msq);        goto out_free;    }     for (;;) {        struct msg_sender s;         err = -EACCES;        if (ipcperms(&msq->q_perm, S_IWUGO))            goto out_unlock_free;         err = security_msg_queue_msgsnd(msq, msg, msgflg);        if (err)            goto out_unlock_free;         if (msgsz + msq->q_cbytes <= msq->q_qbytes &&                1 + msq->q_qnum <= msq->q_qbytes) {            break;        }         /* queue full, wait: */        if (msgflg & IPC_NOWAIT) {            err = -EAGAIN;            goto out_unlock_free;        }        ss_add(msq, &s);        ipc_rcu_getref(msq);        msg_unlock(msq);        schedule();         ipc_lock_by_ptr(&msq->q_perm);        ipc_rcu_putref(msq);        if (msq->q_perm.deleted) {            err = -EIDRM;            goto out_unlock_free;        }        ss_del(&s);         if (signal_pending(current)) {            err = -ERESTARTNOHAND;            goto out_unlock_free;        }    }     msq->q_lspid = task_tgid_vnr(current);    msq->q_stime = get_seconds();     if (!pipelined_send(msq, msg)) {        /* noone is waiting for this message, enqueue it */        list_add_tail(&msg->m_list, &msq->q_messages);        msq->q_cbytes += msgsz;        msq->q_qnum++;        atomic_add(msgsz, &ns->msg_bytes);        atomic_inc(&ns->msg_hdrs);    }     err = 0;    msg = NULL; out_unlock_free:    msg_unlock(msq);out_free:    if (msg != NULL)        free_msg(msg);    return err;}// 在这段代码中,请注意临近入口位置的这个函数msg_lock_check(),我们跟进,看一下这个lock是如何check// 的,代码如下:static inline struct msg_queue *msg_lock_check(struct ipc_namespace *ns,                        int id){    struct kern_ipc_perm *ipcp = ipc_lock_check(&msg_ids(ns), id);     if (IS_ERR(ipcp))        return (struct msg_queue *)ipcp;     return container_of(ipcp, struct msg_queue, q_perm);}// ipc_lock_check()是一个能够check所有IPC object同步信息的函数,它的定义如下:struct kern_ipc_perm *ipc_lock_check(struct ipc_ids *ids, int id){    struct kern_ipc_perm *out;     out = ipc_lock(ids, id);    if (IS_ERR(out))        return out;     if (ipc_checkid(out, id)) {        ipc_unlock(out);        return ERR_PTR(-EIDRM);    }     return out;}// 这里的ipc_lock()是至关重要的地方!通过这个函数的注释,也能明白它的作用了:/** * ipc_lock - Lock an ipc structure without rw_mutex held * @ids: IPC identifier set * @id: ipc id to look for * * Look for an id in the ipc ids idr and lock the associated ipc object. * * The ipc object is locked on exit. */ struct kern_ipc_perm *ipc_lock(struct ipc_ids *ids, int id){    struct kern_ipc_perm *out;    int lid = ipcid_to_idx(id);     rcu_read_lock();    out = idr_find(&ids->ipcs_idr, lid);    if (out == NULL) {        rcu_read_unlock();        return ERR_PTR(-EINVAL);    }     spin_lock(&out->lock);         /* ipc_rmid() may have already freed the ID while ipc_lock     * was spinning: here verify that the structure is still valid     */    if (out->deleted) {        spin_unlock(&out->lock);        rcu_read_unlock();        return ERR_PTR(-EINVAL);    }     return out;}



end




Linux下的多进程编程

排序-冒泡排序
网络流媒体-RTP与RTCP

专注音视频技术、

编程语言学习笔记以及互联网信息分享与交流,

扫码关注






浏览 57
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报