Szybka kolejka wiadomości (FMQ)

Jeśli szukasz obsługi AIDL, zobacz także FMQ z AIDL .

Infrastruktura zdalnego wywoływania procedur (RPC) HIDL wykorzystuje mechanizmy Binder, co oznacza, że ​​wywołania wiążą się z obciążeniem, wymagają operacji jądra i mogą wyzwalać działanie programu planującego. Jednakże w przypadkach, gdy dane muszą być przesyłane pomiędzy procesami przy mniejszym obciążeniu i bez angażowania jądra, używany jest system szybkiej kolejki komunikatów (FMQ).

FMQ tworzy kolejki komunikatów o pożądanych właściwościach. Obiekt MQDescriptorSync lub MQDescriptorUnsync może zostać wysłany poprzez wywołanie HIDL RPC i użyty przez proces odbierający w celu uzyskania dostępu do kolejki komunikatów.

Szybkie kolejki wiadomości są obsługiwane tylko w języku C++ i na urządzeniach z systemem Android 8.0 lub nowszym.

Typy kolejki wiadomości

Android obsługuje dwa typy kolejek (znane jako smaki ):

  • Niezsynchronizowane kolejki mogą się przepełniać i mogą mieć wielu czytelników; każdy czytelnik musi odczytać dane na czas, w przeciwnym razie je straci.
  • Zsynchronizowane kolejki nie mogą się przepełniać i mogą mieć tylko jednego czytnika.

Oba typy kolejek nie mogą mieć niedomiaru (odczyt z pustej kolejki zakończy się niepowodzeniem) i mogą mieć tylko jednego pisarza.

Niezsynchronizowany

Niezsynchronizowana kolejka ma tylko jednego autora, ale może mieć dowolną liczbę czytelników. W kolejce jest jedna pozycja zapisu; jednakże każdy czytnik śledzi swoją własną, niezależną pozycję odczytu.

Zapisy do kolejki zawsze kończą się sukcesem (nie są sprawdzane pod kątem przepełnienia), o ile nie są większe niż skonfigurowana pojemność kolejki (zapis większy niż pojemność kolejki natychmiast kończy się niepowodzeniem). Ponieważ każdy czytnik może mieć inną pozycję odczytu, zamiast czekać, aż każdy czytelnik przeczyta każdy fragment danych, dane mogą spaść z kolejki, gdy tylko nowe zapisy będą potrzebowały miejsca.

Czytelnicy są odpowiedzialni za pobranie danych, zanim spadną one na koniec kolejki. Odczyt próbujący odczytać więcej danych niż jest dostępne albo kończy się natychmiastowym niepowodzeniem (jeśli nie blokuje), albo czeka na dostępność wystarczającej ilości danych (jeśli blokuje). Odczyt próbujący odczytać więcej danych niż pojemność kolejki zawsze kończy się natychmiastowym niepowodzeniem.

Jeżeli czytnik nie nadąża za piszącym i ilość danych zapisanych i jeszcze nieodczytanych przez ten czytnik jest większa niż pojemność kolejki, następny odczyt nie zwróci danych; zamiast tego resetuje pozycję odczytu czytnika do równej ostatniej pozycji zapisu, a następnie zwraca błąd. Jeśli dane dostępne do odczytu zostaną sprawdzone po przepełnieniu, ale przed kolejnym odczytem, ​​wyświetli się więcej danych do odczytania niż pojemność kolejki, co oznacza, że ​​nastąpiło przepełnienie. (Jeśli kolejka przepełni się pomiędzy sprawdzeniem dostępnych danych a próbą ich odczytania, jedyną oznaką przepełnienia jest niepowodzenie odczytu.)

Czytelnicy niezsynchronizowanej kolejki prawdopodobnie nie chcą resetować wskaźników odczytu i zapisu kolejki. Zatem podczas tworzenia kolejki na podstawie deskryptorów czytelnicy powinni użyć argumentu „false” dla parametru „resetPointers”.

Zsynchronizowane

