Message Notification

A feature that distinguishes POSIX message queues from their System V counterparts is the ability to receive asynchronous notification of the availability of a message on a previously empty queue (i.e., when the queue transitions from being empty to nonempty). This feature means that instead of making a blocking mq_receive() call or marking the message queue descriptor nonblocking and performing periodic mq_receive() calls (“polls”) on the queue, a process can request a notification of message arrival and then perform other tasks until it is notified. A process can choose to be notified either via a signal or via invocation of a function in a separate thread.

Note

The notification feature of POSIX message queues is similar to the notification facility that we described for POSIX timers in Section 23.6. (Both of these APIs originated in POSIX.1b.)

The mq_notify() function registers the calling process to receive a notification when a message arrives on the empty queue referred to by the descriptor mqdes.

#include <mqueue.h>

int mq_notify(mqd_t mqdes, const struct sigevent *notification);

Returns 0 on success, or -1 on error

The notification argument specifies the mechanism by which the process is to be notified. Before going into the details of the notification argument, we note a few points about message notification:

  • At any time, only one process (“the registered process”) can be registered to receive a notification from a particular message queue. If there is already a process registered for a message queue, further attempts to register for that queue fail (mq_notify() fails with the error EBUSY).

  • The registered process is notified only when a new message arrives on a queue that was previously empty. If a queue already contains messages at the time of the registration, a notification will occur only after the queue is emptied and a new message arrives.

  • After one notification is sent to the registered process, the registration is removed, and any process can then register itself for notification. In other words, as long as a process wishes to keep receiving notifications, it must reregister itself after each notification by once again calling mq_notify().

  • The registered process is notified only if no other process is currently blocked in a call to mq_receive() for the queue. If some other process is blocked in mq_receive(), that process will read the message, and the registered process will remain registered.

  • A process can explicitly deregister itself as the target for message notification by calling mq_notify() with a notification argument of NULL.

The notification argument is a pointer to a sigevent structure, which we already showed in Creating a Timer: timer_create(). Here, we present the structure in simplified form, showing just those fields relevant to the discussion of mq_notify():

union sigval {
    int    sival_int;             /* Integer value for accompanying data */
    void  *sival_ptr;             /* Pointer value for accompanying data */
};

struct sigevent {
    int    sigev_notify;          /* Notification method */
    int    sigev_signo;           /* Notification signal for SIGEV_SIGNAL */
    union sigval sigev_value;     /* Value passed to signal handler or
                                     thread function */
    void (*sigev_notify_function) (union sigval);
                                  /* Thread notification function */
    void  *sigev_notify_attributes;   /* Really 'pthread_attr_t' */
};

The sigev_notify field of this structure is set to one of the following values:

SIGEV_NONE

Register this process for notification, but when a message arrives on the previously empty queue, don’t actually notify the process. As usual, the registration is removed when a new message arrives on an empty queue.

SIGEV_SIGNAL

Notify the process by generating the signal specified in the sigev_signo field. The sigev_value field specifies data to accompany the signal. This data can be retrieved via the si_value field of the siginfo_t structure that is passed to the signal handler or returned by a call to sigwaitinfo() or sigtimedwait(). The following fields in the siginfo_t structure are also filled in: si_code, with the value SI_MESGQ; si_signo, with the signal number; si_pid, with the process ID of the process that sent the message; and si_uid, with the real user ID of the process that sent the message. (The si_pid and si_uid fields are not set on most other implementations.)

SIGEV_THREAD

Notify the process by calling the function specified in sigev_notify_function as if it were the start function in a new thread. The sigev_notify_attributes field can be specified as NULL or as a pointer to a pthread_attr_t structure that defines attributes for the thread (Thread Attributes). The union sigval value specified in sigev_value is passed as the argument of this function.

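Example 52-6 provides an example of message notification using signals. This program performs the following steps:

  • Open the message queue named on the command line in nonblocking mode, determine the mq_msgsize value for the queue, and allocate a buffer of that size for receiving messages.

  • Block the notification signal (SIGUSR1) and establish a handler for it.

  • Make an initial call to mq_notify() to register the process to receive message notification.

  • Execute an infinite loop that calls sigsuspend(), which unblocks the notification signal and waits until the signal is caught; calls mq_notify() to reregister the process for notification; and then executes a while loop that drains the queue by reading as many messages as possible.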

Example 52-6. Receiving message notification via a signal

pmsg/mq_notify_sig.c
    #include <signal.h>
    #include <mqueue.h>
    #include <fcntl.h>              /* For definition of O_NONBLOCK */
    #include "tlpi_hdr.h"

    #define NOTIFY_SIG SIGUSR1

    static void
    handler(int sig)
    {
        /* Just interrupt sigsuspend() */
    }

    int
    main(int argc, char *argv[])
    {
        struct sigevent sev;
        mqd_t mqd;
        struct mq_attr attr;
        void *buffer;
        ssize_t numRead;
        sigset_t blockMask, emptyMask;
        struct sigaction sa;

        if (argc != 2 || strcmp(argv[1], "--help") == 0)
            usageErr("%s mq-name\n", argv[0]);

        /* Open nonblocking, so that the drain loop below ends with
           EAGAIN rather than blocking on an empty queue */
        mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
        if (mqd == (mqd_t) -1)
            errExit("mq_open");

        /* Size the receive buffer from the queue's mq_msgsize */
        if (mq_getattr(mqd, &attr) == -1)
            errExit("mq_getattr");

        buffer = malloc(attr.mq_msgsize);
        if (buffer == NULL)
            errExit("malloc");

        /* Block the notification signal and establish its handler */
        sigemptyset(&blockMask);
        sigaddset(&blockMask, NOTIFY_SIG);
        if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1)
            errExit("sigprocmask");

        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        sa.sa_handler = handler;
        if (sigaction(NOTIFY_SIG, &sa, NULL) == -1)
            errExit("sigaction");

        /* Register for notification via NOTIFY_SIG */
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = NOTIFY_SIG;
        if (mq_notify(mqd, &sev) == -1)
            errExit("mq_notify");

        sigemptyset(&emptyMask);

        for (;;) {
            sigsuspend(&emptyMask);         /* Wait for notification signal */

            /* Reregister before draining the queue */
            if (mq_notify(mqd, &sev) == -1)
                errExit("mq_notify");

            while ((numRead = mq_receive(mqd, buffer,
                                         attr.mq_msgsize, NULL)) >= 0)
                printf("Read %ld bytes\n", (long) numRead);

            if (errno != EAGAIN)            /* Unexpected error */
                errExit("mq_receive");
        }
    }
pmsg/mq_notify_sig.c

Various aspects of the program in Example 52-6 merit further comment:

Example 52-7 provides an example of message notification using threads. This program shares a number of design features with the program in Example 52-6:

Note the following further points regarding the design of the program in Example 52-7: