Fast Message Queue (FMQ)

If you are looking for AIDL support, see also FMQ with AIDL.

HIDL's remote procedure call (RPC) infrastructure uses Binder mechanisms, meaning calls involve overhead, require kernel operations, and may trigger scheduler action. However, for cases where data must be transferred between processes with less overhead and no kernel involvement, the Fast Message Queue (FMQ) system is used.

FMQ creates message queues with the desired properties. An MQDescriptorSync or MQDescriptorUnsync object can be sent over a HIDL RPC call and used by the receiving process to access the message queue.

Fast Message Queues are supported only in C++ and on devices running Android 8.0 and higher.

MessageQueue types

Android supports two queue types (known as flavors):

  • Unsynchronized queues are allowed to overflow, and can have many readers; each reader must read data in time or lose it.
  • Synchronized queues are not allowed to overflow, and can have only one reader.

Both queue types are not allowed to underflow (read from an empty queue will fail) and can only have one writer.

Unsynchronized

An unsynchronized queue has only one writer, but can have any number of readers. There is one write position for the queue; however, each reader keeps track of its own independent read position.

Writes to the queue always succeed (are not checked for overflow) as long as they are no larger than the configured queue capacity (writes larger than the queue capacity fail immediately). As each reader may have a different read position, rather than waiting for every reader to read every piece of data, data is allowed to fall off the queue whenever new writes need the space.

Readers are responsible for retrieving data before it falls off the end of the queue. A read that attempts to read more data than is available either fails immediately (if nonblocking) or waits for enough data to be available (if blocking). A read that attempts to read more data than the queue capacity always fails immediately.

If a reader fails to keep up with the writer, so that the amount of data written and not yet read by that reader is larger than the queue capacity, the next read does not return data; instead, it resets the reader's read position to equal the latest write position then returns failure. If the data available to read is checked after overflow but before the next read, it shows more data available to read than the queue capacity, indicating overflow has occurred. (If the queue overflows between checking available data and attempting to read that data, the only indication of overflow is that the read fails.)

Readers of an Unsynchronized queue likely don't want to reset the read and write pointers of the queue. So, when creating the queue from the descriptor readers should use a `false` argument for the `resetPointers` parameter.

Synchronized

A synchronized queue has one writer and one reader with a single write position and a single read position. It is impossible to write more data than the queue has space for or read more data than the queue currently holds. Depending on whether the blocking or nonblocking write or read function is called, attempts to exceed available space or data either return failure immediately or block until the desired operation can be completed. Attempts to read or write more data than the queue capacity will always fail immediately.

Setting up an FMQ

A message queue requires multiple MessageQueue objects: one to be written to, and one or more to be read from. There is no explicit configuration of which object is used for writing or reading; it is up to the user to ensure that no object is used for both reading and writing, that there is at most one writer, and, for synchronized queues, that there is at most one reader.

Creating the first MessageQueue object

A message queue is created and configured with a single call:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • The MessageQueue<T, flavor>(numElements) initializer creates and initializes an object that supports the message queue functionality.
  • The MessageQueue<T, flavor>(numElements, configureEventFlagWord) initializer creates and initializes an object that supports the message queue functionality with blocking.
  • flavor can be either kSynchronizedReadWrite for a synchronized queue or kUnsynchronizedWrite for an unsynchronized queue.
  • uint16_t (in this example) can be any HIDL-defined type that does not involve nested buffers (no string or vec types), handles, or interfaces.
  • kNumElementsInQueue indicates the size of queue in number of entries; it determines the size of shared memory buffer that will be allocated for the queue.

Creating the second MessageQueue object

The second side of the message queue is created using an MQDescriptor object obtained from the first side. The MQDescriptor object is sent over a HIDL or AIDL RPC call to the process that will hold the second end of the message queue. The MQDescriptor contains information about the queue, including:

  • Information to map the buffer and write pointer.
  • Information to map the read pointer (if the queue is synchronized).
  • Information to map the event flag word (if the queue is blocking).
  • Object type (<T, flavor>), which includes the HIDL-defined type of queue elements and the queue flavor (synchronized or unsynchronized).

