Message Queue with per priority memory allocation#5205
Conversation
| fpm_types_values.BooleanType | ||
| )) | ||
|
|
||
| def resolved_type_to_cpp_size_expr(self, resolved_type: fpm_types_values.Type, analysis: fpm_Analysis) -> str: |
| virtual ~PriorityMemQueue(); | ||
|
|
||
| //! \brief copy constructor is forbidden | ||
| PriorityMemQueue(const QueueInterface& other) = delete; |
| PriorityMemQueue(const QueueInterface& other) = delete; | ||
|
|
||
| //! \brief copy constructor is forbidden | ||
| PriorityMemQueue(const QueueInterface* other) = delete; |
There was a problem hiding this comment.
CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.
…re conflicts Adopt the reviewed CountingSemaphore API from nasa#5204 (now merged into devel) and adapt PriorityMemQueue/AtomicQueue to the single-argument constructor. Co-Authored-By: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR introduces a new generic, priority-aware queueing implementation intended to prevent low-importance “drop” traffic from exhausting queue resources needed by higher-importance traffic (e.g., avoiding ComQueue asserts). It adds a lock-free Types::AtomicQueue as a building block, an Os::Generic::PriorityMemQueue that composes one atomic queue per priority, and a topology-driven Python analyzer to auto-generate per-component/per-priority buffer sizing constants.
Changes:
- Add
Types::AtomicQueue(lock-free MPMC circular buffer) plus unit tests and a design document. - Add
Os::Generic::PriorityMemQueue(per-priority queues + priority tracking + blocking receive) plus extensive rule-based and validation unit tests. - Add
priority_buffer_analyzer.pyto generatePriorityBufferSizesAc.hppfrom topology JSON artifacts.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
Os/Generic/Types/AtomicQueue.hpp |
Declares new lock-free atomic queue API with optional blocking enqueue support. |
Os/Generic/Types/AtomicQueue.cpp |
Implements atomic queue create/teardown/enqueue/dequeue/size helpers. |
Os/Generic/Types/test/ut/AtomicQueue/AtomicQueueTest.cpp |
Adds unit tests for basic and blocking behaviors of AtomicQueue. |
Os/Generic/Types/docs/sdd.md |
Adds SDD documenting the AtomicQueue algorithm and memory-ordering model. |
Os/Generic/Types/CMakeLists.txt |
Wires AtomicQueue into the Os_Generic_Types module and attempts to register its UT. |
Os/Generic/PriorityMemQueue.hpp |
Declares PriorityMemQueue and configuration structures/handle. |
Os/Generic/PriorityMemQueue.cpp |
Implements configured per-priority queues, send/receive, and config lifecycle. |
Os/Generic/test/ut/PriorityMemQueueTests.cpp |
Adds functional + concurrency + rule-based tests for PriorityMemQueue. |
Os/Generic/test/ut/PriorityMemQueueRules.hpp |
Adds STest rules used by PriorityMemQueueTests.cpp. |
Os/Generic/test/ut/PriorityMemQueueInputValidationTests.cpp |
Adds input validation tests for edge cases and invalid usage. |
Os/Generic/docs/sdd.md |
Extends OS Generic SDD with a new PriorityMemQueue section and usage notes. |
Os/Generic/DefaultPriorityMemQueue.cpp |
Adds an Os::Queue delegate implementation backed by PriorityMemQueue. |
Os/Generic/CMakeLists.txt |
Registers the new PriorityMemQueue implementation and unit tests. |
cmake/autocoder/scripts/priority_buffer_analyzer.py |
Adds topology analysis script to generate per-priority buffer size constants. |
.github/actions/spelling/expect.txt |
Adds spelling allow-list entries for new terminology (atomics, bitmask, MPMC, etc.). |
| FwSizeType AtomicQueue::getSize() const { | ||
| // Safe to call on uninitialized queue | ||
| if (this->m_capacity == 0) { | ||
| return 0; | ||
| } | ||
|
|
||
| FwSizeType enq = this->m_enqueuePos.load(std::memory_order_relaxed); | ||
| FwSizeType deq = this->m_dequeuePos.load(std::memory_order_relaxed); | ||
| FwSignedSizeType diff = static_cast<FwSignedSizeType>(enq) - static_cast<FwSignedSizeType>(deq); | ||
| FW_ASSERT(diff >= 0, static_cast<FwAssertArgType>(enq), static_cast<FwAssertArgType>(deq)); | ||
| FW_ASSERT(static_cast<FwSizeType>(diff) <= this->m_capacity, static_cast<FwAssertArgType>(diff), | ||
| static_cast<FwAssertArgType>(this->m_capacity)); | ||
|
|
||
| // Approximate size (may be slightly stale in concurrent access) | ||
| return static_cast<FwSizeType>(diff); |
| void PriorityMemQueueHandle::deallocateArrays(Fw::MemAllocator& allocator, FwEnumStoreType allocatorId) { | ||
| // Deallocate arrays in reverse order using stored allocator ID | ||
| FwSizeType arraySize = static_cast<FwSizeType>(this->m_maxPriority) + 1; | ||
| if (this->m_highWaterMarks != nullptr) { | ||
| // Explicitly destroy atomics before deallocation | ||
| for (FwSizeType i = 0; i < arraySize; ++i) { | ||
| this->m_highWaterMarks[i].~atomic(); | ||
| } | ||
| allocator.deallocate(allocatorId, this->m_highWaterMarks); | ||
| this->m_highWaterMarks = nullptr; | ||
| } | ||
| if (this->m_atomicQueues != nullptr) { | ||
| // Explicitly destroy AtomicQueues before deallocation | ||
| for (FwSizeType i = 0; i < arraySize; ++i) { | ||
| this->m_atomicQueues[i].~AtomicQueue(); | ||
| } | ||
| allocator.deallocate(allocatorId, this->m_atomicQueues); | ||
| this->m_atomicQueues = nullptr; | ||
| } | ||
| this->m_maxPriority = 0; | ||
| this->m_allocatorId = 0; | ||
| } |
| // Get enabled priorities and non-empty hint mask | ||
| U32 enabledPriorities = this->m_handle.m_priorityMask.load(std::memory_order_acquire); | ||
| U32 nonEmptyHint = this->m_handle.m_nonEmptyMask.load(std::memory_order_acquire); | ||
|
|
||
| // Combine masks: only scan priorities that are both enabled and likely non-empty | ||
| U32 scanMask = enabledPriorities & nonEmptyHint; | ||
|
|
||
| // Scan priorities and attempt dequeue | ||
| if (scanAndDequeue(this->m_handle, destination, capacity, actualSize, priority, scanMask)) { | ||
| return QueueInterface::Status::OP_OK; | ||
| } |
| void PriorityMemQueue::resetConfig() { | ||
| // Only call this in test environments after all queues are destroyed | ||
| if (s_configsUsed != nullptr) { | ||
| // Get allocator (same as used in config()) | ||
| Fw::MemAllocator& allocator = Fw::MemAllocatorRegistry::getInstance().getAnAllocator( | ||
| Fw::MemoryAllocation::MemoryAllocatorType::OS_GENERIC_PRIORITY_QUEUE); | ||
| FwEnumStoreType allocatorId = 0; | ||
|
|
||
| // Deallocate the tracking array | ||
| allocator.deallocate(allocatorId, s_configsUsed); | ||
| s_configsUsed = nullptr; |
|
|
||
| ## Os::PriorityMemQueue | ||
|
|
||
| Os::PriorityMemQueue is an ISR-safe and SMP-safe, priority-based memory queue implementation for F´ using VxWorks message queues (msgQ). This implementation leverages the VxWorks msgQ API to provide robust multi-core synchronization without custom spinlock management, eliminating interrupt starvation issues while maintaining ISR safety. |
There was a problem hiding this comment.
Lots of notes about VxWorks which doesn't seem right - likely the docs were generated before AtomicQueue was introduced? We need to update
| this->m_bufferSize = bufferSize; | ||
| this->m_allocator = &allocator; | ||
| this->m_allocatorId = allocatorId; | ||
| this->m_id = allocatorId; // Store for diagnostic logging |
There was a problem hiding this comment.
Is this using the same allocatorId on purpose ?
| namespace Queue { | ||
| // JPL heritage implementation supported up to 32 priorities per message queue, limiting to 16 to reduce memory | ||
| // footprint | ||
| constexpr static FwSizeType MAX_PRIORITIES = 16; |
There was a problem hiding this comment.
This should be clearly documented as it will limit usable priorities if we want to use standard components with PriorityMemQueue
Also need to remove the comment about JPL heritage
|
|
||
| // Test configuration with single queue and single priority | ||
| TEST(PriorityMemQueueConfig, SingleQueueSinglePriority) { | ||
| printf("\n=== TEST: SingleQueueSinglePriority ===\n"); |
There was a problem hiding this comment.
Googletest can handle the logging, we shouldn't need to do any of that
| Os::Generic::PriorityMemQueue queue; | ||
|
|
||
| // Configure multiple priorities | ||
| Os::Generic::PriorityMemQueue::QueuePriorityConfig priorityConfigs[] = {{0, 128, 10}, {1, 128, 10}, {2, 128, 10}}; |
There was a problem hiding this comment.
There's a lot of duplicated setup code - it'd be nice if AI could use a GTest fixture or helper functions to handle this and reduce the verbosity a bit
|
|
||
| //! \brief Constructor to initialize members | ||
| PriorityMemQueueHandle() | ||
| : m_atomicQueues(nullptr), m_maxPriority(0), m_notEmptySem(nullptr), m_highWaterMarks(nullptr) {} |
There was a problem hiding this comment.
Not all member variables are initialized properly. Could use the = 0/=nullptr on the definition rather than constructor init list
thomas-bc
left a comment
There was a problem hiding this comment.
Also I agree with your note in your PR, we really ought to be testing this in an actual deployment - we should tag up
Change Description
This PR adds:
The code in this PR depends on #5204 and cannot be merged until that PR is complete
Rationale
F' allows a component to mark some ports as drop and some ports as assert on full. This allows "drop" ports to fill up the message queue and cause "assert" ports to fail to send a message leading to an assert. E.g. If the queue is filled up by messages that could be dropped, and then a single message comes in that cannot be dropped, ComQueue triggers an assertion:
Adding a message queue with per priority memory allocation resolves that
Separating messages by priority is also useful for ports with the same drop behavior when the different messages have different priorities (e.g. it would be bad for EventManager to drop a WARNING HI event because DIAGNOSTIC events consumed memory pool).
Testing/Review Recommendations
There's unit tests and PriorityMemQueue has been tested in a private deployment. I'd like to add it to a deployment in fprime-community but I'm not sure which
Future Work
F' components should be updated to use different priorities when sending messages with different urgencies or with different queue full behavior.
AI Usage (see policy)
Claude code - Sonnet 4.5. Used to assist with code generation under human supervision (careful review for the flight code, less careful review for the unit test code) and with documentation