תור הודעות מהיר (FMQ)

התשתית של הקריאה לשירות מרוחק (RPC) של HIDL משתמשת במנגנוני קישור (binder), כלומר הקריאות כוללות תקורה, דורשות פעולות ליבה ויכולות להפעיל פעולה של מתזמן. עם זאת, במקרים שבהם צריך להעביר נתונים בין תהליכים עם פחות תקורה וללא מעורבות של הליבה, משתמשים במערכת Fast Message Queue‏ (FMQ).

FMQ יוצר תורים של הודעות עם המאפיינים הרצויים. אפשר לשלוח אובייקט MQDescriptorSync או MQDescriptorUnsync באמצעות קריאה ל-RPC של HIDL, והתהליך המקבל משתמש באובייקט כדי לגשת לתור ההודעות.

סוגי תורים

ב-Android יש תמיכה בשני סוגים של תורים (שנקראים טעמים):

  • בתורים לא מסונכרנים מותר לחרוג מהנפח, ויכולים להיות בהם הרבה קוראים. כל קורא צריך לקרוא את הנתונים בזמן, אחרת הם יאבדו.
  • אסור שתור סונכרן יעלה על המכסה, ויכול להיות לו רק קורא אחד.

בשני סוגי התורים אסור להגיע למצב של זרימה נמוכה מדי (קריאה מתור ריק נכשלת), ויכול להיות רק כותב אחד.

תורים לא מסונכרנים

לתור לא מסונכרן יש רק כותב אחד, אבל יכול להיות בו מספר בלתי מוגבל של קוראים. יש מיקום כתיבה אחד בתור, אבל כל קורא עוקב אחרי מיקום הקריאה שלו בנפרד.

פעולות כתיבה לתור תמיד מצליחות (לא מתבצעת בדיקה לחריגה ממלאי) כל עוד הן לא גדולות מיכולת האחסון שהוגדרה לתור (פעולות כתיבה גדולות מיכולת האחסון של התור נכשלות באופן מיידי). מכיוון שלכל קורא יכולה להיות מיקום קריאה שונה, במקום להמתין לכל קורא כדי שיקריא כל פיסת נתונים, הנתונים יוצאים מהתור בכל פעם שכתיבה חדשה זקוקה למרחב.

הקוראים אחראים לאחזר את הנתונים לפני שהם נופלים מהקצה של התור. קריאה שמנסה לקרוא יותר נתונים ממה שזמין תיכשל באופן מיידי (אם היא לא חוסמת) או תמתין עד שיהיו זמינים מספיק נתונים (אם היא חוסמת). קריאה שמנסה לקרוא יותר נתונים מהקיבולת של התור תמיד תיכשל באופן מיידי.

אם הקורא לא מצליח לעמוד בקצב של הסופר, כך שכמות הנתונים שנכתבו ועדיין לא נקראו על ידו חורגת מיכולת האחסון של התור, הקריאה הבאה לא מחזירה נתונים. במקום זאת, מיקום הקריאה של הקורא מתאפס למיקום הכתיבה בתוספת מחצית מהקיבולת, ולאחר מכן מוחזר סטטוס כשל. כך מחצית מהמאגר זמינה לקריאה, ומשמרים מקום לכתיבות חדשות כדי למנוע הצפה מיידית של התור שוב. אם הנתונים שזמינים לקריאה נבדקים אחרי חריגה ממלאי אבל לפני הקריאה הבאה, יוצגו יותר נתונים שזמינים לקריאה מאשר קיבולת התור, ותהיה אינדיקציה לכך שהתרחשה חריגה ממלאי. (אם התור יתמלא בין בדיקת הנתונים הזמינים לבין הניסיון לקרוא את הנתונים האלה, הקריאה תיכשל ותהיה זו הדרך היחידה לדעת שהתור מלא).

תורים מסונכרנים

