快速訊息佇列 (FMQ)

如需 AIDL 的相關支援,另請參閱 採用 AIDL 的 FMQ

HIDL 的遠端程序呼叫 (RPC) 基礎架構採用 Binder 機制, 表示呼叫通常需要負擔及核心作業, 排程器動作然而,在某些情況下,資料必須在 「快速訊息佇列」功能 (FMQ) 系統的運作。

FMQ 會使用所需的屬性建立訊息佇列。一個 MQDescriptorSyncMQDescriptorUnsync 物件可 透過 HIDL RPC 呼叫傳送,並由接收程序使用以存取 訊息佇列。

快速訊息佇列僅支援 C++ 和裝置 搭載 Android 8.0 以上版本。

MessageQueue 類型

Android 支援兩種佇列類型 (稱為「變種版本」):

  • 未同步處理的佇列會溢位,因此可以有多個 讀者;每個讀取器都必須即時讀取或遺失資料
  • 已同步佇列無法溢位, 只有一位讀者

這兩種佇列類型都不得發生反向溢位現象 (從空白佇列讀取資料) 失敗),且只能有一位寫入者。

未同步

未同步處理的佇列只有一個寫入者,但可以有任意數量的 讀者。有一個佇列的寫入位置;但每位讀者 追蹤獨立讀取位置

只要、寫入佇列並一律成功 (未檢查溢位) 不會超過所設的佇列容量 (寫入大於 佇列容量會立即失敗)。因為每位讀者 不必等每位讀者讀完所有資料 ,任何新的寫入作業需要空間時,就會從佇列中消失。

讀取器資料到期前,讀取器必須負責擷取資料 佇列。讀取作業嘗試讀取的資料量超過可用的 失敗 (如未封鎖),或是等待系統收集足夠的資料 (如果 封鎖)。嘗試讀取的資料量一律超過佇列容量 就會立即失敗

如果讀者難以跟上寫入者的關係,導致資料量 大於佇列容量, 下次讀取時不會傳回資料;而是將讀取器的 等於最新寫入位置,然後傳回失敗。如果 溢位後會檢查可供讀取的資料,但在下次讀取之前, 顯示可供讀取的資料量超過佇列容量, 發生溢位。(如果佇列與檢查可用資料之間出現溢位 嘗試讀取該資料,唯一表示溢位的指標是 讀取失敗)。

未同步處理佇列的讀取者可能不想重設 佇列的讀取和寫入指標。因此在從 Container Registry 建立佇列時 描述元讀取器應針對 `resetPointers` 使用 `false` 引數 參數。

已同步處理

同步佇列具有一個寫入者和一個讀取者, 以及單一讀取位置相較於 佇列擁有或讀取的資料量超過目前佇列所保留的空間。 取決於阻斷或非阻斷的寫入或讀取函式 嘗試超過可用空間時,或資料傳回失敗 或立即封鎖,直到所需作業完成為止嘗試 超過佇列容量的讀取或寫入資料量一律會立即失敗。

設定 FMQ

訊息佇列需要多個 MessageQueue 物件:一個至 以及一或多個可讀取的內容沒有煽情露骨內容 設定寫入或讀取作業時使用的物件取決於 確保沒有物件用於讀取和寫入 最多有一個 Writer,而對同步佇列而言,最多只能有一個 Writer 讀取器。

建立第一個 MessageQueue 物件

