File d'attente de messages rapide (FMQ)

L'infrastructure d'appel de procédure à distance (RPC) de HIDL utilise des mécanismes de liaison, ce qui signifie que les appels impliquent des frais généraux, nécessitent des opérations de noyau et peuvent déclencher une action de planification. Toutefois, dans les cas où les données doivent être transférées entre des processus avec moins de frais généraux et sans implication du noyau, le système de file d'attente de messages rapide (FMQ) est utilisé.

FMQ crée des files d'attente de messages avec les propriétés souhaitées. Vous pouvez envoyer un objet MQDescriptorSync ou MQDescriptorUnsync via un appel RPC HIDL. Le processus destinataire utilise cet objet pour accéder à la file d'attente de messages.

Types de files d'attente

Android prend en charge deux types de files d'attente (appelés saveurs):

  • Les files d'attente non synchronisées peuvent déborder et peuvent avoir de nombreux lecteurs. Chaque lecteur doit lire les données à temps ou les perdre.
  • Les files d'attente synchronisées ne sont pas autorisées à déborder et ne peuvent avoir qu'un seul lecteur.

Les deux types de files d'attente ne sont pas autorisés à sous-déborder (la lecture à partir d'une file d'attente vide échoue) et ne peuvent avoir qu'un seul éditeur.

Files d'attente non synchronisés

Une file d'attente non synchronisée n'a qu'un seul éditeur, mais peut avoir un nombre illimité de lecteurs. Il existe une seule position d'écriture pour la file d'attente. Toutefois, chaque lecteur suit sa propre position de lecture indépendante.

Les écritures dans la file d'attente réussissent toujours (aucun contrôle d'inondation n'est effectué) tant qu'elles ne dépassent pas la capacité de la file d'attente configurée (les écritures supérieures à la capacité de la file d'attente échouent immédiatement). Étant donné que chaque lecteur peut avoir une position de lecture différente, au lieu d'attendre que chaque lecteur lise chaque élément de données, les données sont supprimées de la file d'attente chaque fois que de nouvelles écritures ont besoin de l'espace.

Les lecteurs sont chargés de récupérer les données avant qu'elles ne quittent la fin de la file d'attente. Une lecture qui tente de lire plus de données que celles disponibles échoue immédiatement (si elle n'est pas bloquante) ou attend que suffisamment de données soient disponibles (si elle est bloquante). Une lecture qui tente de lire plus de données que la capacité de la file d'attente échoue toujours immédiatement.

Si un lecteur ne parvient pas à suivre l'écrivain, de sorte que la quantité de données écrites et non encore lues par ce lecteur dépasse la capacité de la file d'attente, la lecture suivante ne renvoie pas de données. Au lieu de cela, elle réinitialise la position de lecture du lecteur sur la position d'écriture plus la moitié de la capacité, puis renvoie une erreur. La moitié de la mémoire tampon est ainsi disponible pour la lecture et de l'espace est réservé pour les nouvelles écritures afin d'éviter de remplir à nouveau immédiatement la file d'attente. Si les données disponibles à lire sont vérifiées après un débordement, mais avant la lecture suivante, elles indiquent plus de données disponibles à lire que la capacité de la file d'attente, ce qui signifie qu'un débordement s'est produit. (Si la file d'attente déborde entre la vérification des données disponibles et la tentative de lecture de ces données, la seule indication de débordement est que la lecture échoue.)

Files d'attente synchronisés

Une file d'attente synchronisée comporte un seul écrivain et un seul lecteur, avec une seule position d'écriture et une seule position de lecture. Il est impossible d'écrire plus de données que la file d'attente ne peut en contenir ou de lire plus de données que la file d'attente ne contient actuellement. Selon que la fonction d'écriture ou de lecture bloquante ou non bloquante est appelée, les tentatives de dépassement de l'espace ou des données disponibles renvoient une erreur immédiatement ou sont bloquées jusqu'à ce que l'opération souhaitée puisse être effectuée. Les tentatives de lecture ou d'écriture de données supérieures à la capacité de la file d'attente échouent toujours immédiatement.

Configurer un FMQ

Une file d'attente de messages nécessite plusieurs objets MessageQueue: un sur lequel écrire et un ou plusieurs à partir desquels lire. Il n'y a pas de configuration explicite de l'objet utilisé pour l'écriture ou la lecture. L'utilisateur est responsable de s'assurer qu'aucun objet n'est utilisé à la fois pour la lecture et l'écriture, qu'il n'y a qu'un seul écrivain et, pour les files d'attente synchronisées, qu'il n'y a qu'un seul lecteur.

