Быстрая очередь сообщений (FMQ)

Инфраструктура удаленного вызова процедур (RPC) HIDL использует механизмы Binder, а это означает, что вызовы связаны с накладными расходами, требуют операций ядра и могут инициировать действия планировщика. Однако в случаях, когда данные должны передаваться между процессами с меньшими затратами и без участия ядра, используется система быстрой очереди сообщений (FMQ).

FMQ создает очереди сообщений с нужными свойствами. Объект MQDescriptorSync или MQDescriptorUnsync может быть отправлен через вызов HIDL RPC и использован принимающим процессом для доступа к очереди сообщений.

Очереди быстрых сообщений поддерживаются только в C++ и на устройствах под управлением Android 8.0 и выше.

Типы очереди сообщений

Android поддерживает два типа очередей (известных как ароматы ):

  • Несинхронизированные очереди могут переполняться и могут иметь много читателей; каждый читатель должен вовремя прочитать данные или потерять их.
  • Синхронизированные очереди не могут переполняться и могут иметь только одного читателя.

Оба типа очередей не могут быть переполнены (чтение из пустой очереди не будет выполнено) и могут иметь только один модуль записи.

Не синхронизировано

Несинхронизированная очередь имеет только один модуль записи, но может иметь любое количество читателей. В очереди есть одна позиция записи; однако каждый считыватель отслеживает свою собственную независимую позицию чтения.

Записи в очередь всегда завершаются успешно (не проверяются на переполнение), если они не превышают сконфигурированную емкость очереди (записи, превышающие емкость очереди, завершаются с ошибкой немедленно). Поскольку каждый считыватель может иметь другую позицию чтения, вместо того, чтобы ждать, пока каждый считыватель прочитает каждый фрагмент данных, данные могут удаляться из очереди всякий раз, когда новым операциям записи требуется место.

Операции чтения отвечают за извлечение данных до того, как они выпадут из очереди. Чтение, которое пытается прочитать больше данных, чем доступно, либо немедленно терпит неудачу (если не блокируется), либо ожидает, пока будет доступно достаточно данных (если блокируется). Чтение, которое пытается прочитать больше данных, чем вместимость очереди, всегда немедленно завершается ошибкой.

Если считыватель не успевает за писателем, так что объем данных, записанных и еще не прочитанных этим считывателем, превышает емкость очереди, следующее чтение не возвращает данные; вместо этого он сбрасывает позицию чтения устройства чтения, чтобы она совпадала с последней позицией записи, а затем возвращает ошибку. Если данные, доступные для чтения, проверяются после переполнения, но перед следующим чтением, отображается больше данных, доступных для чтения, чем емкость очереди, что указывает на то, что произошло переполнение. (Если очередь переполняется между проверкой доступных данных и попыткой чтения этих данных, единственным признаком переполнения является сбой чтения.)

Синхронизировано

Синхронизированная очередь имеет один модуль записи и один модуль чтения с одной позицией записи и одной позицией чтения. Невозможно записать больше данных, чем есть места в очереди, или прочитать больше данных, чем в настоящее время содержит очередь. В зависимости от того, вызывается блокирующая или неблокирующая функция записи или чтения, попытки превысить доступное пространство или данные либо немедленно возвращают ошибку, либо блокируются до тех пор, пока не будет завершена требуемая операция. Попытки прочитать или записать больше данных, чем вместимость очереди, всегда немедленно заканчиваются неудачей.

Настройка FMQ

Для очереди сообщений требуется несколько объектов MessageQueue : один для записи и один или несколько для чтения. Нет явной конфигурации того, какой объект используется для записи или чтения; пользователь должен убедиться, что ни один объект не используется как для чтения, так и для записи, что имеется не более одного записывающего устройства, а для синхронизированных очередей - не более одного считывающего устройства.

Создание первого объекта MessageQueue

Очередь сообщений создается и настраивается одним вызовом:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements) создает и инициализирует объект, поддерживающий функциональность очереди сообщений.
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) создает и инициализирует объект, поддерживающий функциональность очереди сообщений с блокировкой.
  • kSynchronizedReadWrite flavor синхронизированной очереди, либо kUnsynchronizedWrite для несинхронизированной очереди.
  • uint16_t (в этом примере) может быть любым типом, определенным в HIDL , который не включает вложенные буферы (без типов string или vec ), дескрипторы или интерфейсы.
  • kNumElementsInQueue указывает размер очереди в количестве записей; он определяет размер буфера разделяемой памяти, который будет выделен для очереди.

Создание второго объекта MessageQueue

