Antrean Pesan Cepat (FMQ)

Infrastruktur remote procedure call (RPC) HIDL menggunakan mekanisme binder, yang berarti panggilan melibatkan overhead, memerlukan operasi kernel, dan dapat memicu tindakan penjadwal. Namun, untuk kasus saat data harus ditransfer antar proses dengan overhead yang lebih sedikit dan tanpa keterlibatan kernel, sistem Fast Message Queue (FMQ) akan digunakan.

FMQ membuat antrean pesan dengan properti yang diinginkan. Anda dapat mengirim objek MQDescriptorSync atau MQDescriptorUnsync melalui panggilan RPC HIDL dan objek tersebut digunakan oleh proses penerima untuk mengakses antrean pesan.

Jenis antrean

Android mendukung dua jenis antrean (dikenal sebagai ragam):

  • Antrean yang tidak disinkronkan diizinkan untuk meluap, dan dapat memiliki banyak pembaca; setiap pembaca harus membaca data tepat waktu atau kehilangannya.
  • Antrean Sinkron tidak diizinkan untuk meluap, dan hanya dapat memiliki satu pembaca.

Kedua jenis antrean tidak diizinkan untuk underflow (membaca dari antrean kosong gagal) dan hanya dapat memiliki satu penulis.

Antrean yang tidak disinkronkan

Antrean yang tidak disinkronkan hanya memiliki satu penulis, tetapi dapat memiliki sejumlah pembaca. Ada satu posisi tulis untuk antrean; tetapi, setiap pembaca terus melacak posisi baca independennya sendiri.

Operasi tulis ke antrean selalu berhasil (tidak diperiksa untuk mengetahui apakah terjadi overflow) selama tidak lebih besar dari kapasitas antrean yang dikonfigurasi (operasi tulis yang lebih besar dari kapasitas antrean akan langsung gagal). Karena setiap pembaca mungkin memiliki posisi baca yang berbeda, data akan keluar dari antrean setiap kali penulisan baru memerlukan ruang, bukan menunggu setiap pembaca membaca setiap bagian data.

Pembaca bertanggung jawab untuk mengambil data sebelum data tersebut keluar dari akhir antrean. Operasi baca yang mencoba membaca lebih banyak data daripada yang tersedia akan langsung gagal (jika tidak memblokir) atau menunggu data yang cukup tersedia (jika memblokir). Pembacaan yang mencoba membaca lebih banyak data daripada kapasitas antrean selalu gagal dengan segera.

Jika pembaca gagal mengimbangi penulis, sehingga jumlah data yang ditulis dan belum dibaca oleh pembaca tersebut melebihi kapasitas antrean, pembacaan berikutnya tidak akan menampilkan data; sebagai gantinya, pembacaan akan mereset posisi baca pembaca ke posisi tulis ditambah setengah dari kapasitas, lalu menampilkan kegagalan. Hal ini akan membuat setengah buffer tersedia untuk dibaca dan mencadangkan ruang untuk penulisan baru agar antrean tidak langsung terisi penuh lagi. Jika data yang tersedia untuk dibaca diperiksa setelah overflow, tetapi sebelum pembacaan berikutnya, data tersebut akan menampilkan lebih banyak data yang tersedia untuk dibaca daripada kapasitas antrean, yang menunjukkan bahwa overflow telah terjadi. (Jika antrean meluap antara memeriksa data yang tersedia dan mencoba membaca data tersebut, satu-satunya indikasi kelebihan kapasitas adalah pembacaan gagal.)

Antrean yang disinkronkan

Antrean tersinkron memiliki satu penulis dan satu pembaca dengan satu posisi menulis dan satu posisi membaca. Anda tidak dapat menulis lebih banyak data daripada ruang yang tersedia di antrean atau membaca lebih banyak data daripada yang saat ini disimpan di antrean. Bergantung pada apakah fungsi tulis atau baca pemblokiran atau tidak pemblokiran dipanggil, upaya untuk melebihi ruang atau data yang tersedia akan segera menampilkan kegagalan atau memblokir hingga operasi yang diinginkan dapat diselesaikan. Upaya untuk membaca atau menulis lebih banyak data daripada kapasitas antrean selalu langsung gagal.

Menyiapkan FMQ

Antrean pesan memerlukan beberapa objek MessageQueue: satu untuk diisi, dan satu atau beberapa untuk dibaca. Tidak ada konfigurasi eksplisit objek mana yang digunakan untuk menulis atau membaca; pengguna bertanggung jawab untuk memastikan bahwa tidak ada objek yang digunakan untuk membaca dan menulis, bahwa ada maksimal satu penulis, dan untuk antrean yang disinkronkan, bahwa ada maksimal satu pembaca.

