Hàng đợi tin nhắn nhanh (FMQ)

Cơ sở hạ tầng lệnh gọi quy trình từ xa (RPC) của HIDL sử dụng cơ chế liên kết, nghĩa là các lệnh gọi liên quan đến mức hao tổn, yêu cầu hoạt động trong nhân và có thể kích hoạt thao tác của trình lập lịch. Tuy nhiên, đối với các trường hợp phải chuyển dữ liệu giữa các quy trình với mức hao tổn thấp hơn và không liên quan đến nhân, hệ thống Hàng đợi thông báo nhanh (FMQ) sẽ được sử dụng.

FMQ tạo các hàng đợi thông báo có các thuộc tính mong muốn. Bạn có thể gửi đối tượng MQDescriptorSync hoặc MQDescriptorUnsync qua lệnh gọi RPC HIDL và đối tượng này được quy trình nhận sử dụng để truy cập vào hàng đợi thông báo.

Loại hàng đợi

Android hỗ trợ hai loại hàng đợi (còn gọi là phiên bản):

  • Hàng đợi không đồng bộ hoá được phép tràn và có thể có nhiều trình đọc; mỗi độc giả phải đọc dữ liệu kịp thời hoặc bị mất dữ liệu đó.
  • Hàng đợi đã đồng bộ hoá không được phép tràn và chỉ có thể có một trình đọc.

Cả hai loại hàng đợi đều không được phép thiếu dữ liệu (không đọc được từ hàng đợi trống) và chỉ có thể có một trình ghi.

Hàng đợi không đồng bộ

Hàng đợi chưa đồng bộ hoá chỉ có một người viết nhưng có thể có số lượng trình đọc bất kỳ. Có một vị trí ghi cho hàng đợi; tuy nhiên, mỗi độc giả theo dõi vị trí đọc độc lập của riêng mình.

Các hoạt động ghi vào hàng đợi luôn thành công (không được kiểm tra tình trạng tràn) miễn là các hoạt động đó không lớn hơn dung lượng hàng đợi đã định cấu hình (các hoạt động ghi lớn hơn dung lượng hàng đợi sẽ ngay lập tức không thành công). Vì mỗi trình đọc có thể có một vị trí đọc khác nhau, thay vì chờ mọi trình đọc đọc mọi phần dữ liệu, dữ liệu sẽ rơi khỏi hàng đợi bất cứ khi nào các hoạt động ghi mới cần không gian.

Độc giả chịu trách nhiệm truy xuất dữ liệu trước khi dữ liệu đó rơi vào cuối hàng đợi. Quá trình đọc cố gắng đọc nhiều dữ liệu hơn mức có sẵn sẽ thất bại ngay lập tức (nếu không chặn) hoặc chờ có đủ dữ liệu (nếu đang chặn). Quá trình đọc cố gắng đọc nhiều dữ liệu hơn dung lượng của hàng đợi sẽ luôn bị lỗi ngay lập tức.

Nếu trình đọc không theo kịp trình ghi, do đó, lượng dữ liệu được ghi và chưa được trình đọc đó đọc lớn hơn dung lượng hàng đợi, thì lượt đọc tiếp theo sẽ không trả về dữ liệu; thay vào đó, lượt đọc này sẽ đặt lại vị trí đọc của trình đọc bằng vị trí ghi gần đây nhất, sau đó trả về lỗi. Nếu dữ liệu có thể đọc được kiểm tra sau khi tràn nhưng trước lần đọc tiếp theo, thì dữ liệu có thể đọc sẽ nhiều hơn dung lượng hàng đợi, cho biết đã xảy ra tình trạng tràn. (Nếu hàng đợi tràn giữa việc kiểm tra dữ liệu có sẵn và cố gắng đọc dữ liệu đó, thì chỉ báo duy nhất về tình trạng tràn là việc đọc không thành công.)

Hàng đợi đã đồng bộ hoá