The MQDescriptor object can be used to construct a MessageQueue object:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

The resetPointers parameter indicates whether to reset the read and write positions to 0 while creating this MessageQueue object. In an unsynchronized queue, the read position (which is local to each MessageQueue object in unsynchronized queues) is always set to 0 during creation. Typically, the MQDescriptor is initialized during creation of the first message queue object. For extra control over the shared memory, you can set up the MQDescriptor manually (MQDescriptor is defined in system/libhidl/base/include/hidl/MQDescriptor.h) then create every MessageQueue object as described in this section.

Blocking queues and event flags

By default, queues do not support blocking reads/writes. There are two kinds of blocking read/write calls:

  • Short form, with three parameters (data pointer, number of items, timeout). Supports blocking on individual read/write operations on a single queue. When using this form, the queue will handle the event flag and bitmasks internally, and the first message queue object must be initialized with a second parameter of true. For example:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Long form, with six parameters (includes event flag and bitmasks). Supports using a shared EventFlag object between multiple queues and allows specifying the notification bit masks to be used. In this case, the event flag and bitmasks must be supplied to each read and write call.

For the long form, the EventFlag can be supplied explicitly in each readBlocking() and writeBlocking() call. One of the queues may be initialized with an internal event flag, which must then be extracted from that queue's MessageQueue objects using getEventFlagWord() and used to create EventFlag objects in each process for use with other FMQs. Alternatively, the EventFlag objects can be initialized with any suitable shared memory.

In general, each queue should use only one of non-blocking, short-form blocking, or long-form blocking. It is not an error to mix them, but careful programming is required to get the desired result.

Marking the memory as read only

