Fast Message Queue (FMQ)

Die RPC-Infrastruktur (Remote Procedure Call) von HIDL verwendet Bindermechanismen. Das bedeutet, dass Aufrufe Overhead verursachen, Kernelvorgänge erfordern und eine Scheduler-Aktion auslösen können. Wenn Daten jedoch mit weniger Overhead und ohne Einbindung des Kernels zwischen Prozessen übertragen werden müssen, wird das Fast Message Queue-System (FMQ) verwendet.

FMQ erstellt Nachrichtenwarteschlangen mit den gewünschten Eigenschaften. Sie können ein MQDescriptorSync- oder MQDescriptorUnsync-Objekt über einen HIDL-RPC-Aufruf senden. Das Objekt wird vom empfangenden Prozess verwendet, um auf die Nachrichtenwarteschlange zuzugreifen.

Warteschlangentypen

Android unterstützt zwei Warteschlangentypen (sogenannte flavors):

  • Nicht synchronisierte Warteschlangen können überlaufen und viele Leser haben. Jeder Leser muss Daten rechtzeitig lesen oder verlieren.
  • Synchronisierte Warteschlangen dürfen nicht überlaufen und haben nur einen Leser.

Beide Warteschlangentypen dürfen keinen Unterlauf ausführen (das Lesen aus einer leeren Warteschlange schlägt fehl) und kann nur einen Schreiber haben.

Nicht synchronisierte Warteschlangen

Eine nicht synchronisierte Warteschlange hat nur einen Schreiber, aber eine beliebige Anzahl von Lesern. Es gibt eine Schreibposition für die Warteschlange. Jeder Leser überwacht jedoch seine eigene unabhängige Leseposition.

Schreibvorgänge in die Warteschlange sind immer erfolgreich (nicht auf Überlauf geprüft), solange sie nicht größer als die konfigurierte Warteschlangenkapazität sind (Schreibvorgänge, die größer als die Warteschlangenkapazität sind, schlagen sofort fehl). Da jeder Leser eine andere Leseposition haben kann, werden die Daten nicht mehr in der Warteschlange gehalten, bis jeder Leser alle Daten gelesen hat. Stattdessen werden sie aus der Warteschlange entfernt, wenn für neue Schreibvorgänge Speicherplatz benötigt wird.

Leser sind dafür verantwortlich, Daten abzurufen, bevor sie am Ende der Warteschlange ankommen. Ein Lesevorgang, der versucht, mehr Daten zu lesen, als verfügbar sind, schlägt entweder sofort fehl (falls nicht blockiert) oder wartet, bis genügend Daten verfügbar sind (bei Blockierung). Ein Lesevorgang, bei dem versucht wird, mehr Daten zu lesen, als die Warteschlangenkapazität zulässt, schlägt immer sofort fehl.

Wenn ein Leser mit dem Schreiber nicht Schritt halten kann, sodass die Menge der geschriebenen und noch nicht gelesenen Daten von diesem Lesegerät größer als die Warteschlangenkapazität ist, gibt der nächste Lesevorgang keine Daten zurück. Stattdessen wird die Leseposition des Lesegeräts auf die letzte Schreibposition zurückgesetzt und dann der Fehler zurückgegeben. Wenn die zum Lesen verfügbaren Daten nach dem Überlauf, aber vor dem nächsten Lesen geprüft werden, werden mehr Daten zum Lesen angezeigt als die Kapazität der Warteschlange. Dies weist auf einen Überlauf hin. Wenn die Warteschlange zwischen der Prüfung der verfügbaren Daten und dem Versuch, diese Daten zu lesen, überläuft, ist der einzige Hinweis auf einen Überlauf, dass das Lesen fehlschlägt.

Synchronisierte Warteschlangen

Eine synchronisierte Warteschlange hat einen Writer und einen Reader mit einer einzelnen Schreibposition und einer einzigen Leseposition. Es ist nicht möglich, mehr Daten zu schreiben, als in der Warteschlange Platz ist, oder mehr Daten zu lesen, als sich derzeit in der Warteschlange befinden. Je nachdem, ob die blockierende oder nicht blockierende Schreib- oder Lesefunktion aufgerufen wird, werden Versuche, den verfügbaren Speicherplatz oder die Daten zu überschreiten, entweder sofort als Fehler zurückgegeben oder blockiert, bis der gewünschte Vorgang abgeschlossen werden kann. Versuche, mehr Daten zu lesen oder zu schreiben als die Kapazität der Warteschlange, schlagen immer sofort fehl.