Membuat objek MessageQueue pertama

Antrean pesan dibuat dan dikonfigurasi dengan satu panggilan:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Penginisialisasi MessageQueue<T, flavor>(numElements) membuat dan menginisialisasi objek yang mendukung fungsi antrean pesan.
  • Penginisialisasi MessageQueue<T, flavor>(numElements, configureEventFlagWord) membuat dan melakukan inisialisasi objek yang mendukung fungsi antrean pesan dengan pemblokiran.
  • flavor dapat berupa kSynchronizedReadWrite untuk antrean yang disinkronkan atau kUnsynchronizedWrite untuk antrean yang tidak disinkronkan.
  • uint16_t (dalam contoh ini) dapat berupa jenis yang ditentukan HIDL yang tidak melibatkan buffering bertingkat (tidak ada jenis string atau vec), handle, atau antarmuka.
  • kNumElementsInQueue menunjukkan ukuran antrean dalam jumlah entri; ukuran ini menentukan ukuran buffering memori bersama yang dialokasikan untuk antrean.

Membuat objek MessageQueue kedua

Sisi kedua antrean pesan dibuat menggunakan objek MQDescriptor yang diperoleh dari sisi pertama. Objek MQDescriptor dikirim melalui panggilan RPC HIDL atau AIDL ke proses yang menyimpan ujung kedua antrean pesan. MQDescriptor berisi informasi tentang antrean, termasuk:

  • Informasi untuk memetakan buffer dan pointer tulis.
  • Informasi untuk memetakan pointer baca (jika antrean disinkronkan).
  • Informasi untuk memetakan kata tanda peristiwa (jika antrean memblokir).
  • Jenis objek (<T, flavor>), yang mencakup jenis yang ditentukan HIDL elemen antrean dan ragam antrean (sinkron atau tidak sinkron).

Anda dapat menggunakan objek MQDescriptor untuk membuat objek MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Parameter resetPointers menunjukkan apakah akan mereset posisi baca dan tulis ke 0 saat membuat objek MessageQueue ini. Dalam antrean yang tidak disinkronkan, posisi baca (yang bersifat lokal untuk setiap objek MessageQueue dalam antrean yang tidak disinkronkan) selalu ditetapkan ke 0 selama pembuatan. Biasanya, MQDescriptor diinisialisasi selama pembuatan objek antrean pesan pertama. Untuk kontrol tambahan atas memori bersama, Anda dapat menyiapkan MQDescriptor secara manual (MQDescriptor ditentukan di system/libhidl/base/include/hidl/MQDescriptor.h), lalu membuat setiap objek MessageQueue seperti yang dijelaskan di bagian ini.

Antrean blok dan tanda peristiwa

Secara default, antrean tidak mendukung pemblokiran operasi baca dan tulis. Ada dua jenis pemblokiran panggilan baca dan tulis:

  • Bentuk singkat, dengan tiga parameter (pointer data, jumlah item, waktu tunggu), mendukung pemblokiran pada setiap operasi baca dan tulis pada satu antrean. Saat menggunakan formulir ini, antrean menangani flag peristiwa dan bitmask secara internal, dan objek antrean pesan pertama harus diinisialisasi dengan parameter kedua true. Contoh:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Bentuk panjang, dengan enam parameter (termasuk flag peristiwa dan bitmask), mendukung penggunaan objek EventFlag bersama di antara beberapa antrean dan memungkinkan penentuan bitmask notifikasi yang akan digunakan. Dalam hal ini, flag peristiwa dan bitmask harus disediakan ke setiap panggilan baca dan tulis.

Untuk bentuk panjang, Anda dapat menyediakan EventFlag secara eksplisit dalam setiap panggilan readBlocking() dan writeBlocking(). Anda dapat menginisialisasi salah satu antrean dengan flag peristiwa internal, yang kemudian harus diekstrak dari objek MessageQueue antrean tersebut menggunakan getEventFlagWord() dan digunakan untuk membuat objek EventFlag di setiap proses untuk digunakan dengan FMQ lainnya. Atau, Anda dapat melakukan inisialisasi objek EventFlag dengan memori bersama yang sesuai.

Secara umum, setiap antrean hanya boleh menggunakan salah satu dari pemblokiran non-pemblokiran, pemblokiran berbentuk singkat, atau pemblokiran berbentuk panjang. Menggabungkannya bukanlah sebuah error, tetapi pemrograman yang cermat diperlukan untuk mendapatkan hasil yang diinginkan.

Menandai memori sebagai hanya baca

