快速訊息佇列 (FMQ)

如果您正在尋找 AIDL 支持,另請參閱FMQ with AIDL

HIDL 的遠端過程呼叫 (RPC) 基礎結構使用 Binder 機制,這意味著呼叫涉及開銷、需要核心操作,並且可能會觸發調度程序操作。然而,對於必須以較少開銷且無需核心參與的方式在進程之間傳輸資料的情況,可以使用快速訊息佇列 (FMQ) 系統。

FMQ 建立具有所需屬性的訊息佇列。 MQDescriptorSyncMQDescriptorUnsync物件可以透過 HIDL RPC 呼叫傳送,並由接收程序用來存取訊息佇列。

僅 C++ 以及運行 Android 8.0 及更高版本的裝置支援快速訊息佇列。

訊息隊列類型

Android 支援兩種佇列類型(稱為Flavor ):

  • 不同步佇列允許溢出,並且可以有很多讀者;每個讀者必須及時讀取數據,否則就會遺失數據。
  • 同步佇列不允許溢出,並且只能有一個讀者。

兩種佇列類型都不允許下溢(從空佇列讀取將失敗)並且只能有一個寫入者。

不同步

非同步佇列只有一個寫入者,但可以有任意數量的讀取者。隊列有一個寫入位置;然而,每個讀者都會追蹤自己獨立的閱讀位置。

只要寫入佇列的容量不大於配置的佇列容量(大於佇列容量的寫入會立即失敗),對佇列的寫入總是會成功(不會檢查溢位)。由於每個讀取器可能具有不同的讀取位置,因此每當新的寫入需要空間時,就允許資料從佇列中掉落,而不是等待每個讀取器讀取每個資料。

讀取器負責在資料落在佇列末尾之前檢索資料。嘗試讀取比可用資料更多的資料的讀取要么立即失敗(如果是非阻塞),要么等待足夠的資料可用(如果是阻塞)。嘗試讀取超出佇列容量的資料的讀取總是會立即失敗。

如果讀取器跟不上寫入器,導致該讀取器已寫入但尚未讀取的資料量大於佇列容量,則下次讀取不會傳回資料;相反,它將讀取器的讀取位置重置為等於最新的寫入位置,然後返回失敗。如果在溢位後、下次讀取之前檢查可讀取的數據,則顯示可讀取的數據多於佇列容量,表示發生了溢位。 (如果佇列在檢查可用資料和嘗試讀取該資料之間發生溢出,則溢出的唯一指示是讀取失敗。)

不同步佇列的讀取者可能不想重置佇列的讀寫指標。因此,當從描述符建立佇列時,讀取器應該對“resetPointers”參數使用“false”參數。

同步

同步佇列有一個寫入器和一個讀取器,具有單一寫入位置和單一讀取位置。寫入的資料不可能多於佇列可容納的空間,讀取的資料也不可能多於佇列目前容納的資料。根據呼叫的是阻塞還是非阻塞寫入或讀取函數,嘗試超出可用空間或資料要么立即返回失敗,要么阻塞直到完成所需的操作。嘗試讀取或寫入超過佇列容量的資料總是會立即失敗。

設定 FMQ

訊息佇列需要多個MessageQueue物件:一個要寫入,一個或多個要從中讀取。沒有明確配置哪個物件用於寫入或讀取;使用者需要確保沒有物件同時用於讀取和寫入,最多有一個寫入者,並且對於同步佇列,最多有一個讀取者。

建立第一個 MessageQueue 對象

透過一次呼叫即可建立和配置訊息隊列:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • MessageQueue<T, flavor>(numElements)初始值設定項建立並初始化支援訊息佇列功能的物件。
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord)初始值設定項目建立並初始化一個支援具有阻塞功能的訊息佇列功能的物件。
  • flavor可以是同步隊列的kSynchronizedReadWrite或非同步隊列的kUnsynchronizedWrite
  • uint16_t (在此範例中)可以是任何不涉及巢狀緩衝區(無stringvec類型)、句柄或介面的 HIDL 定義類型
  • kNumElementsInQueue表示佇列的大小(以條目數為單位);它決定將為佇列分配的共享記憶體緩衝區的大小。

建立第二個 MessageQueue 對象

訊息佇列的第二側是使用從第一側獲得的MQDescriptor物件建立的。 MQDescriptor物件透過 HIDL 或 AIDL RPC 呼叫傳送到將保存訊息佇列第二端的進程。 MQDescriptor包含有關佇列的信息,包括:

  • 映射緩衝區和寫入指標的資訊。
  • 映射讀指標的資訊(如果佇列是同步的)。
  • 用於對應事件標誌字的資訊(如果佇列阻塞)。
  • 物件類型 ( <T, flavor> ),包括HIDL 定義的佇列元素類型和佇列風味(同步或非同步)。