Hàng đợi được đồng bộ hoá có một trình ghi và một trình đọc với một vị trí ghi và một vị trí đọc. Không thể ghi nhiều dữ liệu hơn mức có không gian cho hàng đợi hoặc đọc nhiều dữ liệu hơn hàng đợi hiện đang lưu giữ. Tuỳ thuộc vào việc hàm đọc hoặc ghi có bị chặn hay không, các thao tác vượt quá dung lượng hoặc dữ liệu có sẵn sẽ trả về lỗi ngay lập tức hoặc chặn cho đến khi hoàn tất thao tác mong muốn. Các nỗ lực đọc hoặc ghi nhiều dữ liệu hơn dung lượng hàng đợi luôn không thành công ngay lập tức.

Thiết lập FMQ

Hàng đợi thông báo yêu cầu nhiều đối tượng MessageQueue: một đối tượng để ghi và một hoặc nhiều đối tượng để đọc. Không có cấu hình rõ ràng về việc đối tượng nào được dùng để ghi hoặc đọc; người dùng chịu trách nhiệm đảm bảo rằng không có đối tượng nào được sử dụng cho cả thao tác đọc và ghi, chỉ có tối đa một trình ghi và đối với các hàng đợi được đồng bộ hoá có tối đa một trình đọc.

Tạo đối tượng MessageQueue đầu tiên

Một hàng đợi thông báo được tạo và định cấu hình bằng một lệnh gọi duy nhất:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Trình khởi chạy MessageQueue<T, flavor>(numElements) tạo và khởi chạy một đối tượng hỗ trợ chức năng hàng đợi thông báo.
  • Trình khởi tạo MessageQueue<T, flavor>(numElements, configureEventFlagWord) tạo và khởi chạy một đối tượng hỗ trợ chức năng hàng đợi thông báo bằng tính năng chặn.
  • flavor có thể là kSynchronizedReadWrite đối với hàng đợi đồng bộ hoặc kUnsynchronizedWrite đối với hàng đợi không đồng bộ.
  • uint16_t (trong ví dụ này) có thể là bất kỳ loại do HIDL xác định nào không liên quan đến vùng đệm lồng nhau (không có loại string hoặc vec), tay điều khiển hoặc giao diện.
  • kNumElementsInQueue cho biết kích thước của hàng đợi theo số mục nhập; kích thước này xác định kích thước của vùng đệm bộ nhớ dùng chung được phân bổ cho hàng đợi.

Tạo đối tượng MessageQueue thứ hai

Phía thứ hai của hàng đợi thông báo được tạo bằng cách sử dụng đối tượng MQDescriptor lấy được từ phía đầu tiên. Đối tượng MQDescriptor được gửi qua lệnh gọi RPC HIDL hoặc AIDL đến quy trình giữ đầu thứ hai của hàng đợi thông báo. MQDescriptor chứa thông tin về hàng đợi, bao gồm:

  • Thông tin để ánh xạ vùng đệm và con trỏ ghi.
  • Thông tin để liên kết con trỏ đọc (nếu hàng đợi được đồng bộ hoá).
  • Thông tin để liên kết từ cờ sự kiện (nếu hàng đợi đang chặn).
  • Loại đối tượng (<T, flavor>), bao gồm cả loại do HIDL xác định của các phần tử hàng đợi và phiên bản hàng đợi (đồng bộ hoặc không đồng bộ).

Bạn có thể sử dụng đối tượng MQDescriptor để tạo đối tượng MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Tham số resetPointers cho biết liệu có đặt lại vị trí đọc và ghi thành 0 trong khi tạo đối tượng MessageQueue này hay không. Trong hàng đợi không đồng bộ, vị trí đọc (là vị trí cục bộ của từng đối tượng MessageQueue trong hàng đợi không đồng bộ) luôn được đặt thành 0 trong quá trình tạo. Thông thường, MQDescriptor được khởi tạo trong quá trình tạo đối tượng hàng đợi thông báo đầu tiên. Để có thêm quyền kiểm soát đối với bộ nhớ dùng chung, bạn có thể thiết lập MQDescriptor theo cách thủ công (MQDescriptor được định nghĩa trong system/libhidl/base/include/hidl/MQDescriptor.h), sau đó tạo mọi đối tượng MessageQueue như mô tả trong phần này.

Chặn hàng đợi và cờ sự kiện

