快速消息隊列 (FMQ)

HIDL 的遠程過程調用 (RPC) 基礎架構使用 Binder 機制,這意味著調用涉及開銷,需要內核操作,並且可能觸發調度程序操作。但是,對於必須在開銷較小且不涉及內核的進程之間傳輸數據的情況,使用快速消息隊列 (FMQ) 系統。

FMQ 創建具有所需屬性的消息隊列。可以通過 HIDL RPC 調用發送MQDescriptorSyncMQDescriptorUnsync對象,並由接收進程用於訪問消息隊列。

只有 C++ 和運行 Android 8.0 及更高版本的設備支持快速消息隊列。

消息隊列類型

Android 支持兩種隊列類型(稱為風味):

  • 不同步的隊列允許溢出,可以有很多讀者;每個閱讀器都必須及時讀取數據,否則會丟失。
  • 同步隊列不允許溢出,並且只能有一個讀卡器。

兩種隊列類型都不允許下溢(從空隊列讀取將失敗)並且只能有一個寫入器。

不同步

非同步隊列只有一個寫入者,但可以有任意數量的讀取者。隊列有一個寫入位置;但是,每個閱讀器都會跟踪自己獨立的閱讀位置。

只要不大於配置的隊列容量(大於隊列容量的寫入立即失敗),寫入隊列總是成功(不檢查溢出)。由於每個讀取器可能有不同的讀取位置,而不是等待每個讀取器讀取每條數據,只要新的寫入需要空間,數據就可以從隊列中掉下來。

讀取負責在數據脫離隊列末尾之前檢索數據。嘗試讀取比可用數據更多的數據的讀取要么立即失敗(如果非阻塞),要么等待足夠的數據可用(如果阻塞)。嘗試讀取比隊列容量更多的數據的讀取總是立即失敗。

如果一個reader跟不上writer,使得該reader已經寫入但尚未讀取的數據量大於隊列容量,則下次read不返回數據;相反,它將讀取器的讀取位置重置為最新的寫入位置,然後返回失敗。如果在溢出之後但在下一次讀取之前檢查了可供讀取的數據,則表明可供讀取的數據多於隊列容量,表明發生了溢出。 (如果隊列在檢查可用數據和嘗試讀取該數據之間發生溢出,則溢出的唯一指示是讀取失敗。)

同步的

同步隊列具有一個寫入器和一個讀取器,具有一個寫入位置和一個讀取位置。不可能寫入比隊列空間更多的數據或讀取比隊列當前持有的更多數據。根據是調用阻塞還是非阻塞寫入或讀取函數,嘗試超過可用空間或數據時,要么立即返回失敗,要么阻塞,直到可以完成所需的操作。嘗試讀取或寫入超過隊列容量的數據總是會立即失敗。

設置 FMQ

一個消息隊列需要多個MessageQueue對象:一個要寫入,一個或多個要讀取。沒有明確配置哪個對像用於寫入或讀取;由用戶來確保沒有對象同時用於讀取和寫入,最多有一個寫入器,並且對於同步隊列,最多有一個讀取器。

創建第一個 MessageQueue 對象

使用單個調用創建和配置消息隊列:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements)初始化程序創建並初始化一個支持消息隊列功能的對象。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord)初始化程序創建並初始化一個對象,該對象支持帶有阻塞的消息隊列功能。
  • flavor可以是同步隊列的kSynchronizedReadWrite或非同步隊列的kUnsynchronizedWrite
  • uint16_t (在此示例中)可以是不涉及嵌套緩衝區(無stringvec類型)、句柄或接口的任何HIDL 定義的類型
  • kNumElementsInQueue以條目數表示隊列的大小;它確定將為隊列分配的共享內存緩衝區的大小。

創建第二個 MessageQueue 對象