בתור מסונכרן יש כותב אחד וקורא אחד, עם מיקום כתיבה אחד ומיקום קריאה אחד. אי אפשר לכתוב יותר נתונים ממה שיש בתור, או לקרוא יותר נתונים ממה שיש בתור כרגע. בהתאם לקריאה לפונקציית הכתיבה או הקריאה החוסמת או הלא חוסמת, ניסיונות לחרוג מהמקום או מהנתונים הזמינים יחזירו כשל באופן מיידי או ייחסמו עד שאפשר יהיה להשלים את הפעולה הרצויה. ניסיונות לקרוא או לכתוב יותר נתונים מאשר קיבולת התור תמיד נכשלים באופן מיידי.

הגדרת FMQ

כדי ליצור תור הודעות נדרשים כמה אובייקטים של MessageQueue: אחד לכתיבה ואחד או יותר לקריאה. אין הגדרה מפורשת של האובייקט שמשמש לכתיבה או לקריאה. המשתמש אחראי לוודא שאף אובייקט לא משמש גם לקריאה וגם לכתיבה, שיש לפחות כותב אחד, ובתורות מסונכרנות, שיש לפחות קורא אחד.

יצירת האובייקט MessageQueue הראשון

אפשר ליצור ולקבוע את ההגדרות של תור הודעות באמצעות קריאה אחת:

#include <fmq/MessageQueue.h>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
using android::hardware::MQDescriptorSync;
using android::hardware::MQDescriptorUnsync;
using android::hardware::MessageQueue;
....
// For a synchronized nonblocking FMQ
mFmqSynchronized =
  new (std::nothrow) MessageQueue<uint16_t, kSynchronizedReadWrite>
      (kNumElementsInQueue);
// For an unsynchronized FMQ that supports blocking
mFmqUnsynchronizedBlocking =
  new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
      (kNumElementsInQueue, true /* enable blocking operations */);
  • ה-initializer של MessageQueue<T, flavor>(numElements) יוצר ומפעיל אובייקט שתומך בפונקציונליות של תור ההודעות.
  • ה-initializer של MessageQueue<T, flavor>(numElements, configureEventFlagWord) יוצר ומפעיל אובייקט שתומך בפונקציונליות של תור ההודעות עם חסימה.
  • הערך של flavor יכול להיות kSynchronizedReadWrite עבור תור מסונכרן או kUnsynchronizedWrite עבור תור לא מסונכרן.
  • uint16_t (בדוגמה הזו) יכול להיות כל סוג שהוגדר ב-HIDL שלא כולל מאגרים בתצוגת עץ (אין סוגי string או vec), אחזקים או ממשקים.
  • kNumElementsInQueue מציין את גודל התור במספר הרשאות הגישה. הוא קובע את גודל מאגר הנתונים הזמני של הזיכרון המשותף שהוקצה לתור.

יצירת האובייקט השני של MessageQueue

הצד השני של תור ההודעות נוצר באמצעות אובייקט MQDescriptor שמתקבל מהצד הראשון. האובייקט MQDescriptor נשלח באמצעות קריאה ל-RPC של HIDL או AIDL לתהליך שמחזיק בקצה השני של תור ההודעות. השדה MQDescriptor מכיל מידע על התור, כולל:

  • מידע למיפוי המאגר ולמצב של מצביע הכתיבה.
  • מידע למיפוי של מצבית הקריאה (אם התור מסונכרן).
  • מידע למיפוי של מילה של דגל אירוע (אם התור חוסם).
  • סוג האובייקט (<T, flavor>), שכולל את הסוג שהוגדר ב-HIDL של רכיבי התור ואת סוג התור (מסונכרן או לא מסונכרן).

אפשר להשתמש באובייקט MQDescriptor כדי ליצור אובייקט MessageQueue:

MessageQueue<T, flavor>::MessageQueue(const MQDescriptor<T, flavor>& Desc, bool resetPointers)

