Thread-Safe Queues
In the HoloMIT SDK, high-performance data flow between producer and consumer threads is handled by a custom QueueThreadSafe class. This system ensures thread-safe access to shared data buffers (e.g., frames, audio chunks, messages) without blocking the Unity main thread.
Purpose
QueueThreadSafe is a bounded, thread-safe queue with support for:
- Multiple producers/consumers
- Blocking and non-blocking operations
- Optional item dropping when full
- Clean shutdown handling
It is the primary communication mechanism used between Worker threads and the main thread or between parallel background systems.
Design Overview
- Uses SemaphoreSlim for producer/consumer signaling
- Uses a shared Queue<BaseMemoryChunk> for storing data
- Lock-based synchronization for queue access
- Graceful closing via CancellationTokenSource
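The design points above can be illustrated with a minimal sketch. This is not the SDK's implementation: the class name BoundedQueueSketch, the two-semaphore arrangement (one counting free slots, one counting queued items), and the millisecond timeout parameters are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in; not the SDK's QueueThreadSafe.
public sealed class BoundedQueueSketch<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly SemaphoreSlim _freeSlots;   // counts empty slots
    private readonly SemaphoreSlim _usedSlots;   // counts queued items
    private readonly CancellationTokenSource _closed = new CancellationTokenSource();
    private readonly bool _dropWhenFull;

    public BoundedQueueSketch(int size, bool dropWhenFull = false)
    {
        _freeSlots = new SemaphoreSlim(size, size);
        _usedSlots = new SemaphoreSlim(0, size);
        _dropWhenFull = dropWhenFull;
    }

    // Returns false if the item was dropped, the wait timed out, or the queue is closed.
    public bool TryEnqueue(T item, int timeoutMs)
    {
        try
        {
            // With dropWhenFull, never wait for a free slot.
            int wait = _dropWhenFull ? 0 : timeoutMs;
            if (!_freeSlots.Wait(wait, _closed.Token)) return false;
        }
        catch (OperationCanceledException) { return false; }

        lock (_items) { _items.Enqueue(item); }   // lock guards the shared Queue<T>
        _usedSlots.Release();                     // signal one waiting consumer
        return true;
    }

    // Returns null on timeout or when the queue is closed.
    public T TryDequeue(int timeoutMs)
    {
        try
        {
            if (!_usedSlots.Wait(timeoutMs, _closed.Token)) return null;
        }
        catch (OperationCanceledException) { return null; }

        T item;
        lock (_items) { item = _items.Dequeue(); }
        _freeSlots.Release();                     // signal one waiting producer
        return item;
    }

    public void Close() { _closed.Cancel(); }     // wakes every pending Wait
    public bool IsClosed() { return _closed.IsCancellationRequested; }
}
```

Counting free and used slots separately lets producers and consumers wait on different semaphores, so the lock is held only for the brief queue mutation itself.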
Key Features
Feature | Description |
---|---|
Fixed Capacity | Defined at construction (_size), e.g. new QueueThreadSafe("MyQueue", 8) |
Drop When Full | If true, new data is dropped when the queue is full (non-blocking); if false, enqueue waits |
Blocking Dequeue | Consumers block until data is available |
Timed Enqueue/Dequeue | TryEnqueue() and TryDequeue() allow timeout-based operations |
Shutdown Awareness | When Close() is called, all waiting threads are notified |
Common Usage Pattern
Producer (e.g. inside a Worker):
BaseMemoryChunk frame = AllocateFrame();
// fill data into frame
queue.Enqueue(frame); // blocks if full (unless dropWhenFull = true)
Consumer (e.g. in Unity Update loop or another thread):
BaseMemoryChunk frame = queue.TryDequeue(10);
if (frame != null) {
    // process frame
    frame.free(); // release memory
}
Graceful Shutdown
Call queue.Close() during application shutdown or scene unload to:
- Cancel all pending enqueue/dequeue operations
- Automatically free any unprocessed items
queue.Close();
if (queue.IsClosed()) { ... }
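To show why cancellation releases waiting threads, the sketch below blocks a consumer on an empty SemaphoreSlim and then cancels the token it is waiting on. It uses only the .NET primitives named in the design overview; the class ShutdownSketch and its wiring are hypothetical and are not taken from the SDK.

```csharp
using System;
using System.Threading;

public static class ShutdownSketch
{
    // Returns true if the blocked consumer was released by cancellation.
    public static bool Run()
    {
        var itemsAvailable = new SemaphoreSlim(0);     // no items, so the consumer blocks
        var closed = new CancellationTokenSource();
        bool released = false;

        var consumer = new Thread(() =>
        {
            try { itemsAvailable.Wait(closed.Token); } // waits with no timeout
            catch (OperationCanceledException) { released = true; }
        });
        consumer.Start();

        closed.Cancel();   // plays the role of Close() in this sketch
        consumer.Join();   // returns because the wait was cancelled
        return released;
    }
}
```

Without the cancellation token, the consumer thread in this sketch would wait forever, which is the situation Close() is meant to prevent at shutdown.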
Runtime Diagnostics
You can query runtime state via:
- queue._CanEnqueue(): returns true if there is space
- queue._CanDequeue(): returns true if data is available
- queue._Count: approximate number of items in the queue
- queue._Peek(): gets the next item without removing it
- queue._PeekTimestamp(): gets the timestamp of the next frame (if available)
Best Practices
- Always free memory chunks (item.free()) when they are no longer needed
- Avoid large queue sizes unless strictly necessary
- Use TryEnqueue(timeout) instead of blocking Enqueue() in time-sensitive threads
- Always call Close() on shutdown to release waiting threads and free data
- Prefer lock-free queues only when thread contention becomes a bottleneck
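One way to follow the first practice is to release the chunk in a finally block, so it is freed even when processing throws. The sketch below is an assumption-laden illustration: ChunkStandIn is a hypothetical substitute for BaseMemoryChunk (only the free() method name comes from this page), and ConsumerSketch.Consume is not an SDK function.

```csharp
using System;

// Hypothetical stand-in for BaseMemoryChunk; only free() is taken from the text above.
public class ChunkStandIn
{
    public bool Freed { get; private set; }
    public void free() { Freed = true; }
}

public static class ConsumerSketch
{
    // Processes one chunk and releases it even if processing throws.
    public static void Consume(ChunkStandIn chunk, Action<ChunkStandIn> process)
    {
        if (chunk == null) return;   // e.g. a timed-out dequeue returned nothing
        try { process(chunk); }
        finally { chunk.free(); }    // always runs, on success or exception
    }
}
```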
The QueueThreadSafe system is optimized for use in high-throughput pipelines where reliable and ordered data transfer is essential. It enables modular and decoupled multithreaded systems with predictable performance characteristics.