消息隊列的第二端是使用從第一端獲得的MQDescriptor對象創建的。 MQDescriptor對象通過 HIDL RPC 調用發送到將保存消息隊列第二端的進程。 MQDescriptor包含有關隊列的信息,包括:

  • 映射緩衝區和寫指針的信息。
  • 映射讀取指針的信息(如果隊列已同步)。
  • 映射事件標誌字的信息(如果隊列阻塞)。
  • 對像類型 ( <T, flavor> ),其中包括HIDL 定義的隊列元素類型和隊列風格(同步或非同步)。

MQDescriptor對象可用於構造MessageQueue對象:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers參數指示在創建此MessageQueue對象時是否將讀取和寫入位置重置為 0。在非同步隊列中,讀取位置(對於非同步隊列中的每個MessageQueue對像都是本地的)在創建期間始終設置為 0。通常, MQDescriptor在創建第一個消息隊列對象期間被初始化。為了對共享內存進行額外控制,您可以手動設置MQDescriptorMQDescriptorsystem/libhidl/base/include/hidl/MQDescriptor.h中定義),然後按照本節中的說明創建每個MessageQueue對象。

阻塞隊列和事件標誌

默認情況下,隊列不支持阻塞讀/寫。有兩種阻塞讀/寫調用:

  • 短格式,具有三個參數(數據指針、項目數、超時)。支持阻塞單個隊列上的單個讀/寫操作。使用這種形式時,隊列將在內部處理事件標誌和位掩碼,並且第一個消息隊列對象必須使用第二個參數true進行初始化。例如:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Long form, with six parameters (includes event flag and bitmasks). Supports using a shared EventFlag object between multiple queues and allows specifying the notification bit masks to be used. In this case, the event flag and bitmasks must be supplied to each read and write call.

For the long form, the EventFlag can be supplied explicitly in each readBlocking() and writeBlocking() call. One of the queues may be initialized with an internal event flag, which must then be extracted from that queue's MessageQueue objects using getEventFlagWord() and used to create EventFlag objects in each process for use with other FMQs. Alternatively, the EventFlag objects can be initialized with any suitable shared memory.

In general, each queue should use only one of non-blocking, short-form blocking, or long-form blocking. It is not an error to mix them, but careful programming is required to get the desired result.

Using the MessageQueue

The public API of the MessageQueue object is:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite()availableToRead()可用於確定在單個操作中可以傳輸多少數據。在非同步隊列中:

  • availableToWrite()總是返回隊列的容量。
  • 每個閱讀器都有自己的閱讀位置,並對availableToRead()進行自己的計算。
  • 從慢讀器的角度來看,隊列是允許溢出的;這可能會導致availableToRead()返回一個大於隊列大小的值。溢出後的第一次讀取將失敗並導致該讀取器的讀取位置設置為等於當前寫入指針,無論溢出是否通過availableToRead()報告。

如果所有請求的數據都可以(並且曾經)傳輸到隊列/從隊列傳輸,則read()write()方法返回true 。這些方法不會阻塞;他們要么成功(並返回true ),要么立即返回失敗( false )。

readBlocking()writeBlocking()方法等待直到請求的操作可以完成,或者直到它們超時( timeOutNanos值為 0 表示永不超時)。

阻塞操作是使用事件標誌字實現的。默認情況下,每個隊列都會創建並使用自己的標誌字來支持readBlocking()writeBlocking()的縮寫形式。多個隊列可以共享一個單詞,以便進程可以等待對任何隊列的寫入或讀取。可以通過調用getEventFlagWord()獲得指向隊列事件標誌字的指針,並且該指針(或任何指向合適共享內存位置的指針)可用於創建EventFlag對像以傳遞到readBlocking()的長格式和writeBlocking()用於不同的隊列。 readNotificationwriteNotification參數告訴應該使用事件標誌中的哪些位來表示對該隊列的讀取和寫入。 readNotificationwriteNotification是 32 位位掩碼。

readBlocking()等待writeNotification位;如果該參數為 0,則調用總是失敗。如果readNotification值為 0,則調用不會失敗,但成功讀取不會設置任何通知位。在同步隊列中,這意味著相應的writeBlocking()調用將永遠不會喚醒,除非該位在其他位置設置。在非同步隊列中, writeBlocking()不會等待(它仍應用於設置寫入通知位),並且適合讀取不設置任何通知位。同樣,如果readNotification為 0, writeblocking()將失敗,並且成功的寫入會設置指定的writeNotification位。