Zsynchronizowana kolejka ma jednego pisarza i jednego czytelnika z jedną pozycją zapisu i jedną pozycją odczytu. Nie da się zapisać większej ilości danych, niż mieści się w kolejce, ani odczytać większej ilości danych, niż aktualnie mieści się w kolejce. W zależności od tego, czy wywoływana jest blokująca czy nieblokująca funkcja zapisu lub odczytu, próby przekroczenia dostępnego miejsca lub danych albo natychmiast zwracają błąd, albo blokują do czasu zakończenia żądanej operacji. Próby odczytu lub zapisu większej ilości danych niż pojemność kolejki zawsze kończą się natychmiastowym niepowodzeniem.

Konfigurowanie FMQ

Kolejka komunikatów wymaga wielu obiektów MessageQueue : jednego do zapisu i jednego lub większej liczby do odczytu. Nie ma wyraźnej konfiguracji tego, który obiekt jest używany do zapisu lub odczytu; do użytkownika należy upewnienie się, że żaden obiekt nie jest używany zarówno do odczytu, jak i zapisu, że istnieje co najwyżej jeden moduł piszący, a w przypadku kolejek zsynchronizowanych – co najwyżej jeden czytnik.

Tworzenie pierwszego obiektu MessageQueue

Kolejka komunikatów jest tworzona i konfigurowana za pomocą jednego wywołania:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized non-blocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • Inicjator MessageQueue<T, flavor>(numElements) tworzy i inicjuje obiekt obsługujący funkcjonalność kolejki komunikatów.
  • MessageQueue<T, flavor>(numElements, configureEventFlagWord) tworzy i inicjuje obiekt, który obsługuje funkcjonalność kolejki komunikatów z blokowaniem.
  • flavor może mieć wartość kSynchronizedReadWrite dla zsynchronizowanej kolejki lub kUnsynchronizedWrite dla niezsynchronizowanej kolejki.
  • uint16_t (w tym przykładzie) może być dowolnym typem zdefiniowanym w języku HIDL , który nie obejmuje zagnieżdżonych buforów (bez typów string lub vec ), uchwytów ani interfejsów.
  • kNumElementsInQueue wskazuje rozmiar kolejki w liczbie wpisów; określa rozmiar bufora pamięci współdzielonej, który zostanie przydzielony dla kolejki.

Tworzenie drugiego obiektu MessageQueue

Druga strona kolejki komunikatów tworzona jest przy użyciu obiektu MQDescriptor uzyskanego z pierwszej strony. Obiekt MQDescriptor jest wysyłany poprzez wywołanie HIDL lub AIDL RPC do procesu, który będzie przetrzymywał drugi koniec kolejki komunikatów. MQDescriptor zawiera informacje o kolejce, w tym:

  • Informacje do mapowania bufora i wskaźnika zapisu.
  • Informacje do mapowania wskaźnika odczytu (jeśli kolejka jest zsynchronizowana).
  • Informacje do mapowania słowa flagi zdarzenia (jeśli kolejka blokuje).
  • Typ obiektu ( <T, flavor> ), który obejmuje typ elementów kolejki zdefiniowany w języku HIDL oraz rodzaj kolejki (zsynchronizowany lub niezsynchronizowany).

Obiekt MQDescriptor można wykorzystać do skonstruowania obiektu MessageQueue :

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Parametr resetPointers wskazuje, czy podczas tworzenia obiektu MessageQueue pozycje odczytu i zapisu mają zostać zresetowane do 0. W niezsynchronizowanej kolejce pozycja odczytu (która jest lokalna dla każdego obiektu MessageQueue w niezsynchronizowanych kolejkach) podczas tworzenia jest zawsze ustawiana na 0. Zazwyczaj MQDescriptor jest inicjowany podczas tworzenia pierwszego obiektu kolejki komunikatów. Aby uzyskać dodatkową kontrolę nad pamięcią współdzieloną, możesz ręcznie skonfigurować MQDescriptor ( MQDescriptor jest zdefiniowany w system/libhidl/base/include/hidl/MQDescriptor.h ), a następnie utworzyć każdy obiekt MessageQueue zgodnie z opisem w tej sekcji.

Blokowanie kolejek i flag zdarzeń