MQDescriptor物件可用來建構MessageQueue物件:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

resetPointers參數指示在建立此MessageQueue物件時是否將讀寫位置重設為 0。在非同步佇列中,讀取位置(對於非同步佇列中的每個MessageQueue物件而言是本地的)在建立期間始終設定為 0。通常, MQDescriptor在建立第一個訊息佇列物件期間初始化。為了對共享記憶體進行額外控制,您可以手動設定MQDescriptorMQDescriptorsystem/libhidl/base/include/hidl/MQDescriptor.h中定義),然後按照本節所述建立每個MessageQueue物件。

阻塞隊列和事件標誌

預設情況下,佇列不支援阻塞讀/寫。有兩種阻塞讀/寫呼叫:

  • 簡短形式,有三個參數(資料指標、項目數、超時)。支援阻塞單一佇列上的單一讀取/寫入操作。使用此形式時,佇列將在內部處理事件標誌和位元遮罩,並且必須使用第二個參數true來初始化第一個訊息佇列物件。例如:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • 長格式,具有六個參數(包括事件標誌和位元遮罩)。支援在多個佇列之間使用共用的EventFlag對象,並允許指定要使用的通知位元遮罩。在這種情況下,必須向每個讀取和寫入呼叫提供事件標誌和位元遮罩。

對於長格式,可以在每個readBlocking()writeBlocking()呼叫中明確提供EventFlag 。其中一個佇列可以使用內部事件標誌進行初始化,然後必須使用getEventFlagWord()從該佇列的MessageQueue物件中提取該標誌,並用於在每個進程中建立EventFlag物件以與其他 FMQ 一起使用。或者,可以使用任何合適的共享記憶體來初始化EventFlag物件。

一般來說,每個佇列應該只使用非阻塞、短格式阻塞或長格式阻塞中的一種。混合它們並不是錯誤,但需要仔細編程才能獲得所需的結果。

將記憶體標記為唯讀