הפרמטר resetPointers מציין אם צריך לאפס את המיקומים של הקריאה והכתיבה ל-0 בזמן יצירת האובייקט MessageQueue. בתור לא מסונכרן, מיקום הקריאה (שמייצג את המיקום המקומי של כל אובייקט MessageQueue בתורים לא מסונכרנים) תמיד מוגדר ל-0 במהלך היצירה. בדרך כלל, MQDescriptor מופעל במהלך יצירת אובייקט תור ההודעות הראשון. כדי לשלוט יותר טוב בזיכרון המשותף, אפשר להגדיר את MQDescriptor באופן ידני (MQDescriptor מוגדר בקובץ system/libhidl/base/include/hidl/MQDescriptor.h), ואז ליצור כל אובייקט MessageQueue כפי שמתואר בקטע הזה.

רצפי חסימה ודגלים של אירועים

כברירת מחדל, תורים לא תומכים בחסימת קריאות וכתיבה. יש שני סוגים של קריאות חסימה לקריאה ולכתיבה:

  • טופס קצר, עם שלושה פרמטרים (מצביע נתונים, מספר פריטים, זמן קצוב לתפוגה), תומך בחסימה של פעולות קריאה וכתיבה נפרדות בתור יחיד. כשמשתמשים בטופס הזה, המערכת מטפלת בדגל האירוע ובמסכות הביטים באופן פנימי, ואובייקט תור ההודעות הראשון צריך להיות מאותחלל עם פרמטר שני של true. לדוגמה:
    // For an unsynchronized FMQ that supports blocking
    mFmqUnsynchronizedBlocking =
      new (std::nothrow) MessageQueue<uint16_t, kUnsynchronizedWrite>
          (kNumElementsInQueue, true /* enable blocking operations */);
    
  • טופס ארוך, עם שישה פרמטרים (כולל דגל אירוע ומסכות ביט), תומך בשימוש באובייקט EventFlag משותף בין מספר תורים ומאפשר לציין את מסכות הביט של ההתראות שבהן רוצים להשתמש. במקרה כזה, צריך לספק את דגל האירוע ואת מסיכות הביטים לכל קריאה וכתיבה.

בטופס הארוך, אפשר לספק את EventFlag באופן מפורש בכל קריאה ל-readBlocking() ול-writeBlocking(). אפשר לאתחל אחת מהתורניות באמצעות דגל אירוע פנימי, ולאחר מכן לחלץ אותו מהאובייקטים מסוג MessageQueue של התורנית הזו באמצעות getEventFlagWord() ולהשתמש בו כדי ליצור אובייקטים מסוג EventFlag בכל תהליך לשימוש עם תורניות FMQ אחרות. לחלופין, אפשר לאתחל את האובייקטים של EventFlag באמצעות כל זיכרון משותף מתאים.

באופן כללי, בכל תור צריך להשתמש רק באחת מהאפשרויות הבאות: חסימה ללא ניתוק, חסימה של סרטוני Shorts או חסימה של סרטונים ארוכים. אין שגיאה בשימוש בשני הרכיבים יחד, אבל נדרש תכנות זהיר כדי לקבל את התוצאה הרצויה.

סימון הזיכרון כזיכרון לקריאה בלבד

כברירת מחדל, לזיכרון המשותף יש הרשאות קריאה וכתיבה. במקרה של תורים לא מסונכרנים (kUnsynchronizedWrite), יכול להיות שמי שכותב את הנתונים ירצה להסיר את הרשאות הכתיבה מכל הקוראים לפני שהוא נותן את האובייקטים מסוג MQDescriptorUnsync. כך מוודאים שהתהליכים האחרים לא יכולים לכתוב לתור, מומלץ לעשות זאת כדי להגן מפני באגים או התנהגות לא טובה בתהליכי הקריאה. אם הכותבים רוצים שהקוראים יוכלו לאפס את התור בכל פעם שהם משתמשים ב-MQDescriptorUnsync כדי ליצור את הצד לקריאה של התור, לא ניתן לסמן את הזיכרון כקריאה בלבד. זוהי התנהגות ברירת המחדל של ה-constructor של MessageQueue. לכן, אם יש משתמשים קיימים בתור הזה, צריך לשנות את הקוד שלהם כדי ליצור את התור באמצעות resetPointer=false.

  • כותב: קוראים ל-ashmem_set_prot_region עם מתאר קובץ MQDescriptor והאזור מוגדר לקריאה בלבד (PROT_READ):
    int res = ashmem_set_prot_region(mqDesc->handle->data[0], PROT_READ)
  • קוראים: יוצרים תור הודעות באמצעות resetPointer=false (ברירת המחדל היא true):
    mFmq = new (std::nothrow) MessageQueue(mqDesc, false);