Créer le premier objet MessageQueue

Une file d'attente de messages est créée et configurée avec un seul appel:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • L'initialiseur MessageQueue<T, flavor>(numElements) crée et initialise un objet compatible avec la fonctionnalité de file d'attente de messages.
  • L'initialiseur MessageQueue<T, flavor>(numElements, configureEventFlagWord) crée et initialise un objet compatible avec la fonctionnalité de file d'attente de messages avec blocage.
  • flavor peut être kSynchronizedReadWrite pour une file d'attente synchronisée ou kUnsynchronizedWrite pour une file d'attente non synchronisée.
  • uint16_t (dans cet exemple) peut être n'importe quel type défini par HIDL qui n'implique pas de tampons imbriqués (pas de types string ou vec), de poignées ou d'interfaces.
  • kNumElementsInQueue indique la taille de la file d'attente en nombre d'entrées. Il détermine la taille de la mémoire tampon partagée allouée à la file d'attente.

Créer le deuxième objet MessageQueue

La deuxième partie de la file de messages est créée à l'aide d'un objet MQDescriptor obtenu à partir de la première partie. L'objet MQDescriptor est envoyé via un appel RPC HIDL ou AIDL au processus qui détient l'autre extrémité de la file d'attente de messages. MQDescriptor contient des informations sur la file d'attente, y compris les suivantes:

  • Informations permettant de mapper le tampon et le pointeur d'écriture.
  • Informations permettant de mapper le pointeur de lecture (si la file d'attente est synchronisée).
  • Informations permettant de mapper le mot d'indicateur d'événement (si la file d'attente est bloquante).
  • Type d'objet (<T, flavor>), qui inclut le type défini par HIDL des éléments de file d'attente et la saveur de la file d'attente (synchronisée ou non).

Vous pouvez utiliser l'objet MQDescriptor pour créer un objet MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

Le paramètre resetPointers indique si les positions de lecture et d'écriture doivent être réinitialisées sur 0 lors de la création de cet objet MessageQueue. Dans une file d'attente non synchronisée, la position de lecture (qui est locale pour chaque objet MessageQueue dans les files d'attente non synchronisées) est toujours définie sur 0 lors de la création. En règle générale, MQDescriptor est initialisé lors de la création du premier objet de file d'attente de messages. Pour un contrôle supplémentaire sur la mémoire partagée, vous pouvez configurer MQDescriptor manuellement (MQDescriptor est défini dans system/libhidl/base/include/hidl/MQDescriptor.h), puis créer chaque objet MessageQueue comme décrit dans cette section.

Files d'attente de blocage et indicateurs d'événement

Par défaut, les files d'attente ne prennent pas en charge le blocage des lectures et des écritures. Il existe deux types d'appels de lecture et d'écriture bloquants:

  • La forme courte, avec trois paramètres (pointeur de données, nombre d'éléments, délai avant expiration), prend en charge le blocage sur des opérations de lecture et d'écriture individuelles sur une seule file d'attente. Lorsque vous utilisez ce formulaire, la file d'attente gère le flag d'événement et les masques de bits en interne, et le premier objet de file d'attente de messages doit être initialisé avec un deuxième paramètre de true. Exemple :
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • La forme longue, avec six paramètres (y compris le flag d'événement et les masques de bits), permet d'utiliser un objet EventFlag partagé entre plusieurs files d'attente et de spécifier les masques de bits de notification à utiliser. Dans ce cas, le flag d'événement et les masques de bits doivent être fournis à chaque appel de lecture et d'écriture.

Pour la forme longue, vous pouvez fournir explicitement EventFlag dans chaque appel readBlocking() et writeBlocking(). Vous pouvez initialiser l'une des files d'attente avec un indicateur d'événement interne, qui doit ensuite être extrait des objets MessageQueue de cette file d'attente à l'aide de getEventFlagWord() et utilisé pour créer des objets EventFlag dans chaque processus à utiliser avec d'autres files de messages de file d'attente. Vous pouvez également initialiser les objets EventFlag avec n'importe quelle mémoire partagée appropriée.

En règle générale, chaque file d'attente ne doit utiliser qu'un seul type de blocage : non bloquant, court ou long. Il n'est pas une erreur de les mélanger, mais une programmation minutieuse est nécessaire pour obtenir le résultat souhaité.

Marquer la mémoire en lecture seule

Par défaut, la mémoire partagée dispose d'autorisations de lecture et d'écriture. Pour les files d'attente non synchronisées (kUnsynchronizedWrite), l'écrivain peut vouloir supprimer les autorisations d'écriture pour tous les lecteurs avant de distribuer les objets MQDescriptorUnsync. Cela garantit que les autres processus ne peuvent pas écrire dans la file d'attente, ce qui est recommandé pour se protéger contre les bugs ou les mauvais comportements dans les processus de lecture. Si l'écrivain souhaite que les lecteurs puissent réinitialiser la file d'attente chaque fois qu'ils utilisent MQDescriptorUnsync pour créer le côté lecture de la file d'attente, la mémoire ne peut pas être marquée en lecture seule. Il s'agit du comportement par défaut du constructeur MessageQueue. Par conséquent, si des utilisateurs utilisent déjà cette file d'attente, leur code doit être modifié pour créer la file d'attente avec resetPointer=false.

  • Écrivain: appelez ashmem_set_prot_region avec un descripteur de fichier MQDescriptor et une région définie en lecture seule (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • Lecteur: créez une file d'attente de messages avec resetPointer=false (la valeur par défaut est true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

Utiliser MessageQueue

L'API publique de l'objet MessageQueue est la suivante:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

Vous pouvez utiliser availableToWrite() et availableToRead() pour déterminer la quantité de données pouvant être transférées en une seule opération. Dans une file d'attente non synchronisée:

  • availableToWrite() renvoie toujours la capacité de la file d'attente.
  • Chaque lecteur a sa propre position de lecture et effectue son propre calcul pour availableToRead().
  • Du point de vue d'un lecteur lent, la file d'attente peut déborder. availableToRead() peut donc renvoyer une valeur supérieure à la taille de la file d'attente. La première lecture après un débordement échoue et la position de lecture de ce lecteur est définie comme étant égale au pointeur d'écriture actuel, que le débordement ait été signalé via availableToRead() ou non.

Les méthodes read() et write() renvoient true si toutes les données demandées ont pu être transférées vers et depuis la file d'attente. Ces méthodes ne bloquent pas. Elles réussissent (et renvoient true) ou renvoient immédiatement un échec (false).

Les méthodes readBlocking() et writeBlocking() attendent que l'opération demandée puisse être effectuée ou jusqu'à ce qu'elles expirent (une valeur timeOutNanos de 0 signifie qu'elles n'expirent jamais).

Les opérations de blocage sont implémentées à l'aide d'un mot d'indicateur d'événement. Par défaut, chaque file d'attente crée et utilise son propre mot indicateur pour prendre en charge la forme courte de readBlocking() et writeBlocking(). Plusieurs files d'attente peuvent partager un seul mot, de sorte qu'un processus puisse attendre des écritures ou des lectures dans l'une des files d'attente. En appelant getEventFlagWord(), vous pouvez obtenir un pointeur vers le mot d'indicateur d'événement d'une file d'attente, et vous pouvez utiliser ce pointeur (ou tout pointeur vers un emplacement de mémoire partagée approprié) pour créer un objet EventFlag à transmettre dans la forme longue de readBlocking() et writeBlocking() pour une autre file d'attente. Les paramètres readNotification et writeNotification indiquent les bits du drapeau d'événement à utiliser pour signaler les lectures et les écritures sur cette file d'attente. readNotification et writeNotification sont des masques de bits 32 bits.

readBlocking() attend les bits writeNotification. Si ce paramètre est égal à 0, l'appel échoue toujours. Si la valeur readNotification est 0, l'appel ne échoue pas, mais une lecture réussie ne définit aucun bit de notification. Dans une file d'attente synchronisée, cela signifie que l'appel writeBlocking() correspondant ne se réveille jamais, sauf si le bit est défini ailleurs. Dans une file d'attente non synchronisée, writeBlocking() n'attend pas (il doit toujours être utilisé pour définir le bit de notification d'écriture), et il est approprié que les lectures ne définissent aucun bit de notification. De même, writeblocking() échoue si readNotification est défini sur 0, et une écriture réussie définit les bits writeNotification spécifiés.

Pour attendre plusieurs files d'attente à la fois, utilisez la méthode wait() d'un objet EventFlag pour attendre un masque de bits de notifications. La méthode wait() renvoie un mot d'état avec les bits qui ont déclenché l'ensemble de réveil. Ces informations sont ensuite utilisées pour vérifier que la file d'attente correspondante dispose d'assez d'espace ou de données pour l'opération d'écriture et de lecture souhaitée, et pour effectuer une write() et une read() non bloquantes. Pour obtenir une notification post-opération, utilisez un autre appel à la méthode wake() de l'objet EventFlag. Pour obtenir une définition de l'abstraction EventFlag, consultez system/libfmq/include/fmq/EventFlag.h.

Opérations de copie sans copie

Les méthodes read, write, readBlocking et writeBlocking() prennent un pointeur vers un tampon d'entrée-sortie comme argument et utilisent des appels memcpy() en interne pour copier des données entre le même et le tampon circulaire FMQ. Pour améliorer les performances, Android 8.0 et les versions ultérieures incluent un ensemble d'API qui fournissent un accès direct au pointeur dans le tampon circulaire, ce qui élimine le besoin d'utiliser des appels memcpy.

Utilisez les API publiques suivantes pour les opérations FMQ sans copie:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • La méthode beginWrite fournit des pointeurs de base dans le tampon de anneau FMQ. Une fois les données écrites, validez-les à l'aide de commitWrite(). Les méthodes beginRead et commitRead fonctionnent de la même manière.
  • Les méthodes beginRead et Write prennent en entrée le nombre de messages à lire et à écrire, et renvoient une valeur booléenne indiquant si la lecture ou l'écriture est possible. Si la lecture ou l'écriture est possible, la structure memTx est remplie de pointeurs de base qui peuvent être utilisés pour un accès direct au pointeur dans la mémoire partagée du tampon circulaire.
  • La structure MemRegion contient des informations sur un bloc de mémoire, y compris le pointeur de base (adresse de base du bloc de mémoire) et la longueur en termes de T (longueur du bloc de mémoire en termes du type de la file de messages défini par HIDL).
  • La structure MemTransaction contient deux structures MemRegion, first et second, car une lecture ou une écriture dans le tampon circulaire peut nécessiter un retour à la fin de la file d'attente. Cela signifie que deux pointeurs de base sont nécessaires pour lire et écrire des données dans le tampon de anneau FMQ.

Pour obtenir l'adresse de base et la longueur d'une struct MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

Pour obtenir des références aux premières et deuxièmes structures MemRegion dans un objet MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

Exemple d'écriture dans la file de messages de file d'attente à l'aide d'API sans copie:

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

Les méthodes d'assistance suivantes font également partie de MemTransaction:

  • T* getSlot(size_t idx); renvoie un pointeur vers l'emplacement idx dans les MemRegions qui font partie de cet objet MemTransaction. Si l'objet MemTransaction représente les régions de mémoire à lire et à écrire N éléments de type T, la plage valide de idx est comprise entre 0 et N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); écrit des éléments nMessages de type T dans les régions de mémoire décrites par l'objet, à partir de l'index startIdx. Cette méthode utilise memcpy() et n'est pas destinée à être utilisée pour une opération sans copie. Si l'objet MemTransaction représente une mémoire pour lire et écrire N éléments de type T, la plage valide de idx se situe entre 0 et N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); est une méthode d'assistance permettant de lire des éléments nMessages de type T à partir des régions de mémoire décrites par l'objet à partir de startIdx. Cette méthode utilise memcpy() et n'est pas destinée à être utilisée pour une opération sans copie.

Envoyer la file d'attente via HIDL

Côté création:

  1. Créez un objet de file d'attente de messages comme décrit ci-dessus.
  2. Vérifiez que l'objet est valide avec isValid().
  3. Si vous attendez sur plusieurs files d'attente en transmettant EventFlag dans la forme longue de readBlocking() ou writeBlocking(), vous pouvez extraire le pointeur d'indicateur d'événement (à l'aide de getEventFlagWord()) à partir d'un objet MessageQueue qui a été initialisé pour créer l'indicateur, puis utiliser cet indicateur pour créer l'objet EventFlag nécessaire.
  4. Utilisez la méthode getDesc() de MessageQueue pour obtenir un objet de descripteur.
  5. Dans le fichier HAL, attribuez à la méthode un paramètre de type fmq_sync ou fmq_unsync, où T est un type défini par HIDL approprié. Utilisez-le pour envoyer l'objet renvoyé par getDesc() au processus destinataire.

Côté destinataire:

  1. Utilisez l'objet descripteur pour créer un objet MessageQueue. Utilisez le même type de données et la même saveur de file d'attente, sinon la compilation du modèle échouera.
  2. Si vous avez extrait un indicateur d'événement, extrayez-le de l'objet MessageQueue correspondant dans le processus de réception.
  3. Utilisez l'objet MessageQueue pour transférer des données.