Domyślnie kolejki nie obsługują blokowania odczytu/zapisu. Istnieją dwa rodzaje blokowania połączeń odczytu/zapisu:

  • Krótka forma z trzema parametrami (wskaźnik danych, liczba elementów, limit czasu). Obsługuje blokowanie poszczególnych operacji odczytu/zapisu w pojedynczej kolejce. Podczas korzystania z tego formularza kolejka będzie wewnętrznie obsługiwać flagę zdarzenia i maski bitowe, a pierwszy obiekt kolejki komunikatów musi zostać zainicjowany drugim parametrem true . Na przykład:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • Długa forma z sześcioma parametrami (zawiera flagę zdarzenia i maski bitowe). Obsługuje użycie współdzielonego obiektu EventFlag pomiędzy wieloma kolejkami i umożliwia określenie używanych masek bitowych powiadomień. W takim przypadku flaga zdarzenia i maski bitowe muszą być dostarczone do każdego wywołania odczytu i zapisu.

W przypadku długiej formy EventFlag można podać jawnie w każdym wywołaniu readBlocking() i writeBlocking() . Jedna z kolejek może zostać zainicjowana wewnętrzną flagą zdarzenia, która następnie musi zostać wyodrębniona z obiektów MessageQueue tej kolejki za pomocą getEventFlagWord() i użyta do utworzenia obiektów EventFlag w każdym procesie do wykorzystania z innymi FMQ. Alternatywnie obiekty EventFlag można inicjować przy użyciu dowolnej odpowiedniej pamięci współdzielonej.

Ogólnie rzecz biorąc, każda kolejka powinna używać tylko jednego z bloków nieblokujących, blokujących w formie krótkiej lub blokowania w formie długiej. Mieszanie ich nie jest błędem, ale aby uzyskać pożądany efekt, wymagane jest staranne zaprogramowanie.

Oznaczenie pamięci jako tylko do odczytu