FMQ einrichten

Für eine Nachrichtenwarteschlange sind mehrere MessageQueue-Objekte erforderlich: eines, in das geschrieben werden soll, und ein oder mehrere, aus denen gelesen werden soll. Es gibt keine explizite Konfiguration, welches Objekt zum Schreiben oder Lesen verwendet wird. Der Nutzer muss dafür sorgen, dass kein Objekt sowohl zum Lesen als auch Schreiben verwendet wird, dass es höchstens einen Autor und bei synchronisierten Warteschlangen maximal einen Leser gibt.

Erstes MessageQueue-Objekt erstellen

Eine Nachrichtenwarteschlange wird in einem einzigen Aufruf erstellt und konfiguriert:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Der MessageQueue<T, flavor>(numElements)-Initialisierer erstellt und initialisiert ein Objekt, das die Nachrichtenwarteschlangenfunktion unterstützt.
  • Der MessageQueue<T, flavor>(numElements, configureEventFlagWord)-Initialisierer erstellt und initialisiert ein Objekt, das die Nachrichtenwarteschlangenfunktion mit Blockierung unterstützt.
  • flavor kann entweder kSynchronizedReadWrite für eine synchronisierte Warteschlange oder kUnsynchronizedWrite für eine nicht synchronisierte Warteschlange sein.
  • uint16_t (in diesem Beispiel) kann ein beliebiger von HIDL definierter Typ sein, der keine verschachtelten Puffer (keine string- oder vec-Typen), Handles oder Schnittstellen enthält.
  • kNumElementsInQueue gibt die Größe der Warteschlange als Anzahl der Einträge an und bestimmt die Größe des Zwischenspeichers für gemeinsamen Arbeitsspeicher, der der Warteschlange zugewiesen ist.

Zweites MessageQueue-Objekt erstellen

Die zweite Seite der Nachrichtenwarteschlange wird mit einem MQDescriptor-Objekt erstellt, das von der ersten Seite abgerufen wird. Das MQDescriptor-Objekt wird über einen HIDL- oder AIDL-RPC-Aufruf an den Prozess gesendet, der das zweite Ende der Nachrichtenwarteschlange enthält. MQDescriptor enthält Informationen zur Warteschlange, darunter:

  • Informationen zum Zuordnen des Buffers und des Schreibzeigers.
  • Informationen zum Zuordnen des Lesezeigers (falls die Warteschlange synchronisiert ist).
  • Informationen zum Zuordnen des Ereignis-Flags (falls die Warteschlange blockiert).
  • Objekttyp (<T, flavor>), einschließlich des von HIDL definierten Typs von Warteschlangenelementen und der Warteschlangenvariante (synchronisiert oder nicht synchronisiert).

Mit dem MQDescriptor-Objekt können Sie ein MessageQueue-Objekt erstellen:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Der Parameter resetPointers gibt an, ob die Lese- und Schreibpositionen beim Erstellen dieses MessageQueue-Objekts auf 0 zurückgesetzt werden sollen. In einer nicht synchronisierten Warteschlange wird die Leseposition (die für jedes MessageQueue-Objekt in nicht synchronisierten Warteschlangen lokal ist) beim Erstellen immer auf 0 gesetzt. Normalerweise wird MQDescriptor beim Erstellen des ersten Nachrichtenqueue-Objekts initialisiert. Für zusätzliche Kontrolle über den gemeinsamen Speicher können Sie MQDescriptor manuell einrichten (MQDescriptor ist in system/libhidl/base/include/hidl/MQDescriptor.h definiert) und dann jedes MessageQueue-Objekt wie in diesem Abschnitt beschrieben erstellen.

Warteschlangen und Ereignis-Flags blockieren