Secara default, memori bersama memiliki izin baca dan tulis. Untuk antrean yang tidak disinkronkan (kUnsynchronizedWrite), penulis mungkin ingin menghapus izin tulis untuk semua pembaca sebelum membagikan objek MQDescriptorUnsync. Hal ini memastikan proses lain tidak dapat menulis ke antrean, yang direkomendasikan untuk melindungi dari bug atau perilaku buruk dalam proses pembaca. Jika penulis ingin pembaca dapat mereset antrean setiap kali mereka menggunakan MQDescriptorUnsync untuk membuat sisi baca antrean, memori tidak dapat ditandai sebagai hanya baca. Ini adalah perilaku default konstruktor MessageQueue. Jadi, jika ada pengguna antrean ini, kode mereka perlu diubah untuk membuat antrean dengan resetPointer=false.

  • Penulis: Panggil ashmem_set_prot_region dengan deskripsi file MQDescriptor dan region yang disetel ke hanya baca (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Pembaca: Buat antrean pesan dengan resetPointer=false (defaultnya adalah true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Menggunakan MessageQueue

API publik objek MessageQueue adalah:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Anda dapat menggunakan availableToWrite() dan availableToRead() untuk menentukan jumlah data yang dapat ditransfer dalam satu operasi. Dalam antrean yang tidak disinkronkan:

  • availableToWrite() selalu menampilkan kapasitas antrean.
  • Setiap pembaca memiliki posisi bacanya sendiri dan melakukan penghitungannya sendiri untuk availableToRead().
  • Dari sudut pandang pembaca lambat, antrean diizinkan untuk meluap; hal ini dapat menyebabkan availableToRead() menampilkan nilai yang lebih besar dari ukuran antrean. Pembacaan pertama setelah overflow gagal dan mengakibatkan posisi baca untuk pembaca tersebut ditetapkan sama dengan pointer tulis saat ini, baik overflow dilaporkan melalui availableToRead() maupun tidak.

Metode read() dan write() menampilkan true jika semua data yang diminta dapat (dan telah) ditransfer ke dan dari antrean. Metode ini tidak memblokir; metode ini berhasil (dan menampilkan true), atau langsung menampilkan kegagalan (false).

Metode readBlocking() dan writeBlocking() menunggu hingga operasi yang diminta dapat diselesaikan, atau hingga waktu tunggunya habis (nilai timeOutNanos 0 berarti tidak pernah habis waktu tunggunya).

Operasi pemblokiran diterapkan menggunakan kata tanda peristiwa. Secara default, setiap antrean membuat dan menggunakan kata flag-nya sendiri untuk mendukung bentuk singkat readBlocking() dan writeBlocking(). Beberapa antrean dapat berbagi satu kata, sehingga proses dapat menunggu operasi tulis atau baca ke antrean mana pun. Dengan memanggil getEventFlagWord(), Anda bisa mendapatkan pointer ke kata flag peristiwa antrean, dan Anda dapat menggunakan pointer tersebut (atau pointer ke lokasi memori bersama yang sesuai) untuk membuat objek EventFlag yang akan diteruskan ke bentuk panjang readBlocking() dan writeBlocking()untuk antrean yang berbeda. Parameter readNotification dan writeNotification menunjukkan bit mana dalam flag peristiwa yang harus digunakan untuk memberi sinyal operasi baca dan menulis pada antrean tersebut. readNotification dan writeNotification adalah bitmask 32-bit.

readBlocking() menunggu bit writeNotification; jika parameter tersebut adalah 0, panggilan akan selalu gagal. Jika nilai readNotification adalah 0, panggilan tidak akan gagal, tetapi pembacaan yang berhasil tidak akan menetapkan bit notifikasi apa pun. Dalam antrean yang disinkronkan, hal ini berarti panggilan writeBlocking() yang sesuai tidak pernah aktif kecuali jika bit disetel di tempat lain. Dalam antrean yang tidak disinkronkan, writeBlocking() tidak menunggu (harus tetap digunakan untuk menetapkan bit notifikasi tulis), dan sesuai untuk pembacaan agar tidak menetapkan bit notifikasi apa pun. Demikian pula, writeblocking() akan gagal jika readNotification adalah 0, dan penulisan yang berhasil akan menetapkan bit writeNotification yang ditentukan.

Untuk menunggu beberapa antrean sekaligus, gunakan metode wait() objek EventFlag untuk menunggu bitmask notifikasi. Metode wait() menampilkan kata status dengan bit yang menyebabkan set bangun. Informasi ini kemudian digunakan untuk memverifikasi bahwa antrean yang sesuai memiliki ruang atau data yang cukup untuk operasi baca dan tulis yang diinginkan serta melakukan write() dan read() yang tidak memblokir. Untuk mendapatkan notifikasi setelah operasi, gunakan panggilan lain ke metode wake() objek EventFlag. Untuk definisi abstraksi EventFlag, lihat system/libfmq/include/fmq/EventFlag.h.

Operasi zero copy

Metode read, write, readBlocking, dan writeBlocking() mengambil pointer ke buffering input-output sebagai argumen dan menggunakan panggilan memcpy() secara internal untuk menyalin data antara buffer ring yang sama dan FMQ. Untuk meningkatkan performa, Android 8.0 dan yang lebih tinggi menyertakan kumpulan API yang memberikan akses pointer langsung ke ring buffer, sehingga menghilangkan kebutuhan untuk menggunakan panggilan memcpy.

Gunakan API publik berikut untuk operasi FMQ tanpa salinan:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Metode beginWrite menyediakan pointer dasar ke buffering ring FMQ. Setelah data ditulis, komit menggunakan commitWrite(). Metode beginRead dan commitRead berfungsi dengan cara yang sama.
  • Metode beginRead dan Write menggunakan sebagai input jumlah pesan yang akan dibaca dan ditulis, serta menampilkan boolean yang menunjukkan apakah baca atau tulis dapat dilakukan. Jika operasi baca atau tulis memungkinkan, struct memTx akan diisi dengan pointer dasar yang dapat digunakan untuk akses pointer langsung ke memori bersama buffering ring.
  • Struct MemRegion berisi detail tentang blok memori, termasuk pointer dasar (alamat dasar blok memori) dan panjang dalam hal T (panjang blok memori dalam hal jenis antrean pesan yang ditentukan HIDL).
  • Struktur MemTransaction berisi dua struktur MemRegion, first, dan second karena operasi baca atau tulis ke ring buffer mungkin memerlukan penggabungan ke awal antrean. Hal ini berarti bahwa dua pointer dasar diperlukan untuk membaca dan menulis data ke dalam buffer ring FMQ.

Untuk mendapatkan alamat dasar dan panjang dari struct MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Untuk mendapatkan referensi ke struct MemRegion pertama dan kedua dalam objek MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Contoh penulisan ke FMQ menggunakan API zero copy:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Metode bantuan berikut juga merupakan bagian dari MemTransaction:

  • T* getSlot(size_t idx); menampilkan pointer ke slot idx dalam MemRegions yang merupakan bagian dari objek MemTransaction ini. Jika objek MemTransaction mewakili region memori untuk membaca dan menulis N item jenis T, rentang idx yang valid adalah antara 0 dan N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); menulis item nMessages dari jenis T ke dalam region memori yang dijelaskan oleh objek, mulai dari indeks startIdx. Metode ini menggunakan memcpy() dan tidak dimaksudkan untuk digunakan untuk operasi zero copy. Jika objek MemTransaction mewakili memori untuk membaca dan menulis N item jenis T, rentang idx yang valid adalah antara 0 dan N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); adalah metode bantuan untuk membaca item nMessages dari jenis T dari wilayah memori yang dijelaskan oleh objek mulai dari startIdx. Metode ini menggunakan memcpy() dan tidak dimaksudkan untuk digunakan untuk operasi zero copy.