שימוש ב-MessageQueue

ממשק ה-API הציבורי של האובייקט MessageQueue הוא:

size_t availableToWrite() // Space available (number of elements).
size_t availableToRead() // Number of elements available.
size_t getQuantumSize() // Size of type T in bytes.
size_t getQuantumCount() // Number of items of type T that fit in the FMQ.
bool isValid() // Whether the FMQ is configured correctly.
const MQDescriptor<T, flavor>* getDesc() // Return info to send to other process.

bool write(const T* data)  // Write one T to FMQ; true if successful.
bool write(const T* data, size_t count) // Write count T's; no partial writes.

bool read(T* data); // read one T from FMQ; true if successful.
bool read(T* data, size_t count); // Read count T's; no partial reads.

bool writeBlocking(const T* data, size_t count, int64_t timeOutNanos = 0);
bool readBlocking(T* data, size_t count, int64_t timeOutNanos = 0);

// Allows multiple queues to share a single event flag word
std::atomic<uint32_t>* getEventFlagWord();

bool writeBlocking(const T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr); // Blocking write operation for count Ts.

bool readBlocking(T* data, size_t count, uint32_t readNotification,
uint32_t writeNotification, int64_t timeOutNanos = 0,
android::hardware::EventFlag* evFlag = nullptr) // Blocking read operation for count Ts;

// APIs to allow zero copy read/write operations
bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);
bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);

אפשר להשתמש ב-availableToWrite() וב-availableToRead() כדי לקבוע כמה נתונים אפשר להעביר בפעולה אחת. בתור לא מסונכרן:

  • הפונקציה availableToWrite() תמיד מחזירה את הקיבולת של התור.
  • לכל קורא יש מיקום קריאה משלו והוא מבצע חישוב משלו של availableToRead().
  • מנקודת המבט של קורא איטי, התור יכול לחרוג מנפחו. כתוצאה מכך, יכול להיות ש-availableToRead() יחזיר ערך גדול יותר מהגודל של התור. הקריאה הראשונה אחרי חריגה מברירת המחדל נכשלת, וכתוצאה מכך מיקום הקריאה של הקורא הזה מוגדר כערך של מצביע הכתיבה הנוכחי, גם אם חריגת המכסה דווחה דרך availableToRead() וגם אם לא.

השיטות read() ו-write() מחזירות את הערך true אם ניתן היה להעביר (והועברו) את כל הנתונים המבוקשים אל התור וממנו. השיטות האלה לא חוסמות את הקוד. הן מצליחות (ומחזירות את הערך true) או חוזרות עם שגיאה (false) באופן מיידי.

השיטות readBlocking() ו-writeBlocking() ממתינות עד שאפשר להשלים את הפעולה המבוקשת, או עד שהן מגיעות לזמן קצוב לתפוגה (ערך timeOutNanos של 0 מציין שהזמן הקצוב לתפוגה לא יפוג אף פעם).

פעולות חסימה מיושמות באמצעות מילה של דגל אירוע. כברירת מחדל, כל תור יוצר מילה משלו לדגל ומשתמש בה כדי לתמוך בפורמט המקוצר של readBlocking() ו-writeBlocking(). מספר תורים יכולים לשתף מילה אחת, כך שתהליך יכול להמתין לכתיבת נתונים או לקריאת נתונים מכל אחד מהתור. קריאה לפונקציה getEventFlagWord() מאפשרת לקבל הפניה למילה של דגל האירוע של תור, וניתן להשתמש בהפניה הזו (או בכל הפניה למיקום מתאים בזיכרון המשותף) כדי ליצור אובייקט EventFlag שאפשר להעביר לטופס הארוך של readBlocking() ושל writeBlocking() לתור אחר. הפרמטרים readNotification ו-writeNotification קובעים באילו ביטים בדגל האירוע צריך להשתמש כדי לסמן קריאות וכתיבה באותה תור. readNotification ו-writeNotification הם מסיכות ביט ב-32 ביט.