要一次等待多個隊列,請使用EventFlag對象的wait()方法來等待通知的位掩碼。 wait()方法返回一個狀態字,其中包含導致喚醒設置的位。然後使用此信息來驗證相應隊列是否有足夠的空間或數據用於所需的寫入/讀取操作並執行非阻塞write() / read() 。要獲取操作後通知,請再次調用EventFlagwake()方法。有關EventFlag抽象的定義,請參閱system/libfmq/include/fmq/EventFlag.h

零拷貝操作

read / write / readBlocking / writeBlocking() API 將指向輸入/輸出緩衝區的指針作為參數,並在內部使用memcpy()調用在相同和 FMQ 環形緩衝區之間複製數據。為了提高性能,Android 8.0 及更高版本包含一組 API,可提供對環形緩衝區的直接指針訪問,從而無需使用memcpy調用。

使用以下公共 API 進行零拷貝 FMQ 操作:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite方法提供指向 FMQ 環形緩衝區的基本指針。數據寫入後,使用commitWrite()提交。 beginRead / commitRead方法的行為方式相同。
  • beginRead / Write方法將要讀取/寫入的消息數量作為輸入,並返回一個布爾值,指示是否可以讀取/寫入。如果讀取或寫入是可能的,則memTx結構將填充有可用於直接指針訪問環形緩衝區共享內存的基指針。
  • MemRegion結構包含有關內存塊的詳細信息,包括基指針(內存塊的基地址)和以T為單位的長度(根據 HIDL 定義的消息隊列類型的內存塊長度)。
  • MemTransaction結構包含兩個MemRegion結構, firstsecond作為對環形緩衝區的讀取或寫入可能需要環繞到隊列的開頭。這意味著需要兩個基本指針才能將數據讀/寫到 FMQ 環形緩衝區。

MemRegion結構中獲取基地址和長度:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

要在 MemTransaction 對像中獲取對第一個和第二個MemRegionMemTransaction

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

使用零拷貝 API 寫入 FMQ 的示例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

以下輔助方法也是MemTransaction的一部分:

  • T* getSlot(size_t idx);
    返回指向作為此MemTransaction對像一部分的MemRegions中的插槽idx的指針。如果MemTransaction對象表示要讀取/寫入 T 類型的 N 個項目的內存區域,則idx的有效範圍在 0 和 N-1 之間。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    將類型為 T 的nMessages項寫入對象描述的內存區域,從索引startIdx開始。此方法使用memcpy()並且不打算用於零複製操作。如果MemTransaction對象代表內存來讀取/寫入 N 個 T 類型的項目,則idx的有效範圍在 0 和 N-1 之間。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    startIdx開始的對象描述的內存區域中讀取類型為 T 的nMessages項的輔助方法。此方法使用memcpy()並不意味著用於零複製操作。

通過 HIDL 發送隊列

在創作方面:

  1. 如上所述創建消息隊列對象。
  2. 使用isValid()驗證對像是否有效。
  3. 如果您將通過將EventFlag傳遞給readBlocking() / writeBlocking()的長格式來等待多個隊列,則可以從初始化以創建標誌的MessageQueue對像中提取事件標誌指針(使用getEventFlagWord() ),並使用該標誌創建必要的EventFlag對象。
  4. 使用MessageQueue getDesc()方法獲取描述符對象。
  5. .hal文件中,給方法一個fmq_sync類型的參數fmq_unsync其中T是合適的 HIDL 定義的類型。使用getDesc()返回的對象發送到接收進程。

在接收方:

  1. 使用描述符對象創建MessageQueue對象。確保使用相同的隊列風格和數據類型,否則模板將無法編譯。
  2. 如果您提取了一個事件標誌,請在接收進程中從相應的MessageQueue對像中提取該標誌。
  3. 使用MessageQueue對像傳輸數據。