快速訊息佇列 (FMQ)

HIDL 的遠端程序呼叫 (RPC) 基礎架構使用繫結器機制,也就是說,呼叫會涉及額外負擔、需要核心作業,並可觸發排程器動作。不過,如果資料必須在過程間傳輸,且不涉及核心,且不涉及核心,則會使用快速訊息佇列 (FMQ) 系統。

FMQ 會使用所需的屬性建立訊息佇列。您可以透過 HIDL RPC 呼叫傳送 MQDescriptorSyncMQDescriptorUnsync 物件,接收程序會使用該物件存取訊息佇列。

佇列類型

Android 支援兩種佇列類型 (稱為「flavor」):

  • 未同步的佇列可溢位,且可有許多讀取器;每個讀取器都必須及時讀取資料,否則資料就會遺失。
  • 同步佇列不允許溢位,且只能有一個讀取器。

這兩種佇列類型都不允許負溢 (從空佇列讀取會失敗),且只能有一個寫入器。

未同步的佇列

未同步化的佇列只有一個寫入者,但可有任意數量的讀取者。佇列只有一個寫入位置,但每個讀取器都會追蹤自己的獨立讀取位置。

只要寫入佇列的資料量不超過設定的佇列容量 (寫入超過佇列容量的資料會立即失敗),寫入作業一律會成功 (不會檢查是否溢位)。由於每個讀取器的讀取位置可能不同,因此當新寫入作業需要空間時,系統會將資料從佇列中移除,而非等待每個讀取器讀取每個資料片段。

讀取器負責在資料從佇列中移除前,先行擷取資料。如果讀取作業嘗試讀取的資料量超過可用資料量,則會立即失敗 (如果是非阻斷式),或是等待可用資料量達到足夠的數量 (如果是阻斷式)。如果讀取作業嘗試讀取的資料量超過佇列容量,系統一律會立即失敗。

如果讀取器無法跟上寫入器,也就是讀取器尚未讀取的資料量超過佇列容量,則下次讀取不會傳回資料,而是將讀取器的讀取位置重設為寫入位置加上一半的容量,然後傳回失敗。這樣一來,系統可保留一半的緩衝區供讀取,並為新的寫入作業保留空間,避免佇列再次立即溢位。如果在溢位後但在下次讀取之前檢查可讀取的資料,系統會顯示可讀取的資料多於佇列容量,表示發生溢位。(如果在檢查可用資料和嘗試讀取該資料之間,佇列發生溢位,唯一的溢位指標就是讀取失敗)。

同步佇列

同步佇列包含一個寫入器和一個讀取器,且各自有一個寫入位置和一個讀取位置。寫入的資料量不得超過佇列的空間,讀取的資料量不得超過佇列目前的容量。視是否呼叫阻斷式或非阻斷式寫入或讀取函式而定,嘗試超出可用空間或資料的作業,會立即傳回失敗,或阻斷至所需作業完成為止。嘗試讀取或寫入的資料量超過佇列容量時,系統一律會立即失敗。

設定 FMQ

訊息佇列需要多個 MessageQueue 物件:一個用於寫入,一個或多個用於讀取。系統並未明確設定哪個物件用於寫入或讀取;使用者必須確保沒有物件同時用於讀取和寫入,且最多只有一個寫入器,而對於同步佇列,則最多只有一個讀取器。

建立第一個 MessageQueue 物件

透過單一呼叫建立及設定訊息佇列:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements) 初始化器會建立及初始化支援訊息佇列功能的物件。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) 初始化器會建立並初始化支援訊息佇列功能的物件,並且會封鎖。
  • flavor 可以是 kSynchronizedReadWrite (同步佇列) 或 kUnsynchronizedWrite (非同步佇列)。
  • uint16_t (在本例中) 可以是任何不涉及巢狀緩衝區 (沒有 stringvec 類型)、句柄或介面的HIDL 定義類型
  • kNumElementsInQueue 會以項目數量表示佇列大小,並決定為佇列分配的共用記憶體緩衝區大小。

建立第二個 MessageQueue 物件

訊息佇列的第二個端會使用從第一個端取得的 MQDescriptor 物件建立。MQDescriptor 物件會透過 HIDL 或 AIDL RPC 呼叫傳送至保留訊息佇列第二端的程序。MQDescriptor 包含佇列的相關資訊,包括:

  • 用於對應緩衝區和寫入指標的資訊。
  • 用於對應讀取指標的資訊 (如果佇列已同步)。
  • 對應事件標記字詞的資訊 (如果佇列處於封鎖狀態)。
  • 物件類型 (<T, flavor>),包括佇列元素的 HIDL 定義類型和佇列類型 (同步或非同步)。