預設情況下,共享記憶體具有讀寫權限。對於不同步佇列 ( kUnsynchronizedWrite ),編寫器可能希望在分發MQDescriptorUnsync物件之前​​刪除所有讀取器的寫入權限。這可確保其他進程無法寫入佇列,建議這樣做以防止讀取器進程中出現錯誤或不良行為。如果編寫者希望讀者在使用MQDescriptorUnsync建立佇列的讀取端時能夠重置佇列,則不能將記憶體標記為唯讀。這是“MessageQueue”建構函數的預設行為。因此,如果該佇列已經存在用戶,則需要更改他們的程式碼以使用resetPointer=false建構佇列。

  • 編寫器:使用MQDescriptor檔案描述子呼叫ashmem_set_prot_region ,並將區域設為唯讀 ( PROT_READ ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Reader:使用resetPointer=false建立訊息佇列(預設為true ):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

使用訊息隊列

MessageQueue物件的公共API是:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite()availableToRead()可用來決定在單一操作中可以傳輸多少資料。在非同步佇列中:

  • availableToWrite()總是傳回佇列的容量。
  • 每個讀取器都有自己的讀取位置,並對availableToRead()進行自己的計算。
  • 從慢速讀取者的角度來看,隊列是允許溢出的;這可能會導致availableToRead()傳回的值大於佇列的大小。溢位後的第一次讀取將會失敗,並導致該讀取器的讀取位置設定為等於目前寫入指針,無論是否透過availableToRead()報告溢位。

如果所有請求的資料都可以(並且已經)傳輸到佇列或從佇列傳輸,則read()write()方法傳回true 。這些方法不會阻塞;它們要么成功(並返回true ),要么立即返回失敗( false )。

readBlocking()writeBlocking()方法會等待,直到請求的操作完成,或直到逾時( timeOutNanos值為 0 表示永不逾時)。

阻塞操作是使用事件標誌字來實現的。預設情況下,每個佇列都會建立並使用自己的標誌字來支援readBlocking()writeBlocking()的縮寫形式。多個佇列可以共用單字,以便進程可以等待對任何佇列的寫入或讀取。可以透過呼叫getEventFlagWord()來獲得指向隊列事件標誌字的指針,並且該指針(或任何指向合適共享內存位置的指針)可用於創建EventFlag對像以傳遞到readBlocking()的長形式中,並且writeBlocking()用於不同的隊列。 readNotificationwriteNotification參數告訴事件標誌中的哪些位元應該用來指示該佇列上的讀取和寫入。 readNotificationwriteNotification是 32 位元位元遮罩。

readBlocking()等待writeNotification位;如果該參數為 0,則呼叫始終失敗。如果readNotification值為 0,則呼叫不會失敗,但成功讀取不會設定任何通知位元。在同步佇列中,這表示對應的writeBlocking()呼叫永遠不會喚醒,除非該位元在其他地方設定。在非同步佇列中, writeBlocking()不會等待(它仍然應該用於設定寫入通知位),並且適合讀取不設定任何通知位。同樣,如果readNotification為 0,則writeblocking()將失敗,並且成功的寫入會設定指定的writeNotification位元。

若要同時等待多個佇列,請使用EventFlag物件的wait()方法來等待通知的位元遮罩。 wait()方法傳回一個狀態字,其中包含導致喚醒設定的位元。然後,該資訊用於驗證對應的隊列是否有足夠的空間或資料來進行所需的寫入/讀取操作,並執行非阻塞write() / read() 。若要取得操作後通知,請再次呼叫EventFlagwake()方法。有關EventFlag抽象的定義,請參閱system/libfmq/include/fmq/EventFlag.h

零複製操作

read / write / readBlocking / writeBlocking() API 將指向輸入/輸出緩衝區的指標作為參數,並在內部使用memcpy()呼叫在同一緩衝區和 FMQ 環形緩衝區之間複製資料。為了提高效能,Android 8.0 及更高版本包含一組 API,可提供對環形緩衝區的直接指標訪問,從而無需使用memcpy呼叫。

使用以下公共 API 進行零複製 FMQ 操作:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • beginWrite方法提供指向 FMQ 環形緩衝區的基底指標。資料寫入後,使用commitWrite()提交。 beginRead / commitRead方法的作用相同。
  • beginRead / Write方法將要讀/寫的訊息數作為輸入,並傳回一個布林值,指示是否可以讀/寫。如果可以進行讀取或寫入,則memTx結構會填入基指針,這些基指針可用於直接指針存取環形緩衝區共享記憶體。
  • MemRegion結構體包含有關記憶體區塊的詳細信息,包括基底指標(記憶體區塊的基底位址)和以T表示的長度(以 HIDL 定義的訊息佇列類型表示的記憶體區塊的長度)。
  • MemTransaction結構包含兩個MemRegion結構, firstsecond作為對環形緩衝區的讀取或寫入可能需要環繞到佇列的開頭。這意味著需要兩個基指標來將資料讀/寫到 FMQ 環形緩衝區。

要從MemRegion結構體取得基底位址和長度:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

要取得MemTransaction物件中的第一個和第二個MemRegion參考:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

使用零複製 API 寫入 FMQ 的範例:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

以下輔助方法也是MemTransaction的一部分:

  • T* getSlot(size_t idx);
    傳回指向屬於此MemTransaction物件一部分的MemRegions中的槽idx的指標。如果MemTransaction物件表示要讀取/寫入 N 個 T 類型項目的記憶體區域,則idx的有效範圍在 0 到 N-1 之間。
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    將 T 類型的nMessages項目寫入物件描述的記憶體區域,從索引startIdx開始。此方法使用memcpy() ,並不意味著用於零複製操作。如果MemTransaction物件代表記憶體讀取/寫入N個類型為T的項,則idx的有效範圍在0和N-1之間。
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    startIdx開始的物件描述的記憶體區域中讀取 T 類型的nMessages項目的幫助程式方法。此方法使用memcpy() ,並不意味著用於零複製操作。

透過 HIDL 發送隊列

在創作方面:

  1. 如上所述建立訊息隊列物件。
  2. 使用isValid()驗證物件是否有效。
  3. 如果您將透過將EventFlag傳遞到readBlocking() / writeBlocking()的長形式來等待多個佇列,則可以從初始化為建立標誌的MessageQueue物件中提取事件標誌指標(使用getEventFlagWord() ),並使用該標誌建立必要的EventFlag物件。
  4. 使用MessageQueue getDesc()方法取得描述符物件。
  5. .hal檔案中,為此方法提供一個fmq_sync類型的參數fmq_unsync其中T是適當的 HIDL 定義類型。使用它將getDesc()傳回的物件傳送到接收進程。

在接收端:

  1. 使用描述符物件建立MessageQueue物件。請務必使用相同的佇列風格和資料類型,否則模板將無法編譯。
  2. 如果提取了事件標誌,請從接收程序中對應的MessageQueue物件中提取該標誌。
  3. 使用MessageQueue物件來傳輸資料。