readBlocking() ממתין לביטים של writeNotification. אם הערך של הפרמטר הזה הוא 0, הקריאה תמיד נכשלת. אם הערך של readNotification הוא 0, הקריאה לא תיכשל, אבל קריאה מוצלחת לא תגדיר ביטים של התראות. בתור מסונכרן, המשמעות היא שהקריאה המתאימה ל-writeBlocking() אף פעם לא מתעוררת, אלא אם הביט מוגדר במקום אחר. בתור לא מסונכרן, הפונקציה writeBlocking() לא ממתינה (עדיין צריך להשתמש בה כדי להגדיר את הביט של ההתראה על הכתיבה), וניתן להגדיר בקריאה אף ביט התראה. באופן דומה, הפונקציה writeblocking() נכשלת אם הערך של readNotification הוא 0, וכתיבה מוצלחת מגדירה את הביטים של writeNotification שצוינו.

כדי להמתין בכמה תורים בו-זמנית, משתמשים ב-method‏ wait() של אובייקט EventFlag כדי להמתין למסכת ביט של התראות. השיטה wait() מחזירה מילה של סטטוס עם הביטים שגרמו להפעלת הסטטוס 'התעוררות'. לאחר מכן, המערכת משתמשת במידע הזה כדי לוודא שיש בשורה המתאימה מספיק מקום או נתונים לפעולות הכתיבה והקריאה הרצויות, ולבצע write() ו-read() ללא חסימה. כדי לקבל התראה בסיום הפעולה, צריך לבצע קריאה נוספת לשיטה wake() של האובייקט EventFlag. להגדרה של הפונקציה הזו, ראו system/libfmq/include/fmq/EventFlag.h.EventFlag

פעולות אפס העתקה

השיטות read,‏ write,‏ readBlocking ו-writeBlocking() מקבלות כערך ארגומנטים את הפונקציה memcpy() כדי להעתיק נתונים בין מאגר הטבעות של FMQ לבין מאגר הטבעות של אותו מאגר. כדי לשפר את הביצועים, Android בגרסה 8.0 ואילך כולל קבוצה של ממשקי API שמספקים גישה ישירה של פוינטר למאגר הטבעת, וכך מבטלים את הצורך בשימוש בקריאות memcpy.

אפשר להשתמש בממשקי ה-API הציבוריים הבאים לפעולות FMQ ללא העתקה:

bool beginWrite(size_t nMessages, MemTransaction* memTx) const;
bool commitWrite(size_t nMessages);

bool beginRead(size_t nMessages, MemTransaction* memTx) const;
bool commitRead(size_t nMessages);
  • השיטה beginWrite מספקת נקודות בסיס למאגר הטבעת של FMQ. אחרי כתיבת הנתונים, מבצעים אותם באמצעות commitWrite(). השיטות beginRead ו-commitRead פועלות באותו אופן.
  • השיטות beginRead ו-Write מקבלות כקלט את מספר ההודעות שרוצים לקרוא ולכתוב, ומחזירות ערך בוליאני שמציין אם אפשר לקרוא או לכתוב את ההודעות. אם אפשר לקרוא או לכתוב, המבנה memTx מאוכלס ב-base pointers שאפשר להשתמש בהם כדי לגשת ישירות למצביעים בזיכרון המשותף של מאגר הטבעות.
  • המבנה MemRegion מכיל פרטים על בלוק של זיכרון, כולל מצביע הבסיס (כתובת הבסיס של בלוק הזיכרון) והאורך במונחים של T (אורך בלוק הזיכרון במונחים של הסוג של תור ההודעות שהוגדר ב-HIDL).
  • המבנה MemTransaction מכיל שני מבני MemRegion, first ו-second, כי קריאה או כתיבה למאגר הטבעת עשויה לדרוש חזרה לתחילת התור. המשמעות היא שצריך שני מצביע בסיס כדי לקרוא ולכתוב נתונים למאגר הטבעת של FMQ.