您可以使用 MQDescriptor 物件建構 MessageQueue 物件:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers 參數會指出是否要在建立此 MessageQueue 物件時,將讀取和寫入位置重設為 0。在未同步的佇列中,讀取位置 (即未同步佇列中每個 MessageQueue 物件的本機位置) 一律會在建立期間設為 0。通常,MQDescriptor 會在建立第一個訊息佇列物件時初始化。如要進一步控制共用記憶體,您可以手動設定 MQDescriptor (MQDescriptor 是在 system/libhidl/base/include/hidl/MQDescriptor.h 中定義),然後按照本節所述建立每個 MessageQueue 物件。

封鎖佇列和事件標記

根據預設,佇列不支援封鎖讀取和寫入作業。封鎖讀取和寫入呼叫有兩種:

  • 簡短格式:包含三個參數 (資料指標、項目數量、逾時),可在單一佇列上阻斷個別讀取和寫入作業。使用這個表單時,佇列會在內部處理事件旗標和位元遮罩,且第一個訊息佇列物件必須使用 true 的第二個參數進行初始化。例如:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • 長格式:包含六個參數 (包括事件標記和位元遮罩),可在多個佇列之間使用共用 EventFlag 物件,並允許指定要使用的通知位元遮罩。在這種情況下,必須為每個讀取和寫入呼叫提供事件標記和位元遮罩。

針對長格式,您可以在每個 readBlocking()writeBlocking() 呼叫中明確提供 EventFlag。您可以使用內部事件標記初始化其中一個佇列,然後必須使用 getEventFlagWord() 從該佇列的 MessageQueue 物件中擷取,並用於在每個程序中建立 EventFlag 物件,以便與其他 FMQ 搭配使用。或者,您也可以使用任何適合的共用記憶體,初始化 EventFlag 物件。

一般來說,每個佇列都應只使用一種非阻斷、短格式阻斷或長格式阻斷。混合使用這兩種方法並不會發生錯誤,但必須謹慎編寫程式才能取得所需結果。

將記憶體標示為唯讀