Theo mặc định, hàng đợi không hỗ trợ việc chặn lượt đọc và ghi. Có hai loại chặn lệnh gọi đọc và ghi:

  • Biểu mẫu ngắn, với 3 tham số (con trỏ dữ liệu, số lượng mục, thời gian chờ), hỗ trợ chặn trên từng thao tác đọc và ghi trên một hàng đợi. Khi sử dụng biểu mẫu này, hàng đợi này sẽ xử lý cờ sự kiện và mặt nạ bit trong nội bộ, đồng thời đối tượng hàng đợi thông báo đầu tiên phải được khởi chạy bằng tham số thứ hai là true. Ví dụ:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Biểu mẫu dài, với 6 tham số (bao gồm cờ sự kiện và mặt nạ bit), hỗ trợ sử dụng đối tượng EventFlag dùng chung giữa nhiều hàng đợi và cho phép chỉ định mặt nạ bit thông báo sẽ được sử dụng. Trong trường hợp này, bạn phải cung cấp cờ sự kiện và mặt nạ bit cho từng lệnh gọi đọc và ghi.

Đối với biểu mẫu dài, bạn có thể cung cấp EventFlag một cách rõ ràng trong mỗi lệnh gọi readBlocking()writeBlocking(). Bạn có thể khởi chạy một trong các hàng đợi bằng cờ sự kiện nội bộ, sau đó phải trích xuất cờ này từ các đối tượng MessageQueue của hàng đợi đó bằng cách sử dụng getEventFlagWord() và dùng để tạo đối tượng EventFlag trong mỗi quy trình để sử dụng với các FMQ khác. Ngoài ra, bạn có thể khởi tạo các đối tượng EventFlag bằng bất kỳ bộ nhớ dùng chung nào phù hợp.

Nhìn chung, mỗi hàng đợi chỉ nên sử dụng một trong các chế độ không chặn, chặn dạng ngắn hoặc chặn dạng dài. Việc kết hợp các loại này không phải là lỗi, nhưng bạn cần phải lập trình cẩn thận để có được kết quả mong muốn.

Đánh dấu bộ nhớ là chỉ có thể đọc