כדי לקבל את כתובת הבסיס ואת האורך ממבנה MemRegion:

T* getAddress(); // gets the base address
size_t getLength(); // gets the length of the memory region in terms of T
size_t getLengthInBytes(); // gets the length of the memory region in bytes

כדי לקבל הפניות למבנים הראשונים והשניים של MemRegion בתוך אובייקט MemTransaction:

const MemRegion& getFirstRegion(); // get a reference to the first MemRegion
const MemRegion& getSecondRegion(); // get a reference to the second MemRegion

דוגמה לכתיבת נתונים ב-FMQ באמצעות ממשקי API ללא העתקה (zero copy):

MessageQueueSync::MemTransaction tx;
if (mQueue->beginRead(dataLen, &tx)) {
    auto first = tx.getFirstRegion();
    auto second = tx.getSecondRegion();

    foo(first.getAddress(), first.getLength()); // method that performs the data write
    foo(second.getAddress(), second.getLength()); // method that performs the data write

    if(commitWrite(dataLen) == false) {
       // report error
    }
} else {
   // report error
}

שיטות העזר הבאות הן גם חלק מ-MemTransaction:

  • הפונקציה T* getSlot(size_t idx); מחזירה הפניה למק"ט idx ב-MemRegions ששייכים לאובייקט MemTransaction הזה. אם האובייקט MemTransaction מייצג את אזורי הזיכרון לקריאה ולכתיבה של N פריטים מסוג T, הטווח החוקי של idx הוא בין 0 ל-N-1.
  • bool copyTo(const T* data, size_t startIdx, size_t nMessages = 1); כותב nMessages פריטים מסוג T באזורי הזיכרון שמתוארים על ידי האובייקט, החל מהאינדקס startIdx. השיטה הזו משתמשת ב-memcpy() ואינה מיועדת לפעולה ללא העתקה. אם האובייקט MemTransaction מייצג זיכרון לקריאה ולכתיבה של N פריטים מסוג T, טווח הערכים החוקי של idx הוא בין 0 ל-N-1.
  • bool copyFrom(T* data, size_t startIdx, size_t nMessages = 1); היא שיטת עזר לקריאת פריטים מסוג nMessages מסוג T מאזורי הזיכרון שמתוארים על ידי האובייקט, החל מ-startIdx. השיטה הזו משתמשת ב-memcpy() ואינה מיועדת לשימוש בפעולה ללא העתקה.

שליחת התור דרך HIDL

בצד היצירה:

  1. יוצרים אובייקט של תור הודעות כפי שמתואר למעלה.
  2. מוודאים שהאובייקט תקין באמצעות isValid().
  3. אם אתם ממתינים בכמה תורים על ידי העברת EventFlag לטופס הארוך של readBlocking() או writeBlocking(), תוכלו לחלץ את הפונקציה של אירוע הדגל (באמצעות getEventFlagWord()) מאובייקט MessageQueue שהותחל כדי ליצור את הדגל, ולהשתמש בדגל הזה כדי ליצור את האובייקט הנדרש EventFlag.
  4. משתמשים בשיטה MessageQueuegetDesc() כדי לקבל אובייקט מתאר.
  5. בקובץ ה-HAL, מקצים לשיטה פרמטר מסוג fmq_sync או fmq_unsync, כאשר T הוא סוג מתאים שהוגדר ב-HIDL. משתמשים ב-method הזה כדי לשלוח את האובייקט שהוחזר על ידי getDesc() לתהליך המקבל.

בצד המקבל:

  1. משתמשים באובייקט המתאר כדי ליצור אובייקט MessageQueue. צריך להשתמש באותו סוג נתונים ובאותו סוג תור, אחרת אי אפשר יהיה לקמפל את התבנית.
  2. אם חילוצתם דגל אירוע, חילוצו את הדגל מהאובייקט MessageQueue התואם בתהליך המקבל.
  3. משתמשים באובייקט MessageQueue כדי להעביר נתונים.