Standardmäßig unterstützen Warteschlangen das Blockieren von Lese- und Schreibvorgängen nicht. Es gibt zwei Arten von Blockierungsaufrufen für Lese- und Schreibvorgänge:

  • Die Kurzform mit drei Parametern (Datenpointer, Anzahl der Elemente, Zeitüberschreitung) unterstützt das Blockieren einzelner Lese- und Schreibvorgänge in einer einzelnen Warteschlange. Bei Verwendung dieses Formulars werden das Ereignisflag und die Bitmasken intern von der Warteschlange verarbeitet. Das erste Nachrichtenwarteschlangenobjekt muss mit einem zweiten Parameter von true initialisiert werden. Beispiel:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Lange Version mit sechs Parametern (einschließlich Ereignis-Flag und Bitmasken) unterstützt die Verwendung eines gemeinsamen EventFlag-Objekts für mehrere Warteschlangen und ermöglicht das Angeben der zu verwendenden Bitmasken für Benachrichtigungen. In diesem Fall müssen das Ereignisflag und die Bitmasken für jeden Lese- und Schreibaufruf angegeben werden.

Bei der Langform kannst du EventFlag bei jedem readBlocking()- und writeBlocking()-Aufruf explizit angeben. Sie können eine der Warteschlangen mit einem internen Ereignisflag initialisieren, das dann mit getEventFlagWord() aus den MessageQueue-Objekten dieser Warteschlange extrahiert und verwendet werden muss, um in jedem Prozess ein EventFlag-Objekt für die Verwendung mit anderen FMQs zu erstellen. Alternativ können Sie die EventFlag-Objekte mit einem beliebigen geeigneten freigegebenen Speicher initialisieren.

Im Allgemeinen sollte für jede Warteschlange nur eine der folgenden Blockierungsarten verwendet werden: nicht blockierend, blockierend in Kurzform oder blockierend in Langform. Es ist nicht falsch, sie zu mischen, aber es ist eine sorgfältige Programmierung erforderlich, um das gewünschte Ergebnis zu erzielen.

Speicher als schreibgeschützt markieren

