A feature that distinguishes POSIX message queues from their System V counterparts is the ability to receive asynchronous notification of the availability of a message on a previously empty queue (i.e., when the queue transitions from being empty to nonempty). This feature means that instead of making a blocking mq_receive() call or marking the message queue descriptor nonblocking and performing periodic mq_receive() calls (“polls”) on the queue, a process can request a notification of message arrival and then perform other tasks until it is notified. A process can choose to be notified either via a signal or via invocation of a function in a separate thread.
The notification feature of POSIX message queues is similar to the notification facility that we described for POSIX timers in Section 23.6. (Both of these APIs originated in POSIX.1b.)
The mq_notify() function registers the calling process to receive a notification when a message arrives on the empty queue referred to by the descriptor mqdes.
#include <mqueue.h>
int mq_notify
(mqd_t mqdes, const struct sigevent *notification);
Returns 0 on success, or -1 on error
The notification argument specifies the mechanism by which the process is to be notified. Before going into the details of the notification argument, we note a few points about message notification:
At any time, only one process (“the registered process”) can be registered to receive a notification from a particular message queue. If there is already a process registered for a message queue, further attempts to register for that queue fail (mq_notify() fails with the error EBUSY
).
The registered process is notified only when a new message arrives on a queue that was previously empty. If a queue already contains messages at the time of the registration, a notification will occur only after the queue is emptied and a new message arrives.
After one notification is sent to the registered process, the registration is removed, and any process can then register itself for notification. In other words, as long as a process wishes to keep receiving notifications, it must reregister itself after each notification by once again calling mq_notify().
The registered process is notified only if some other process is not currently blocked in a call to mq_receive() for the queue. If some other process is blocked in mq_receive(), that process will read the message, and the registered process will remain registered.
A process can explicitly deregister itself as the target for message notification by calling mq_notify() with a notification argument of NULL
.
We already showed the sigevent structure that is used to type the notification argument in Creating a Timer: timer_create(). Here, we present the structure in simplified form, showing just those fields relevant to the discussion of mq_notify():
union sigval { int sival_int; /* Integer value for accompanying data */ void *sival_ptr; /* Pointer value for accompanying data */ }; struct sigevent { int sigev_notify; /* Notification method */ int sigev_signo; /* Notification signal for SIGEV_SIGNAL */ union sigval sigev_value; /* Value passed to signal handler or thread function */ void (*sigev_notify_function) (union sigval); /* Thread notification function */ void *sigev_notify_attributes; /* Really 'pthread_attr_t' */ };
The sigev_notify field of this structure is set to one of the following values:
SIGEV_NONE
Register this process for notification, but when a message arrives on the previously empty queue, don’t actually notify the process. As usual, the registration is removed when a new messages arrives on an empty queue.
SIGEV_SIGNAL
Notify the process by generating the signal specified in the sigev_signo field. The sigev_value field specifies data to accompany the signal (). This data can be retrieved via the si_value field of the siginfo_t structure that is passed to the signal handler or returned by a call to sigwaitinfo() or sigtimedwait(). The following fields in the siginfo_t structure are also filled in: si_code, with the value SI_MESGQ
; si_signo, with the signal number; si_pid, with the process ID of the process that sent the message; and si_uid, with the real user ID of the process that sent the message. (The si_pid and si_uid fields are not set on most other implementations.)
SIGEV_THREAD
Notify the process by calling the function specified in sigev_notify_function as if it were the start function in a new thread. The sigev_notify_attributes field can be specified as NULL
or as a pointer to a pthread_attr_t structure that defines attributes for the thread (Thread Attributes). The union sigval value specified in sigev_value is passed as the argument of this function.
Example 52-6 provides an example of message notification using signals. This program performs the following steps:
Open the message queue named on the command line in nonblocking mode , determine the mq_msgsize attribute for the queue
, and allocate a buffer of that size for receiving messages
.
Block the notification signal (SIGUSR1
) and establish a handler for it .
Make an initial call to mq_notify() to register the process to receive message notification .
Execute an infinite loop that performs the following steps:
Call sigsuspend(), which unblocks the notification signal and waits until the signal is caught . Return from this system call indicates that a message notification has occurred. At this point, the process will have been deregistered for message notification.
Call mq_notify() to reregister this process to receive message notification .
Execute a while
loop that drains the queue by reading as many messages as possible .
Example 52-6. Receiving message notification via a signal
pmsg/mq_notify_sig.c
#include <signal.h> #include <mqueue.h> #include <fcntl.h> /* For definition of O_NONBLOCK */ #include "tlpi_hdr.h" #define NOTIFY_SIG SIGUSR1 static void handler(int sig) { /* Just interrupt sigsuspend() */ } int main(int argc, char *argv[]) { struct sigevent sev; mqd_t mqd; struct mq_attr attr; void *buffer; ssize_t numRead; sigset_t blockMask, emptyMask; struct sigaction sa; if (argc != 2 || strcmp(argv[1], "--help") == 0) usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); if (mqd == (mqd_t) -1) errExit("mq_open");
if (mq_getattr(mqd, &attr) == -1) errExit("mq_getattr");
buffer = malloc(attr.mq_msgsize); if (buffer == NULL) errExit("malloc");
sigemptyset(&blockMask); sigaddset(&blockMask, NOTIFY_SIG); if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1) errExit("sigprocmask"); sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sa.sa_handler = handler; if (sigaction(NOTIFY_SIG, &sa, NULL) == -1) errExit("sigaction");
sev.sigev_notify = SIGEV_SIGNAL; sev.sigev_signo = NOTIFY_SIG; if (mq_notify(mqd, &sev) == -1) errExit("mq_notify"); sigemptyset(&emptyMask); for (;;) {
sigsuspend(&emptyMask); /* Wait for notification signal */
if (mq_notify(mqd, &sev) == -1) errExit("mq_notify");
while ((numRead = mq_receive(mqd, buffer, attr.mq_msgsize, NULL)) >= 0) printf("Read %ld bytes\n", (long) numRead); if (errno != EAGAIN) /* Unexpected error */ errExit("mq_receive"); } }
pmsg/mq_notify_sig.c
Various aspects of the program in Example 52-6 merit further comment:
We block the notification signal and use sigsuspend() to wait for it, rather than pause(), to prevent the possibility of missing a signal that is delivered while the program is executing elsewhere (i.e., is not blocked waiting for signals) in the for
loop. If this occurred, and we were using pause() to wait for signals, then the next call to pause() would block, even though a signal had already been delivered.
We open the queue in nonblocking mode, and, whenever a notification occurs, we use a while
loop to read all messages from the queue. Emptying the queue in this way ensures that a further notification is generated when a new message arrives. Employing nonblocking mode means that the while
loop will terminate (mq_receive() will fail with the error EAGAIN
) when we have emptied the queue. (This approach is analogous to the use of nonblocking I/O with edge-triggered I/O notification, which we describe in , and is employed for similar reasons.)
Within the for
loop, it is important that we reregister for message notification before reading all messages from the queue. If we reversed these steps, the following sequence could occur: all messages are read from the queue, and the while
loop terminates; another message is placed on the queue; mq_notify() is called to reregister for message notification. At this point, no further notification signal would be generated, because the queue is already nonempty. Consequently, the program would remain permanently blocked in its next call to sigsuspend().
Example 52-7 provides an example of message notification using threads. This program shares a number of design features with the program in Example 52-6:
Example 52-7. Receiving message notification via a thread
pmsg/mq_notify_thread.c
#include <pthread.h> #include <mqueue.h> #include <fcntl.h> /* For definition of O_NONBLOCK */ #include "tlpi_hdr.h" static void notifySetup(mqd_t *mqdp); static void /* Thread notification function */threadFunc(union sigval sv) { ssize_t numRead; mqd_t *mqdp; void *buffer; struct mq_attr attr; mqdp = sv.sival_ptr; if (mq_getattr(*mqdp, &attr) == -1) errExit("mq_getattr"); buffer = malloc(attr.mq_msgsize); if (buffer == NULL) errExit("malloc");
notifySetup(mqdp); while ((numRead = mq_receive(*mqdp, buffer, attr.mq_msgsize, NULL)) >= 0) printf("Read %ld bytes\n", (long) numRead); if (errno != EAGAIN) /* Unexpected error */ errExit("mq_receive"); free(buffer); pthread_exit(NULL); } static void notifySetup(mqd_t *mqdp) { struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD; /* Notify via thread */ sev.sigev_notify_function = threadFunc; sev.sigev_notify_attributes = NULL; /* Could be pointer to pthread_attr_t structure */
sev.sigev_value.sival_ptr = mqdp; /* Argument to threadFunc() */ if (mq_notify(*mqdp, &sev) == -1) errExit("mq_notify"); } int main(int argc, char *argv[]) { mqd_t mqd; if (argc != 2 || strcmp(argv[1], "--help") == 0) usageErr("%s mq-name\n", argv[0]);
mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); if (mqd == (mqd_t) -1) errExit("mq_open");
notifySetup(&mqd); pause(); /* Wait for notifications via thread function */ }
pmsg/mq_notify_thread.c
Note the following further points regarding the design of the program in Example 52-7:
The program requests notification via a thread, by specifying SIGEV_THREAD
in the sigev_notify field of the sigevent structure passed to mq_notify(). The thread’s start function, threadFunc(), is specified in the sigev_notify_function field .
After enabling message notification, the main program pauses indefinitely ; timer notifications are delivered by invocations of threadFunc() in a separate thread
.
We could have made the message queue descriptor, mqd, visible in threadFunc() by making it a global variable. However, we adopted a different approach to illustrate the alternative: we place the address of the message queue descriptor in the sigev_value.sival_ptr field that is passed to mq_notify(). When threadFunc() is later invoked, this address is passed as its argument.
We must assign a pointer to the message queue descriptor to sigev_value.sival_ptr, rather than (some cast version of) the descriptor itself because, other than the stipulation that it is not an array type, SUSv3 makes no guarantee about the nature or size of the type used to represent the mqd_t data type.