Mengirim antrean melalui HIDL

Di sisi pembuatan:

  1. Buat objek antrean pesan seperti yang dijelaskan di atas.
  2. Verifikasi bahwa objek valid dengan isValid().
  3. Jika Anda menunggu beberapa antrean dengan meneruskan EventFlag ke dalam bentuk panjang readBlocking() atau writeBlocking(), Anda dapat mengekstrak pointer flag peristiwa (menggunakan getEventFlagWord()) dari objek MessageQueue yang diinisialisasi untuk membuat flag, dan menggunakan flag tersebut untuk membuat objek EventFlag yang diperlukan.
  4. Gunakan metode MessageQueue getDesc() untuk mendapatkan objek deskripsi.
  5. Dalam file HAL, berikan parameter jenis fmq_sync atau fmq_unsync ke metode dengan T adalah jenis yang ditentukan HIDL yang sesuai. Gunakan ini untuk mengirim objek yang ditampilkan oleh getDesc() ke proses penerimaan.

Di sisi penerima:

  1. Gunakan objek deskripsi untuk membuat objek MessageQueue. Gunakan ragam antrean dan jenis data yang sama, atau template akan gagal dikompilasi.
  2. Jika Anda mengekstrak flag peristiwa, ekstrak flag dari objek MessageQueue yang sesuai dalam proses penerimaan.
  3. Gunakan objek MessageQueue untuk mentransfer data.