Standardmäßig hat der freigegebene Arbeitsspeicher Lese- und Schreibberechtigungen. Bei nicht synchronisierten Warteschlangen (kUnsynchronizedWrite) möchte der Autor möglicherweise die Schreibberechtigungen für alle Leser entfernen, bevor er die MQDescriptorUnsync-Objekte weitergibt. So wird sichergestellt, dass die anderen Prozesse nicht in die Warteschlange schreiben können. Dies wird empfohlen, um vor Fehlern oder Fehlverhalten in den Leserprozessen zu schützen. Wenn der Schreiber möchte, dass die Leser die Warteschlange zurücksetzen können, wenn sie MQDescriptorUnsync verwenden, um die Leseseite der Warteschlange zu erstellen, kann der Speicher nicht als schreibgeschützt gekennzeichnet werden. Das ist das Standardverhalten des MessageQueue-Konstruktors. Wenn es also bereits Nutzer dieser Warteschlange gibt, muss ihr Code geändert werden, damit die Warteschlange mit resetPointer=false erstellt wird.

  • Schreibvorgang: ashmem_set_prot_region mit einem MQDescriptor-Datei-Descriptor und einer Region auf „Schreibgeschützt“ (PROT_READ) aufrufen:
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Leser: Nachrichtenwarteschlange mit resetPointer=false erstellen (Standard ist true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

MessageQueue verwenden

Die öffentliche API des MessageQueue-Objekts lautet:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Mit availableToWrite() und availableToRead() können Sie festlegen, wie viele Daten bei einem einzelnen Vorgang übertragen werden können. In einer nicht synchronisierten Warteschlange:

  • availableToWrite() gibt immer die Kapazität der Warteschlange zurück.
  • Jeder Leser hat eine eigene Leseposition und führt eine eigene Berechnung für availableToRead() durch.
  • Aus Sicht eines langsamen Lesers darf die Warteschlange überlaufen. Dies kann dazu führen, dass availableToRead() einen Wert zurückgibt, der größer als die Größe der Warteschlange ist. Der erste Lesevorgang nach einem Überlauf schlägt fehl und führt dazu, dass die Leseposition für diesen Reader auf den aktuellen Schreibzeiger gesetzt wird, unabhängig davon, ob der Überlauf über availableToRead() gemeldet wurde oder nicht.

Die Methoden read() und write() geben true zurück, wenn alle angeforderten Daten in die Warteschlange übertragen werden konnten (und wurden). Diese Methoden werden nicht blockiert. Sie sind entweder erfolgreich (und geben true zurück) oder geben sofort einen Fehler zurück (false).

Die Methoden readBlocking() und writeBlocking() warten, bis der angeforderte Vorgang abgeschlossen werden kann oder das Zeitlimit überschritten wird. Ein timeOutNanos-Wert von 0 bedeutet, dass kein Zeitlimit gilt.

Blockierungsvorgänge werden mit einem Ereignisflagwort implementiert. Standardmäßig wird für jede Warteschlange ein eigenes Flag-Wort erstellt und verwendet, um die Kurzform von readBlocking() und writeBlocking() zu unterstützen. Mehrere Warteschlangen können sich ein einzelnes Wort teilen, sodass ein Prozess auf Schreib- oder Lesevorgänge in einer der Warteschlangen warten kann. Wenn Sie getEventFlagWord() aufrufen, erhalten Sie einen Verweis auf das Ereignisflaggenwort einer Warteschlange. Mit diesem Verweis (oder einem beliebigen Verweis auf einen geeigneten gemeinsamen Speicherort) können Sie ein EventFlag-Objekt erstellen, das in die Langform von readBlocking() und writeBlocking() für eine andere Warteschlange übergeben wird. Die Parameter readNotification und writeNotification geben an, welche Bits im Ereignisflag verwendet werden sollen, um Lese- und Schreibvorgänge in dieser Warteschlange zu signalisieren. readNotification und writeNotification sind 32-Bit-Bitmasken.

readBlocking() wartet auf die writeNotification-Bits. Wenn dieser Parameter 0 ist, schlägt der Aufruf immer fehl. Wenn der Wert von readNotification 0 ist, schlägt der Aufruf nicht fehl, aber bei einem erfolgreichen Lesen werden keine Benachrichtigungsbits gesetzt. In einer synchronisierten Warteschlange bedeutet das, dass der entsprechende writeBlocking()-Aufruf nur dann aktiviert wird, wenn das Bit an anderer Stelle gesetzt wird. In einer nicht synchronisierten Warteschlange wartet writeBlocking() nicht (es sollte trotzdem zum Festlegen des Schreibbenachrichtigungsbits verwendet werden) und es ist für Lesevorgänge geeignet, keine Benachrichtigungsbits zu setzen. Ebenso schlägt writeblocking() fehl, wenn readNotification 0 ist und ein erfolgreicher Schreibvorgang die angegebenen writeNotification-Bits festlegt.

Wenn du auf mehrere Warteschlangen gleichzeitig warten möchtest, verwende die Methode wait() eines EventFlag-Objekts, um auf eine Bitmaske von Benachrichtigungen zu warten. Die Methode wait() gibt ein Statuswort mit den Bits zurück, die das Wecken ausgelöst haben. Anhand dieser Informationen wird dann geprüft, ob die entsprechende Warteschlange genügend Speicherplatz oder Daten für den gewünschten Schreib- und Lesevorgang hat, und es werden nicht blockierende write() und read() ausgeführt. Wenn du eine Benachrichtigung nach dem Vorgang erhalten möchtest, musst du die Methode wake() des EventFlag-Objekts noch einmal aufrufen. Eine Definition der EventFlag-Abstraktion finden Sie unter system/libfmq/include/fmq/EventFlag.h.

Kopiervorgänge ohne Datenübertragung

Die Methoden read, write, readBlocking und writeBlocking() nehmen einen Verweis auf einen Eingabe-/Ausgabe-Puffer als Argument an und verwenden intern memcpy()-Aufrufe, um Daten zwischen demselben und dem FMQ-Ringpuffer zu kopieren. Zur Leistungssteigerung enthalten Android 8.0 und höher eine Reihe von APIs, die direkten Zeigerzugriff auf den Ringbuffer bieten. Dadurch ist die Verwendung von memcpy-Aufrufen nicht mehr erforderlich.

Verwenden Sie die folgenden öffentlichen APIs für Zero-Copy-FMQ-Vorgänge:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Die Methode beginWrite stellt Basiszeigerstöcke in den FMQ-Ringbuffer bereit. Nachdem die Daten geschrieben wurden, können Sie sie mit commitWrite() festschreiben. Die Methoden beginRead und commitRead funktionieren gleich.
  • Die Methoden beginRead und Write nehmen die Anzahl der zu lesenden und zu schreibenden Nachrichten als Eingabe und geben einen booleschen Wert zurück, der angibt, ob das Lesen oder Schreiben möglich ist. Wenn das Lesen oder Schreiben möglich ist, wird das memTx-Objekt mit Basiszeigern gefüllt, die für den direkten Zeigerzugriff auf den freigegebenen Ringbuffer-Speicher verwendet werden können.
  • Das MemRegion-Objekt enthält Details zu einem Speicherblock, einschließlich des Basiszeigers (Basisadresse des Speicherblocks) und der Länge in T (Länge des Speicherblocks in Bezug auf den HIDL-definierten Typ der Nachrichtenwarteschlange).
  • Das MemTransaction-Struktur enthält zwei MemRegion-Strukturen, first und second, da ein Lesen oder Schreiben in den Ringpuffer einen Umlauf zum Anfang der Warteschlange erfordern kann. Das würde bedeuten, dass zwei Basiszeichner erforderlich sind, um Daten in den FMQ-Ringpuffer zu lesen und zu schreiben.

So rufen Sie die Basisadresse und die Länge aus einer MemRegion-Struktur ab:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

So rufen Sie Verweise auf die erste und zweite MemRegion-Struktur in einem MemTransaction-Objekt ab:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Beispiel für das Schreiben in die FMQ mit Zero-Copy-APIs:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Die folgenden Hilfsmethoden sind ebenfalls Teil von MemTransaction:

  • T* getSlot(size_t idx); gibt einen Zeiger auf den Slot idx innerhalb der MemRegions zurück, der Teil dieses MemTransaction-Objekts ist. Wenn das MemTransaction-Objekt die Arbeitsspeicherbereiche darstellt, in denen N Elemente vom Typ T gelesen und geschrieben werden sollen, liegt der gültige Bereich von idx zwischen 0 und N−1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); schreibt nMessages Elemente vom Typ T in die vom Objekt beschriebenen Speicherbereiche, beginnend mit dem Index startIdx. Bei dieser Methode wird memcpy() verwendet und sie ist nicht für einen Kopiervorgang ohne Datenübertragung vorgesehen. Wenn das MemTransaction-Objekt den Arbeitsspeicher darstellt, in dem N Elemente vom Typ T gelesen und geschrieben werden, liegt der gültige Bereich von idx zwischen 0 und N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); ist eine Hilfsmethode zum Lesen von nMessages Elementen vom Typ T aus den vom Objekt beschriebenen Speicherbereichen, beginnend bei startIdx. Bei dieser Methode wird memcpy() verwendet und sie ist nicht für einen Kopiervorgang ohne Datenübertragung vorgesehen.