By default, shared memory has read and write permissions. For unsynchronized queues (kUnsynchronizedWrite), the writer might want to remove write permissions for all of the readers before it hands out the MQDescriptorUnsync objects. This ensures the other processes can't write to the queue, which is recommended to protect against bugs or bad behavior in the reader processes. If the writer wants the readers to be able to reset the queue whenever they use the MQDescriptorUnsync to create the read side of the queue, then the memory can't be marked as read-only. This is the default behavior of the `MessageQueue` constructor. So, if there is already existing users of this queue, their code needs to be changed to construct the queue with resetPointer=false.

  • Writer: call ashmem_set_prot_region with a MQDescriptor file descriptor and region set to read-only (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Reader: create message queue with resetPointer=false (the default is true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Using the MessageQueue

The public API of the MessageQueue object is:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite() and availableToRead() can be used to determine how much data can be transferred in a single operation. In an unsynchronized queue:

  • availableToWrite() always returns the capacity of the queue.
  • Each reader has its own read position and does its own calculation for availableToRead().
  • From the point of view of a slow reader, the queue is allowed to overflow; this may result in availableToRead() returning a value larger than the size of the queue. The first read after an overflow will fail and result in the read position for that reader being set equal to the current write pointer, whether or not the overflow was reported through availableToRead().

The read() and write() methods return true if all requested data could be (and was) transferred to/from the queue. These methods do not block; they either succeed (and return true), or return failure (false) immediately.

The readBlocking() and writeBlocking() methods wait until the requested operation can be completed, or until they timeout (a timeOutNanos value of 0 means never timeout).

Blocking operations are implemented using an event flag word. By default, each queue creates and uses its own flag word to support the short form of readBlocking() and writeBlocking(). It is possible for multiple queues to share a single word, so that a process can wait on writes or reads to any of the queues. A pointer to a queue's event flag word can be obtained by calling getEventFlagWord(), and that pointer (or any pointer to a suitable shared memory location) can be used to create an EventFlag object to pass into the long form of readBlocking() and writeBlocking()for a different queue. The readNotification and writeNotification parameters tell which bits in the event flag should be used to signal reads and writes on that queue. readNotification and writeNotification are 32-bit bitmasks.

readBlocking() waits on the writeNotification bits; if that parameter is 0, the call always fails. If the readNotification value is 0, the call will not fail, but a successful read will not set any notification bits. In a synchronized queue, this would mean that the corresponding writeBlocking() call will never wake up unless the bit is set elsewhere. In an unsynchronized queue, writeBlocking() will not wait (it should still be used to set the write notification bit), and it is appropriate for reads to not set any notification bits. Similarly, writeblocking() will fail if readNotification is 0, and a successful write sets the specified writeNotification bits.

To wait on multiple queues at once, use an EventFlag object's wait() method to wait on a bitmask of notifications. The wait() method returns a status word with the bits that caused the wake up set. This information is then used to verify the corresponding queue has enough space or data for the desired write/read operation and perform a nonblocking write()/read(). To get a post operation notification, use another call to the EventFlag's wake() method. For a definition of the EventFlag abstraction, refer to system/libfmq/include/fmq/EventFlag.h.

Zero copy operations

The read/write/readBlocking/writeBlocking() APIs take a pointer to an input/output buffer as an argument and use memcpy() calls internally to copy data between the same and the FMQ ring buffer. To improve performance, Android 8.0 and higher include a set of APIs that provide direct pointer access into the ring buffer, eliminating the need to use memcpy calls.

Use the following public APIs for zero copy FMQ operations:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • The beginWrite method provides base pointers into the FMQ ring buffer. After the data is written, commit it using commitWrite(). ThebeginRead/commitRead methods act the same way.
  • The beginRead/Write methods take as input the number of messages to be read/written and return a boolean indicating if the read/write is possible. If the read or write is possible the memTx struct is populated with base pointers that can be used for direct pointer access into the ring buffer shared memory.
  • The MemRegion struct contains details about a block of memory, including the base pointer (base address of the memory block) and the length in terms of T (length of the memory block in terms of the HIDL-defined type of the message queue).
  • The MemTransaction struct contains two MemRegion structs, first and second as a read or write into the ring buffer may require a wrap around to the beginning of the queue. This would mean that two base pointers are needed to read/write data into the FMQ ring buffer.

To get the base address and length from a MemRegion struct:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

To get references to the first and second MemRegions within a MemTransaction object:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Example write to the FMQ using zero copy APIs:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

The following helper methods are also part of MemTransaction:

  • T* getSlot(size_t idx);
    Returns a pointer to slot idx within the MemRegions that are part of this MemTransaction object. If the MemTransaction object is representing the memory regions to read/write N items of type T, then the valid range of idx is between 0 and N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    Write nMessages items of type T into the memory regions described by the object, starting from index startIdx. This method uses memcpy() and is not to meant to be used for a zero copy operation. If the MemTransaction object represents memory to read/write N items of type T, then the valid range of idx is between 0 and N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    Helper method to read nMessages items of type T from the memory regions described by the object starting from startIdx. This method uses memcpy() and is not meant to be used for a zero copy operation.

Sending the queue over HIDL

On the creating side:

  1. Create message queue object as described above.
  2. Verify the object is valid with isValid().
  3. If you will be waiting on multiple queues by passing an EventFlag into the long form of readBlocking()/writeBlocking(), you can extract the event flag pointer (using getEventFlagWord()) from a MessageQueue object that was initialized to create the flag, and use that flag to create the necessary EventFlag object.
  4. Use the MessageQueue getDesc() method to get a descriptor object.
  5. In the .hal file, give the method a parameter of type fmq_sync or fmq_unsync where T is a suitable HIDL-defined type. Use this to send the object returned by getDesc() to the receiving process.

On the receiving side:

  1. Use the descriptor object to create a MessageQueue object. Be sure to use the same queue flavor and data type, or the template will fail to compile.
  2. If you extracted an event flag, extract the flag from the corresponding MessageQueue object in the receiving process.
  3. Use the MessageQueue object to transfer data.