訊息佇列會建立並設定單一呼叫:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements) 初始化器 會建立並初始化支援訊息佇列功能的物件。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) 初始化工具會建立並初始化物件 ,藉由封鎖功能支援訊息佇列功能。
  • flavor 可以是 kSynchronizedReadWrite,適用於 同步處理佇列,或 kUnsynchronizedWrite (表示未同步處理) 佇列。
  • uint16_t (在此範例中) 可以是任何值 會解讀的 HIDL 定義類型 未包含巢狀緩衝區 (無 stringvec 類型)、控制代碼或介面。
  • kNumElementsInQueue 表示佇列大小 (以 項目;會決定分配的共用記憶體緩衝區大小 佇列。

建立第二個 MessageQueue 物件

訊息佇列的第二端會使用 從第一端取得的 MQDescriptor 物件。 MQDescriptor 物件會透過 HIDL 或 AIDL 遠端程序呼叫 (RPC) 傳送至程序 保留訊息佇列的第二端。 MQDescriptor 包含佇列相關資訊,包括:

  • 對應緩衝區和寫入指標的資訊。
  • 對應讀取指標的資訊 (如果佇列保持同步的話)。
  • 事件標記字詞對應的資訊 (如果佇列遭到封鎖)。
  • 物件類型 (<T, flavor>),其中包含 HIDL 定義類型 佇列元素和佇列變種版本 (同步或未同步)。

MQDescriptor 物件可用來建構 MessageQueue 物件:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers 參數會指出是否要重設讀取 建立這個 MessageQueue 物件時,並將位置寫入 0。 在未同步處理的佇列中,讀取位置 (每個 未同步處理佇列中的 MessageQueue 個物件會一律設為 0 建立容器MQDescriptor 通常會在期間初始化 建立第一個訊息佇列物件如要進一步控管共用權限 您可以手動設定 MQDescriptor (MQDescriptor 定義於 system/libhidl/base/include/hidl/MQDescriptor.h) 然後按照本節所述的方式建立每個 MessageQueue 物件。

封鎖佇列和事件旗標

根據預設,佇列不支援封鎖讀取/寫入作業。有兩種 封鎖讀取/寫入呼叫

  • 簡短形式,內含三個參數 (資料點、項目數量、 逾時)。支援對單一單一讀取/寫入作業進行封鎖 佇列。使用這份表單時,佇列會處理事件標記和位元遮罩 而且第一個訊息佇列物件必須 透過 true 的第二個參數初始化。例如:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • 長格式,內含 6 個參數 (包括事件標記和位元遮罩)。 支援在多個佇列之間使用共用 EventFlag 物件 並可指定要使用的通知位元遮罩。在此情況下, 必須為每個讀取與寫入呼叫提供事件旗標和位元遮罩。

如果是長格式,EventFlag 能以 每 readBlocking()writeBlocking() 呼叫一次,下列其中一項 您可以透過內部事件旗標初始化佇列,之後 從該佇列的 MessageQueue 物件中擷取 getEventFlagWord(),並用於建立 EventFlag 物件,以便與其他 FMQ 搭配使用或者,您也可以使用 EventFlag 物件可透過任何適用的共用方式進行初始化 記憶體用量

一般來說,每個佇列只應使用其中一種非阻斷式短篇影片 封鎖或長篇影片封鎖功能混合使用時雖然沒有錯誤,但還是要小心 需要編寫程式才能取得理想的結果

將記憶體標示為唯讀

根據預設,共用記憶體具有讀取和寫入權限。未同步 佇列 (kUnsynchronizedWrite),寫入者可能需要移除所有的寫入權限 才會說出 MQDescriptorUnsync 物件。如此可確保 程序無法寫入佇列,為避免 讀取及寫入資料 如果寫入者希望讀者每次使用 MQDescriptorUnsync 用於建立佇列讀取端,因此無法標記記憶體 並設為唯讀。這是 `MessageQueue` 建構函式的預設行為。如果已有 這個佇列的現有使用者,需要變更他們的程式碼,才能以 resetPointer=false

  • 寫入者:使用 MQDescriptor 檔案描述元呼叫 ashmem_set_prot_region 並將區域設為唯讀 (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • 讀取者:使用 resetPointer=false 建立訊息佇列 ( 預設值為 true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

使用 MessageQueue

MessageQueue 物件的公用 API 如下:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

可使用 availableToWrite()availableToRead() 來判斷單一作業可傳輸的資料量。在 未同步處理的佇列:

  • availableToWrite() 一律會傳回佇列的容量。
  • 每個讀取器都有自己的讀取位置,且會自行計算 availableToRead()
  • 從緩慢讀取器的角度來看,佇列可溢位。 這可能會導致 availableToRead() 傳回的值大於 佇列的大小溢位後第一次讀取作業失敗,並導致 該讀取器的讀取位置設為等於目前寫入指標 是否回報溢位 availableToRead()

read()write() 方法會傳回 true,如果所有要求的資料可以 (且) 移轉至/傳出, 佇列。這些方法不會封鎖代表成功 (並傳回 true) 或立即傳回失敗 (false)。

readBlocking()writeBlocking() 方法等待 直到要求作業完成或逾時為止 ( 如果 timeOutNanos 的值為 0,表示一律不會逾時)。

使用事件標記字詞執行封鎖作業。根據預設 每個佇列都會建立並使用自己的旗標字詞,藉此支援 《readBlocking()》和《writeBlocking()》。有可能 多個佇列來共用一個字詞,因此程序可等待寫入作業或 讀取任何佇列。如要指向佇列的事件標記字詞 呼叫 getEventFlagWord()、該指標 (或任何 指向合適的共用記憶體位置的指標),可用來建立 EventFlag 物件會傳入 readBlocking()writeBlocking() 不同 佇列。《readNotification》和《writeNotification》 參數會判斷該使用事件標記中的哪些位元來傳送讀取作業和 寫入該佇列的寫入作業「readNotification」和 writeNotification 是 32 位元位元遮罩。

readBlocking() 會在 writeNotification 位元上等待; 如果參數為 0,則呼叫一律失敗。如果 readNotification 的值為 0,呼叫不會失敗,而是 不會設定任何通知位元在同步處理的佇列中 這表示對應的 writeBlocking() 呼叫 除非機器在其他地方設定,否則不會被喚醒。在未同步處理的佇列中, writeBlocking() 不會等待 (仍應用來設定 寫入通知位元),而適合讀取未設定任何 通知位元。同樣地,如果發生下列情況,writeblocking() 就會失敗 readNotification 為 0,而成功寫入設定了指定 writeNotification 位元。

如要一次等待多個佇列,請使用 EventFlag 物件的 wait() 方法,等待通知位元遮罩。 wait() 方法會傳回狀態字詞,其中含有造成 喚醒設定。然後,這項資訊就會用於驗證對應的佇列 足以讓所需寫入/讀取作業有足夠的空間或資料 非封鎖 write()/read()。取得發布作業 通知,請使用其他對 EventFlag 的呼叫 wake() 方法。如需 EventFlag 的定義,請參閱 抽象化機制,請參考 system/libfmq/include/fmq/EventFlag.h

零複製作業

read/write/readBlocking/writeBlocking() API 會將輸入/輸出緩衝區的指標做為引數,並使用 memcpy() 會在內部呼叫,藉此在相同和 FMQ 環形緩衝區。為提升效能,Android 8.0 以上版本提供一組 可直接指標存取環形緩衝區的 API,免除 您必須使用 memcpy 呼叫。

請使用下列公用 API,執行零複製 FMQ 作業:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite 方法可為 FMQ 環提供基本指標 緩衝區。資料寫入完成後,請使用 commitWrite() 修訂。 beginRead/commitRead 方法的運作方式相同。
  • beginRead/Write 方法會做為輸入內容 要讀取/寫入的訊息數,並傳回布林值表示 執行可讀取/寫入的服務如果讀取或寫入可能原因,memTx struct 會填入基本指標,可用於直接指標 存取環形緩衝區共用記憶體。
  • MemRegion 結構體包含記憶體區塊的詳細資料。 包括基準指標 (記憶體區塊的基本位址) 和 的 T (以 HIDL 定義的記憶體區塊長度表示) 訊息佇列的類型)。
  • MemTransaction 結構包含兩個 MemRegion 結構體,firstsecond 做為讀取或寫入作業 環形緩衝區可能需要環繞佇列開頭。這個 這表示在 FMQ 讀取/寫入資料時,需要兩個基礎指標 環形緩衝區。

如何從 MemRegion 結構取得基本位址和長度:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

如要取得特定範圍內第一和第二個 MemRegion 的參照 MemTransaction 物件:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

使用零複製 API 寫入 FMQ 的範例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

下列輔助方法也是 MemTransaction 的一部分:

  • T* getSlot(size_t idx);
    傳回指向idx 本次 MemTransaction 中的 MemRegions 物件。如果 MemTransaction 物件代表記憶體 讀取/寫入 T 類型項目的區域,接著是 idx 介於 0 到 N-1 之間。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    將 T 類型的 nMessages 項目寫入記憶體區域 物件描述,從索引 startIdx 開始。這個方法 使用 memcpy(),且不適合用於零副本 作業。如果 MemTransaction 物件代表 讀取/寫入 T 類型的項目,idx 的有效範圍是 介於 0 到 N-1 之間。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    透過輔助方法讀取 T 類型的 nMessages 項目 自 startIdx起,物件所述記憶體區域。這個 方法使用 memcpy(),而非用於零副本 作業。

透過 HIDL 傳送佇列

在建立方面:

  1. 按照上述步驟建立訊息佇列物件。
  2. 請確認物件使用 isValid() 有效。
  3. 如果您要等待多個佇列,方法是傳送 EventFlag 轉換為長篇 readBlocking()/writeBlocking(),您可以擷取 事件標記指標 (使用 getEventFlagWord()), 為建立標記而初始化的 MessageQueue 物件 使用該旗標建立必要的 EventFlag 物件。
  4. 使用 MessageQueue getDesc() 方法即可 描述元物件
  5. .hal 檔案中,為方法提供類型的參數 fmq_syncfmq_unsync,其中T為 合適的 HIDL 定義類型。使用這個範本傳送 getDesc()至接收程序。

在接收端:

  1. 使用描述元物件建立 MessageQueue 物件。成為 請務必使用相同的佇列變種版本和資料類型,否則範本無法 下一步是使用 compile 方法 指定訓練方式,完成模型編譯
  2. 如果您先前擷取了事件旗標,請從對應的 MessageQueue 物件。
  3. 使用 MessageQueue 物件轉移資料。