Warteschlange über HIDL senden

Beim Erstellen:

  1. Erstellen Sie wie oben beschrieben ein Nachrichtenwarteschlangenobjekt.
  2. Prüfen Sie mit isValid(), ob das Objekt gültig ist.
  3. Wenn du auf mehrere Warteschlangen wartest, indem du EventFlag in die Langform von readBlocking() oder writeBlocking() übergibst, kannst du den Ereignisflag-Pointer (mit getEventFlagWord()) aus einem MessageQueue-Objekt extrahieren, das zum Erstellen des Flags initialisiert wurde, und mit diesem Flag das erforderliche EventFlag-Objekt erstellen.
  4. Verwenden Sie die Methode MessageQueuegetDesc(), um ein Descriptor-Objekt abzurufen.
  5. Geben Sie der Methode in der HAL-Datei einen Parameter vom Typ fmq_sync oder fmq_unsync, wobei T ein geeigneter HIDL-definierter Typ ist. Damit wird das von getDesc() zurückgegebene Objekt an den Empfängerprozess gesendet.

Auf der Empfängerseite:

  1. Verwenden Sie das Deskriptorobjekt, um ein MessageQueue-Objekt zu erstellen. Verwenden Sie dieselbe Warteschlangenvariante und denselben Datentyp, da die Vorlage sonst nicht kompiliert werden kann.
  2. Wenn Sie ein Ereignisflag extrahiert haben, extrahieren Sie das Flag aus dem entsprechenden MessageQueue-Objekt im Empfangsprozess.
  3. Verwenden Sie das MessageQueue-Objekt, um Daten zu übertragen.