Вторая сторона очереди сообщений создается с помощью объекта MQDescriptor , полученного из первой стороны. Объект MQDescriptor отправляется через вызов HIDL RPC процессу, который будет удерживать второй конец очереди сообщений. MQDescriptor содержит информацию об очереди, в том числе:

  • Информация для сопоставления буфера и указателя записи.
  • Информация для сопоставления указателя чтения (если очередь синхронизирована).
  • Информация для сопоставления слова флага события (если очередь блокируется).
  • Тип объекта ( <T, flavor> ), который включает определенный HIDL тип элементов очереди и разновидность очереди (синхронизированную или несинхронизированную).

Объект MQDescriptor можно использовать для создания объекта MessageQueue :

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Параметр resetPointers указывает, следует ли сбрасывать позиции чтения и записи на 0 при создании этого объекта MessageQueue . В несинхронизированной очереди позиция чтения (локальная для каждого объекта MessageQueue в несинхронизированных очередях) всегда устанавливается на 0 во время создания. Обычно MQDescriptor инициализируется во время создания первого объекта очереди сообщений. Для дополнительного контроля над общей памятью вы можете настроить MQDescriptor вручную ( MQDescriptor определен в system/libhidl/base/include/hidl/MQDescriptor.h ), а затем создать каждый объект MessageQueue , как описано в этом разделе.

Блокирующие очереди и флаги событий

По умолчанию очереди не поддерживают блокировку чтения/записи. Существует два вида блокировки вызовов чтения/записи:

  • Краткая форма с тремя параметрами (указатель данных, количество элементов, тайм-аут). Поддерживает блокировку отдельных операций чтения/записи в одной очереди. При использовании этой формы очередь будет обрабатывать флаг события и битовые маски внутри, а первый объект очереди сообщений должен быть инициализирован вторым параметром, равным true . Например:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Long form, with six parameters (includes event flag and bitmasks). Supports using a shared EventFlag object between multiple queues and allows specifying the notification bit masks to be used. In this case, the event flag and bitmasks must be supplied to each read and write call.

For the long form, the EventFlag can be supplied explicitly in each readBlocking() and writeBlocking() call. One of the queues may be initialized with an internal event flag, which must then be extracted from that queue's MessageQueue objects using getEventFlagWord() and used to create EventFlag objects in each process for use with other FMQs. Alternatively, the EventFlag objects can be initialized with any suitable shared memory.

In general, each queue should use only one of non-blocking, short-form blocking, or long-form blocking. It is not an error to mix them, but careful programming is required to get the desired result.

Using the MessageQueue

The public API of the MessageQueue object is:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite() и availableToRead() можно использовать для определения того, сколько данных может быть передано за одну операцию. В несинхронизированной очереди:

  • availableToWrite() всегда возвращает емкость очереди.
  • Каждый читатель имеет свою собственную позицию чтения и выполняет свои собственные вычисления для availableToRead() .
  • С точки зрения медленного читателя, очередь может переполняться; это может привести к тому, что availableToRead() вернет значение, превышающее размер очереди. Первое чтение после переполнения завершится ошибкой и приведет к тому, что позиция чтения для этого считывателя будет установлена ​​равной текущему указателю записи, независимо от того, сообщалось ли о переполнении через availableToRead() .

Методы read() и write() возвращают true , если все запрошенные данные могут быть (и были) переданы в/из очереди. Эти методы не блокируют; они либо преуспевают (и возвращают true ), либо немедленно возвращают ошибку ( false ).

readBlocking() и writeBlocking() ожидают завершения запрошенной операции или истечения времени ожидания (значение timeOutNanos , равное 0, означает, что время ожидания никогда не истекает).

Операции блокировки реализуются с помощью слова флага события. По умолчанию каждая очередь создает и использует собственное флаговое слово для поддержки сокращенной формы readBlocking() и writeBlocking() . Несколько очередей могут совместно использовать одно слово, так что процесс может ожидать записи или чтения в любую из очередей. Указатель на слово флага события очереди можно получить, вызвав getEventFlagWord() , и этот указатель (или любой указатель на подходящее место в общей памяти) можно использовать для создания объекта EventFlag для передачи в длинную форму readBlocking() и writeBlocking() для другой очереди. Параметры readNotification и writeNotification , какие биты во флаге события следует использовать для сигнализации операций чтения и записи в этой очереди. readNotification и writeNotification — это 32-битные битовые маски.

readBlocking() ожидает битов writeNotification ; если этот параметр равен 0, вызов всегда терпит неудачу. Если значение readNotification равно 0, вызов не завершится ошибкой, но успешное чтение не установит никаких битов уведомления. В синхронизированной очереди это будет означать, что соответствующий writeBlocking() никогда не проснется, если бит не установлен в другом месте. В несинхронизированной очереди writeBlocking() не будет ждать (ее все равно следует использовать для установки бита уведомления о записи), и для операций чтения целесообразно не устанавливать никаких битов уведомления. Точно так же writeblocking() завершится ошибкой, если readNotification равен 0, а успешная запись устанавливает указанные биты writeNotification .