Theo mặc định, bộ nhớ dùng chung có quyền đọc và ghi. Đối với các hàng đợi không đồng bộ (kUnsynchronizedWrite), trình ghi có thể muốn xoá quyền ghi cho tất cả trình đọc trước khi phân phát các đối tượng MQDescriptorUnsync. Điều này đảm bảo các quy trình khác không thể ghi vào hàng đợi. Bạn nên làm như vậy để bảo vệ khỏi lỗi hoặc hành vi xấu trong các quy trình của trình đọc. Nếu trình ghi muốn trình đọc có thể đặt lại hàng đợi bất cứ khi nào họ sử dụng MQDescriptorUnsync để tạo phía đọc của hàng đợi, thì bộ nhớ không thể được đánh dấu là chỉ có thể đọc. Đây là hành vi mặc định của hàm khởi tạo MessageQueue. Vì vậy, nếu có người dùng hiện tại của hàng đợi này, bạn cần thay đổi mã của họ để tạo hàng đợi bằng resetPointer=false.

  • Trình ghi: Gọi ashmem_set_prot_region bằng chỉ số mô tả tệp MQDescriptor và đặt vùng thành chỉ có thể đọc (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Trình đọc: Tạo hàng đợi thông báo bằng resetPointer=false (mặc định là true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Sử dụng MessageQueue

API công khai của đối tượng MessageQueue là:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Bạn có thể sử dụng availableToWrite()availableToRead() để xác định lượng dữ liệu có thể được chuyển trong một thao tác. Trong hàng đợi không đồng bộ:

  • availableToWrite() luôn trả về dung lượng của hàng đợi.
  • Mỗi trình đọc có vị trí đọc riêng và thực hiện phép tính riêng cho availableToRead().
  • Từ quan điểm của một trình đọc chậm, hàng đợi được phép tràn; điều này có thể khiến availableToRead() trả về một giá trị lớn hơn kích thước của hàng đợi. Lần đọc đầu tiên sau khi một lần tràn không thành công và dẫn đến việc vị trí đọc của trình đọc đó được đặt bằng với con trỏ ghi hiện tại, cho dù tràn có được báo cáo thông qua availableToRead() hay không.

Các phương thức read()write() sẽ trả về true nếu tất cả dữ liệu được yêu cầu có thể (và đã) được chuyển đến và đi từ hàng đợi. Các phương thức này không chặn; chúng thành công (và trả về true) hoặc trả về lỗi (false) ngay lập tức.

Phương thức readBlocking()writeBlocking() sẽ chờ cho đến khi có thể hoàn tất thao tác đã yêu cầu hoặc cho đến khi hết thời gian chờ (giá trị timeOutNanos là 0 có nghĩa là không bao giờ hết thời gian chờ).

Các thao tác chặn được triển khai bằng cách sử dụng từ cờ sự kiện. Theo mặc định, mỗi hàng đợi sẽ tạo và sử dụng từ cờ riêng để hỗ trợ dạng ngắn của readBlocking()writeBlocking(). Nhiều hàng đợi có thể có chung một từ để một quy trình có thể đợi khi ghi hoặc đọc bất kỳ hàng đợi nào. Bằng cách gọi getEventFlagWord(), bạn có thể lấy con trỏ đến từ cờ sự kiện của hàng đợi và bạn có thể sử dụng con trỏ đó (hoặc bất kỳ con trỏ nào đến vị trí bộ nhớ dùng chung phù hợp) để tạo đối tượng EventFlag để truyền vào dạng dài của readBlocking()writeBlocking() cho một hàng đợi khác. Các thông số readNotificationwriteNotification cho biết bit nào trong cờ sự kiện sẽ được dùng để báo hiệu các hoạt động đọc và ghi trên hàng đợi đó. readNotificationwriteNotification là mặt nạ bit 32 bit.

readBlocking() chờ trên các bit writeNotification; nếu tham số đó là 0, thì lệnh gọi sẽ luôn không thành công. Nếu giá trị readNotification là 0, thì lệnh gọi sẽ không thành công nhưng lượt đọc thành công sẽ không đặt bất kỳ bit thông báo nào. Trong hàng đợi đồng bộ hoá, điều này có nghĩa là lệnh gọi writeBlocking() tương ứng sẽ không bao giờ thức dậy trừ khi bit được đặt ở nơi khác. Trong hàng đợi chưa đồng bộ hoá, writeBlocking() sẽ không đợi (vẫn nên dùng để đặt bit thông báo ghi) và phù hợp với các lượt đọc không đặt bất kỳ bit thông báo nào. Tương tự, writeblocking() sẽ không thành công nếu readNotification là 0 và một lượt ghi thành công sẽ đặt các bit writeNotification đã chỉ định.

Để đợi nhiều hàng đợi cùng một lúc, hãy sử dụng phương thức wait() của đối tượng EventFlag để đợi một mặt nạ bit của thông báo. Phương thức wait() trả về một từ trạng thái có các bit đã kích hoạt chế độ thức. Sau đó, thông tin này được dùng để xác minh rằng hàng đợi tương ứng có đủ không gian hoặc dữ liệu cho thao tác ghi và đọc mong muốn, đồng thời thực hiện write()read() không chặn. Để nhận thông báo về thao tác đăng, hãy sử dụng một lệnh gọi khác đến phương thức wake() của đối tượng EventFlag. Để biết định nghĩa về khái niệm trừu tượng EventFlag, hãy xem system/libfmq/include/fmq/EventFlag.h.

Không có hoạt động sao chép

Các phương thức read, write, readBlockingwriteBlocking() lấy con trỏ đến vùng đệm đầu vào-đầu ra làm đối số và sử dụng các lệnh gọi memcpy() nội bộ để sao chép dữ liệu giữa vùng đệm đó và vùng đệm vòng FMQ. Để cải thiện hiệu suất, Android 8.0 trở lên bao gồm một tập hợp API cung cấp quyền truy cập con trỏ trực tiếp vào vùng đệm vòng, loại bỏ việc cần sử dụng lệnh gọi memcpy.

Sử dụng các API công khai sau đây cho các thao tác FMQ không sao chép:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Phương thức beginWrite cung cấp con trỏ cơ sở vào vùng đệm vòng FMQ. Sau khi ghi dữ liệu, hãy xác nhận dữ liệu đó bằng commitWrite(). Các phương thức beginReadcommitRead hoạt động theo cách tương tự.
  • Phương thức beginReadWrite lấy số lượng thông báo cần đọc và ghi làm dữ liệu đầu vào, đồng thời trả về một boolean cho biết liệu có thể đọc hoặc ghi hay không. Nếu có thể đọc hoặc ghi, cấu trúc memTx sẽ được điền sẵn các con trỏ cơ sở có thể dùng để truy cập trực tiếp vào bộ nhớ dùng chung vùng đệm vòng.
  • Cấu trúc MemRegion chứa thông tin chi tiết về một khối bộ nhớ, bao gồm con trỏ cơ sở (địa chỉ cơ sở của khối bộ nhớ) và chiều dài theo T (chiều dài của khối bộ nhớ theo loại hàng đợi thông báo do HIDL xác định).
  • Cấu trúc MemTransaction chứa hai cấu trúc MemRegion, firstsecond vì việc đọc hoặc ghi vào vùng đệm vòng có thể yêu cầu bao bọc lại đầu hàng đợi. Điều này có nghĩa là cần có hai con trỏ cơ sở để đọc và ghi dữ liệu vào vùng đệm vòng FMQ.

Cách lấy địa chỉ cơ sở và chiều dài từ cấu trúc MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Để tham chiếu đến cấu trúc MemRegion đầu tiên và thứ hai trong đối tượng MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Ví dụ về cách ghi vào FMQ bằng các API không sao chép:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Các phương thức trợ giúp sau đây cũng là một phần của MemTransaction:

  • T* getSlot(size_t idx); trả về một con trỏ cho ô idx trong MemRegions thuộc đối tượng MemTransaction này. Nếu đối tượng MemTransaction đại diện cho các vùng bộ nhớ để đọc và ghi N mục thuộc loại T, thì phạm vi hợp lệ của idx là từ 0 đến N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); ghi các mục nMessages thuộc loại T vào các vùng bộ nhớ do đối tượng mô tả, bắt đầu từ chỉ mục startIdx. Phương thức này sử dụng memcpy() và không được dùng cho thao tác sao chép rỗng. Nếu đối tượng MemTransaction biểu thị bộ nhớ để đọc và ghi N mục thuộc loại T, thì phạm vi hợp lệ của idx là từ 0 đến N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); là một phương thức trợ giúp để đọc các mục nMessages thuộc loại T từ các vùng bộ nhớ do đối tượng mô tả bắt đầu từ startIdx. Phương thức này sử dụng memcpy() và không được dùng cho thao tác sao chép rỗng.