Domyślnie pamięć współdzielona ma uprawnienia do odczytu i zapisu. W przypadku niezsynchronizowanych kolejek ( kUnsynchronizedWrite ) moduł piszący może chcieć usunąć uprawnienia do zapisu wszystkim czytelnikom, zanim przekaże obiekty MQDescriptorUnsync . Dzięki temu inne procesy nie będą mogły zapisywać w kolejce, co jest zalecane w celu ochrony przed błędami lub złym zachowaniem procesów czytnika. Jeśli autor chce, aby czytelnicy mogli zresetować kolejkę za każdym razem, gdy użyją MQDescriptorUnsync do utworzenia strony odczytu kolejki, wówczas pamięci nie można oznaczyć jako tylko do odczytu. Jest to domyślne zachowanie konstruktora `MessageQueue`. Jeśli więc w tej kolejce istnieją już użytkownicy, należy zmienić ich kod, aby skonstruować kolejkę z resetPointer=false .

  • Pisarz: wywołaj ashmem_set_prot_region z deskryptorem pliku MQDescriptor i regionem ustawionym na tylko do odczytu ( PROT_READ ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Czytelnik: utwórz kolejkę wiadomości z resetPointer=false (wartość domyślna to true ):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Korzystanie z kolejki wiadomości

Publiczny interfejs API obiektu MessageQueue to:

size_t availableToWrite()  // Space available (number of elements).
size_t availableToRead()  // Number of elements available.
size_t getQuantumSize()  // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc()  // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data);  // read one T from FMQ; true if successful.
bool read(T* data, size_t count);  // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

//APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

availableToWrite() i availableToRead() można wykorzystać do określenia, ile danych można przesłać w jednej operacji. W niezsynchronizowanej kolejce:

  • availableToWrite() zawsze zwraca pojemność kolejki.
  • Każdy czytelnik ma swoją własną pozycję odczytu i wykonuje własne obliczenia dla availableToRead() .
  • Z punktu widzenia powolnego czytelnika kolejka może się przepełnić; może to spowodować, że availableToRead() zwróci wartość większą niż rozmiar kolejki. Pierwszy odczyt po przepełnieniu zakończy się niepowodzeniem i spowoduje, że pozycja odczytu dla tego czytnika będzie równa bieżącemu wskaźnikowi zapisu, niezależnie od tego, czy przepełnienie zostało zgłoszone za pomocą availableToRead() .

Metody read() i write() zwracają true , jeśli wszystkie żądane dane mogły zostać (i zostały) przesłane do/z kolejki. Metody te nie blokują; albo powiedzie się to (i zwróci true ), albo natychmiast zwróci błąd ( false ).

Metody readBlocking() i writeBlocking() czekają, aż żądana operacja będzie mogła zostać ukończona lub upłynie limit czasu (wartość timeOutNanos równa 0 oznacza, że ​​nigdy nie nastąpi przekroczenie limitu czasu).

Operacje blokujące są realizowane przy użyciu słowa flagowego zdarzenia. Domyślnie każda kolejka tworzy i używa własnego słowa flagowego do obsługi krótkiej formy readBlocking() i writeBlocking() . Możliwe jest, że wiele kolejek współdzieli jedno słowo, dzięki czemu proces może czekać na zapisy lub odczyty w dowolnej kolejce. Wskaźnik do słowa flagi zdarzenia kolejki można uzyskać wywołując funkcję getEventFlagWord() , a wskaźnik ten (lub dowolny wskaźnik do odpowiedniego miejsca w pamięci współdzielonej) może zostać użyty do utworzenia obiektu EventFlag , który będzie przekazywany do długiej formy funkcji readBlocking() i writeBlocking() dla innej kolejki. Parametry readNotification i writeNotification mówią, które bity flagi zdarzenia powinny być użyte do sygnalizowania odczytów i zapisów w tej kolejce. readNotification i writeNotification są 32-bitowymi maskami bitowymi.

readBlocking() czeka na bity writeNotification ; jeśli ten parametr wynosi 0, wywołanie zawsze kończy się niepowodzeniem. Jeśli wartość readNotification wynosi 0, wywołanie nie zakończy się niepowodzeniem, ale pomyślny odczyt nie spowoduje ustawienia żadnych bitów powiadomienia. W zsynchronizowanej kolejce oznaczałoby to, że odpowiednie wywołanie funkcji writeBlocking() nigdy się nie obudzi, chyba że bit zostanie ustawiony gdzie indziej. W niezsynchronizowanej kolejce writeBlocking() nie będzie czekać (nadal powinna być używana do ustawiania bitu powiadomienia o zapisie), a w przypadku odczytów właściwe jest, aby nie ustawiać żadnych bitów powiadomienia. Podobnie writeblocking() nie powiedzie się, jeśli readNotification ma wartość 0, a pomyślny zapis ustawia określone bity writeNotification .

Aby czekać w wielu kolejkach jednocześnie, użyj metody wait() obiektu EventFlag w celu oczekiwania na maskę bitową powiadomień. Metoda wait() zwraca słowo statusowe z bitami, które spowodowały ustawienie wybudzenia. Informacje te są następnie wykorzystywane do sprawdzenia, czy w odpowiedniej kolejce jest wystarczająco dużo miejsca lub danych do żądanej operacji zapisu/odczytu i wykonania nieblokującego write() / read() . Aby otrzymać powiadomienie po operacji, użyj innego wywołania metody wake() EventFlag . Aby zapoznać się z definicją abstrakcji EventFlag , zobacz system/libfmq/include/fmq/EventFlag.h .

Zero operacji kopiowania

Funkcje API read / write / readBlocking / writeBlocking() przyjmują wskaźnik do bufora wejścia/wyjścia jako argument i używają wewnętrznie wywołań memcpy() do kopiowania danych pomiędzy tym samym buforem a buforem pierścieniowym FMQ. Aby poprawić wydajność, Android 8.0 i nowsze wersje zawierają zestaw interfejsów API, które zapewniają bezpośredni dostęp wskaźnika do bufora pierścieniowego, eliminując potrzebę używania wywołań memcpy .

Użyj następujących publicznych interfejsów API do operacji FMQ z zerową kopią:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • Metoda beginWrite udostępnia wskaźniki bazowe do bufora pierścieniowego FMQ. Po zapisaniu danych zatwierdź je za pomocą funkcji commitWrite() . Metody beginRead / commitRead działają w ten sam sposób.
  • Metody beginRead / Write przyjmują jako dane wejściowe liczbę komunikatów do odczytania/zapisu i zwracają wartość logiczną wskazującą, czy odczyt/zapis jest możliwy. Jeśli możliwy jest odczyt lub zapis, struktura memTx jest wypełniana wskaźnikami bazowymi, których można używać do bezpośredniego dostępu wskaźników do pamięci współdzielonej bufora pierścieniowego.
  • Struktura MemRegion zawiera szczegółowe informacje na temat bloku pamięci, w tym wskaźnik bazowy (adres bazowy bloku pamięci) i długość wyrażoną w T (długość bloku pamięci wyrażoną w kategoriach typu kolejki komunikatów zdefiniowanego w języku HIDL).
  • Struktura MemTransaction zawiera dwie struktury MemRegion , first i second , ponieważ odczyt lub zapis w buforze pierścieniowym może wymagać zawinięcia na początek kolejki. Oznaczałoby to, że do odczytu/zapisu danych w buforze pierścieniowym FMQ potrzebne są dwa wskaźniki bazowe.

Aby uzyskać adres podstawowy i długość ze struktury MemRegion :

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Aby uzyskać odniesienia do pierwszego i drugiego MemRegion w obiekcie MemTransaction :

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Przykładowy zapis do FMQ przy użyciu interfejsów API z zerową kopią:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Następujące metody pomocnicze są również częścią MemTransaction :

  • T* getSlot(size_t idx);
    Zwraca wskaźnik do slotu idx w obrębie MemRegions , które są częścią tego obiektu MemTransaction . Jeśli obiekt MemTransaction reprezentuje obszary pamięci do odczytu/zapisu N elementów typu T, wówczas prawidłowy zakres idx mieści się w przedziale od 0 do N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1);
    Zapisz elementy nMessages typu T w obszarach pamięci opisanych przez obiekt, zaczynając od indeksu startIdx . Ta metoda wykorzystuje memcpy() i nie jest przeznaczona do stosowania w przypadku operacji kopiowania zerowego. Jeśli obiekt MemTransaction reprezentuje pamięć do odczytu/zapisu N elementów typu T, wówczas prawidłowy zakres idx mieści się w przedziale od 0 do N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1);
    Metoda pomocnicza do odczytywania elementów nMessages typu T z obszarów pamięci opisanych przez obiekt, zaczynając od startIdx . Ta metoda wykorzystuje memcpy() i nie jest przeznaczona do stosowania w przypadku operacji kopiowania zerowego.