根據預設,共用記憶體具有讀取和寫入權限。對於未同步的佇列 (kUnsynchronizedWrite),作者可能會在發出 MQDescriptorUnsync 物件之前,移除所有讀取器的寫入權限。這可確保其他程序無法寫入佇列,建議您這麼做,以防讀取程序中的錯誤或不良行為。如果作者希望讀取端在使用 MQDescriptorUnsync 建立佇列的讀取端時能夠重設佇列,則記憶體就無法標示為唯讀。這是 MessageQueue 建構函式的預設行為。因此,如果有使用這個佇列的現有使用者,就必須變更他們的程式碼,以便使用 resetPointer=false 建構佇列。

  • 寫入器:使用 MQDescriptor 檔案描述元和設為唯讀的區域呼叫 ashmem_set_prot_region (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • 讀取器:使用 resetPointer=false 建立訊息佇列 (預設為 true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

使用 MessageQueue

MessageQueue 物件的公開 API 如下:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

您可以使用 availableToWrite()availableToRead(),判斷單一作業可傳輸的資料量。在未同步的佇列中:

  • availableToWrite() 一律會傳回佇列的容量。
  • 每個讀取器都有自己的讀取位置,並自行計算 availableToRead()
  • 從緩慢讀取器的角度來看,佇列可溢位;這可能導致 availableToRead() 傳回的值大於佇列大小。溢位失敗後,第一次讀取會導致讀取器的讀取位置設為與目前寫入指標相同,無論是否透過 availableToRead() 回報溢位情形。

如果所有要求的資料都能傳送至佇列,並從佇列傳送,read()write() 方法就會傳回 true。這些方法不會阻斷;它們會立即傳回成功 (並傳回 true),或傳回失敗 (false)。

readBlocking()writeBlocking() 方法會等待至可完成要求的作業,或等到作業逾時 (timeOutNanos 值為 0 表示永遠不會逾時)。

系統會使用事件旗標字來實作封鎖作業。根據預設,每個佇列都會建立並使用自己的旗標字,以支援 readBlocking()writeBlocking() 的短格式。多個佇列可以共用單一字詞,讓程序可以等待對任何佇列的寫入或讀取作業。呼叫 getEventFlagWord() 後,您可以取得佇列事件標記字的索引,並使用該索引 (或任何指向適當共用記憶體位置的索引) 建立 EventFlag 物件,以便將其傳遞至不同佇列的長格式 readBlocking()writeBlocking()readNotificationwriteNotification 參數會指出事件標記中的哪些位元應用於在該佇列上發出讀取和寫入信號。readNotificationwriteNotification 是 32 位元位元遮罩。

readBlocking() 會等待 writeNotification 位元;如果該參數為 0,呼叫一律會失敗。如果 readNotification 值為 0,呼叫不會失敗,但成功讀取後不會設定任何通知位元。在同步化佇列中,這表示除非在其他位置設定位元,否則對應的 writeBlocking() 呼叫永遠不會喚醒。在未同步的佇列中,writeBlocking() 不會等待 (仍應用於設定寫入通知位元),且讀取作業不應設定任何通知位元。同樣地,如果 readNotification 為 0,writeblocking() 就會失敗,而成功的寫入作業會設定指定的 writeNotification 位元。

如要同時等待多個佇列,請使用 EventFlag 物件的 wait() 方法,等待通知的位元遮罩。wait() 方法會傳回狀態字,其中包含導致喚醒集的位元。接著,系統會使用這項資訊,驗證對應的佇列是否有足夠的空間或資料,可執行所需的寫入和讀取作業,並執行非阻斷的 write()read()。如要取得後續作業通知,請使用另一個對 EventFlag 物件 wake() 方法的呼叫。如要瞭解 EventFlag 抽象化的定義,請參閱 system/libfmq/include/fmq/EventFlag.h

零複製作業

readwritereadBlockingwriteBlocking() 方法會將輸入/輸出緩衝區的指標做為引數,並在內部使用 memcpy() 呼叫,在相同的 FMQ 環狀緩衝區之間複製資料。為提升效能,Android 8.0 以上版本包含一組 API,可提供直接指標存取環形緩衝區,因此不必使用 memcpy 呼叫。

請使用下列公開 API 執行零複製 FMQ 作業:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite 方法會提供 FMQ 環緩衝區的基本指標。資料寫入後,請使用 commitWrite() 提交資料。beginReadcommitRead 方法的運作方式相同。
  • beginReadWrite 方法會將要讀取和寫入的訊息數量做為輸入內容,並傳回布林值,指出是否可以讀取或寫入。如果可以讀取或寫入,memTx 結構體會填入可用於直接指標存取環狀緩衝區共用記憶體的基礎指標。
  • MemRegion 結構體包含記憶體區塊的詳細資料,包括基底指標 (記憶體區塊的基底位址) 和 T 的長度 (以 HIDL 定義的訊息佇列類型為準的記憶體區塊長度)。
  • MemTransaction 結構體包含兩個 MemRegion 結構體,firstsecond,因為讀取或寫入環形緩衝區可能需要繞回佇列的開頭。這表示需要兩個基本指標,才能讀取及寫入 FMQ 環狀緩衝區的資料。

如要從 MemRegion 結構體取得基底位址和長度,請按照下列步驟操作:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

如要取得 MemTransaction 物件中第一個和第二個 MemRegion 結構體的參照,請按照下列步驟操作:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

使用零複製 API 寫入 FMQ 的範例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

以下輔助方法也是 MemTransaction 的一部分:

  • T* getSlot(size_t idx); 會傳回 MemRegions 中指向 idx 的資料指標,而 MemRegions 是這個 MemTransaction 物件的一部分。如果 MemTransaction 物件代表用於讀取及寫入 T 類型 N 個項目的記憶體區域,則 idx 的有效範圍介於 0 和 N-1 之間。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); 會將 nMessages 型別的 T 項目寫入物件所描述的記憶體區域,從索引 startIdx 開始。這個方法會使用 memcpy(),且不應用於零複製作業。如果 MemTransaction 物件代表用於讀取及寫入 T 型別 N 個項目的記憶體,則 idx 的有效範圍介於 0 和 N-1 之間。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); 是輔助方法,可從 startIdx 開始的物件所描述的記憶體區域讀取 T 類型的 nMessages 項目。這個方法會使用 memcpy(),且不應用於零複製作業。

透過 HIDL 傳送佇列

在建立方面:

  1. 按照上述步驟建立訊息佇列物件。
  2. 使用 isValid() 驗證物件是否有效。
  3. 如果您透過將 EventFlag 傳入 readBlocking()writeBlocking() 的長格式,等待多個佇列,您可以從已初始化用來建立標記的 MessageQueue 物件中,使用 getEventFlagWord() 擷取事件標記指標,然後使用該標記建立必要的 EventFlag 物件。
  4. 使用 MessageQueue 方法 getDesc() 取得描述元物件。
  5. 在 HAL 檔案中,為方法提供 fmq_syncfmq_unsync 類型的參數,其中 T 是適當的 HIDL 定義類型。使用這個方法,將 getDesc() 傳回的物件傳送至接收程序。

接收端:

  1. 使用描述元物件建立 MessageQueue 物件。請使用相同的佇列口味和資料類型,否則範本無法編譯。
  2. 如果您擷取事件旗標,請從接收程序中的對應 MessageQueue 物件擷取旗標。
  3. 使用 MessageQueue 物件傳輸資料。