Gửi hàng đợi qua HIDL

Về phía người tạo:

  1. Tạo một đối tượng hàng đợi thông báo như mô tả ở trên.
  2. Xác minh đối tượng là hợp lệ bằng isValid().
  3. Nếu đang chờ nhiều hàng đợi bằng cách truyền EventFlag vào dạng dài của readBlocking() hoặc writeBlocking(), bạn có thể trích xuất con trỏ cờ sự kiện (sử dụng getEventFlagWord()) từ đối tượng MessageQueue đã được khởi tạo để tạo cờ và sử dụng cờ đó để tạo đối tượng EventFlag cần thiết.
  4. Sử dụng phương thức MessageQueue getDesc() để lấy đối tượng mô tả.
  5. Trong tệp HAL, hãy cung cấp cho phương thức một tham số thuộc loại fmq_sync hoặc fmq_unsync, trong đó T là một loại phù hợp do HIDL xác định. Sử dụng thuộc tính này để gửi đối tượng do getDesc() trả về đến quy trình nhận.

Về phía người nhận:

  1. Sử dụng đối tượng mô tả để tạo đối tượng MessageQueue. Sử dụng cùng một phiên bản hàng đợi và loại dữ liệu, nếu không mẫu sẽ không biên dịch được.
  2. Nếu bạn đã trích xuất một cờ sự kiện, hãy trích xuất cờ đó từ đối tượng MessageQueue tương ứng trong quy trình nhận.
  3. Dùng đối tượng MessageQueue để chuyển dữ liệu.