Чтобы ожидать одновременно несколько очередей, используйте метод wait() объекта EventFlag для ожидания битовой маски уведомлений. Метод wait() возвращает слово состояния с установленными битами, вызвавшими пробуждение. Затем эта информация используется для проверки того, что в соответствующей очереди достаточно места или данных для желаемой операции записи/чтения, и выполнения неблокирующей write() / read() . Чтобы получить уведомление о завершении операции, используйте еще один вызов метода wake() EventFlag . Для определения абстракции EventFlag обратитесь к system/libfmq/include/fmq/EventFlag.h .

Нулевые операции копирования

API read / write / readBlocking / writeBlocking() принимают указатель на буфер ввода/вывода в качестве аргумента и используют внутренние вызовы memcpy() для копирования данных между ним и кольцевым буфером FMQ. Для повышения производительности Android 8.0 и более поздние версии включают набор API-интерфейсов, обеспечивающих прямой доступ указателя к кольцевому буферу, что устраняет необходимость использования вызовов memcpy .

Используйте следующие общедоступные API для операций FMQ с нулевым копированием:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Метод beginWrite предоставляет базовые указатели в кольцевой буфер FMQ. После записи данных зафиксируйте их с помощью commitWrite() . Точно так же действуют методы beginRead / commitRead .
  • beginRead / Write принимают в качестве входных данных количество сообщений для чтения/записи и возвращают логическое значение, указывающее, возможно ли чтение/запись. Если чтение или запись возможны, структура memTx заполняется базовыми указателями, которые можно использовать для прямого доступа указателей к разделяемой памяти кольцевого буфера.
  • Структура MemRegion содержит сведения о блоке памяти, включая базовый указатель (базовый адрес блока памяти) и длину в терминах T (длина блока памяти в терминах определенного HIDL типа очереди сообщений).
  • Структура MemTransaction содержит две структуры MemRegion , first и second , поскольку для чтения или записи в кольцевой буфер может потребоваться переход к началу очереди. Это означает, что для чтения/записи данных в кольцевой буфер FMQ необходимы два базовых указателя.

Чтобы получить базовый адрес и длину из структуры MemRegion :

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Чтобы получить ссылки на первый и второй MemRegion в объекте MemTransaction :

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Пример записи в FMQ с использованием API с нулевым копированием:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Следующие вспомогательные методы также являются частью MemTransaction :

  • T* getSlot(size_t idx);
    Возвращает указатель на idx слота в MemRegions , которые являются частью этого объекта MemTransaction . Если объект MemTransaction представляет области памяти для чтения/записи N элементов типа T, то допустимый диапазон idx находится между 0 и N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    Запишите элементы nMessages типа T в области памяти, описанные объектом, начиная с индекса startIdx . Этот метод использует memcpy() и не предназначен для использования в операции нулевого копирования. Если объект MemTransaction представляет память для чтения/записи N элементов типа T, то допустимый диапазон idx находится между 0 и N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    Вспомогательный метод для чтения элементов nMessages типа T из областей памяти, описанных объектом, начиная с startIdx . Этот метод использует memcpy() и не предназначен для использования в операции нулевого копирования.

Отправка очереди по HIDL

На стороне создания:

  1. Создайте объект очереди сообщений, как описано выше.
  2. Убедитесь, что объект действителен с помощью isValid() .
  3. Если вы будете ожидать в нескольких очередях, передав EventFlag в длинную форму readBlocking() / writeBlocking() , вы можете извлечь указатель флага события (используя getEventFlagWord() ) из объекта MessageQueue , который был инициализирован для создания флага, и используйте этот флаг для создания необходимого объекта EventFlag .
  4. Используйте метод MessageQueue getDesc() для получения объекта-дескриптора.
  5. В файле .hal дайте методу параметр типа fmq_sync или fmq_unsync где T — подходящий тип, определенный HIDL. Используйте это, чтобы отправить объект, возвращенный getDesc() , в принимающий процесс.

На принимающей стороне:

  1. Используйте объект дескриптора для создания объекта MessageQueue . Обязательно используйте ту же разновидность очереди и тип данных, иначе шаблон не скомпилируется.
  2. Если вы извлекли флаг события, извлеките флаг из соответствующего объекта MessageQueue в принимающем процессе.
  3. Используйте объект MessageQueue для передачи данных.