Wysyłanie kolejki przez HIDL

Po stronie tworzenia:

  1. Utwórz obiekt kolejki komunikatów zgodnie z powyższym opisem.
  2. Sprawdź, czy obiekt jest prawidłowy za pomocą isValid() .
  3. Jeśli będziesz czekać w wielu kolejkach, przekazując EventFlag do długiej formy readBlocking() / writeBlocking() , możesz wyodrębnić wskaźnik flagi zdarzenia (używając getEventFlagWord() ) z obiektu MessageQueue , który został zainicjowany w celu utworzenia flagi, i użyj tej flagi, aby utworzyć niezbędny obiekt EventFlag .
  4. Aby uzyskać obiekt deskryptora, użyj metody MessageQueue getDesc() .
  5. W pliku .hal nadaj metodzie parametr typu fmq_sync lub fmq_unsync gdzie T jest odpowiednim typem zdefiniowanym w HIDL. Użyj tej opcji, aby wysłać obiekt zwrócony przez getDesc() do procesu odbierającego.

Po stronie odbiorcy:

  1. Użyj obiektu deskryptora, aby utworzyć obiekt MessageQueue . Pamiętaj, aby użyć tego samego rodzaju kolejki i typu danych, w przeciwnym razie szablon nie zostanie skompilowany.
  2. Jeśli wyodrębniłeś flagę zdarzenia, wyodrębnij flagę z odpowiedniego obiektu MessageQueue w procesie odbierającym.
  3. Do przesyłania danych użyj obiektu MessageQueue .