Thread-Safe Queues
In the HoloMIT SDK, high-performance data flow between producer and consumer threads is handled using a custom QueueThreadSafe class. This system ensures thread-safe access to shared data buffers (e.g., frames, audio chunks, messages) without blocking the Unity main thread.
Purpose
QueueThreadSafe is a bounded, thread-safe queue with support for:
- Multiple producers/consumers
- Blocking and non-blocking operations
- Optional item dropping when full
- Clean shutdown handling
It is the primary communication mechanism used between Worker threads and the main thread or between parallel background systems.
Design Overview
- Uses SemaphoreSlim for producer/consumer signaling
- Uses a shared Queue<BaseMemoryChunk> for storing data
- Lock-based synchronization for queue access
- Graceful closing via CancellationTokenSource
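How the four pieces listed above can fit together is sketched below. This is a minimal illustration, not the SDK's implementation: the class name SketchQueue, its generic parameter, and its method signatures are hypothetical, and the real QueueThreadSafe may differ in every detail. It assumes one semaphore counts free slots and another counts queued items.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for illustration only; NOT the SDK's QueueThreadSafe.
public sealed class SketchQueue<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly SemaphoreSlim _freeSlots;    // counts empty slots
    private readonly SemaphoreSlim _filledSlots;  // counts queued items
    private readonly CancellationTokenSource _closed = new CancellationTokenSource();
    private readonly bool _dropWhenFull;

    public SketchQueue(int capacity, bool dropWhenFull = false)
    {
        _freeSlots = new SemaphoreSlim(capacity, capacity);
        _filledSlots = new SemaphoreSlim(0, capacity);
        _dropWhenFull = dropWhenFull;
    }

    public bool IsClosed => _closed.IsCancellationRequested;

    // Returns false if the item was dropped or the queue has been closed.
    public bool Enqueue(T item)
    {
        try
        {
            if (_dropWhenFull)
            {
                // Zero timeout: take a slot only if one is free right now.
                if (!_freeSlots.Wait(0, _closed.Token)) return false;
            }
            else
            {
                // Blocks until a consumer frees a slot or the queue is closed.
                _freeSlots.Wait(_closed.Token);
            }
        }
        catch (OperationCanceledException) { return false; }

        lock (_items) { _items.Enqueue(item); }
        _filledSlots.Release(); // signal one waiting consumer
        return true;
    }

    // Returns null on timeout or after the queue has been closed.
    public T TryDequeue(int timeoutMs)
    {
        try
        {
            if (!_filledSlots.Wait(timeoutMs, _closed.Token)) return null;
        }
        catch (OperationCanceledException) { return null; }

        T item;
        lock (_items) { item = _items.Dequeue(); }
        _freeSlots.Release(); // signal one waiting producer
        return item;
    }

    // Cancelling the token wakes every thread blocked in Wait().
    public void Close() => _closed.Cancel();
}
```

The semaphores carry all of the waiting, so the lock is held only for the brief Enqueue/Dequeue call on the underlying Queue<T>. This sketch does not free leftover items on Close().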
Key Features
| Feature | Description |
|---|---|
| Fixed Capacity | Defined at construction (_size), e.g. new QueueThreadSafe("MyQueue", 8) |
| Drop When Full | If true, new items are dropped when the queue is full (enqueue never blocks); if false, enqueue waits for space |
| Blocking Dequeue | Consumers block until data is available |
| Timed Enqueue/Dequeue | TryEnqueue() and TryDequeue() allow timeout-based operations |
| Shutdown Awareness | When Close() is called, all waiting threads are notified |
Common Usage Pattern
Producer (e.g. inside a Worker):
BaseMemoryChunk frame = AllocateFrame();
// fill data into frame
queue.Enqueue(frame); // blocks if full (unless dropWhenFull = true)
Consumer (e.g. in Unity Update loop or another thread):
BaseMemoryChunk frame = queue.TryDequeue(10); // timed wait; null if no item arrived in time
if (frame != null) {
    // process frame
    frame.free(); // release memory
}
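For experimenting with this pattern outside the SDK, the following runnable sketch reproduces the same producer/consumer shape using .NET's BlockingCollection<T> as a stand-in for QueueThreadSafe. The stand-in is a swapped-in technique, not the SDK class; PipelineDemo, the byte[] frames, and the 10 ms timeout are illustrative assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class PipelineDemo
{
    // Runs one producer and one consumer; returns how many frames were consumed.
    public static int Run(int frameCount, int capacity)
    {
        // BlockingCollection stands in for QueueThreadSafe in this sketch.
        using (var queue = new BlockingCollection<byte[]>(capacity))
        {
            int processed = 0;

            var producer = Task.Run(() =>
            {
                for (int i = 0; i < frameCount; i++)
                {
                    var frame = new byte[16]; // stands in for AllocateFrame()
                    frame[0] = (byte)i;       // fill data into frame
                    queue.Add(frame);         // blocks while the queue is full
                }
                queue.CompleteAdding();       // no more frames will arrive
            });

            var consumer = Task.Run(() =>
            {
                while (!queue.IsCompleted)
                {
                    // Wait up to 10 ms, then loop to re-check for completion.
                    if (queue.TryTake(out byte[] frame, 10))
                    {
                        processed++;          // process the frame here
                    }
                }
            });

            Task.WaitAll(producer, consumer);
            return processed;
        }
    }
}
```

Calling PipelineDemo.Run(20, 4) pushes 20 frames through a queue of capacity 4. The short timeout on the consumer side keeps it responsive to completion instead of blocking indefinitely.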
Graceful Shutdown
Call queue.Close() during application shutdown or scene unload to:
- Cancel all pending enqueue/dequeue operations
- Automatically free any unprocessed items
queue.Close();
if (queue.IsClosed()) { ... }
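The two shutdown effects described above can be illustrated in isolation. The sketch below is an assumption about the general mechanism, not the SDK's code: it shows how cancelling a CancellationTokenSource releases a thread blocked on a SemaphoreSlim, and how leftover items could be drained and released. ShutdownDemo and both method names are hypothetical, and IDisposable.Dispose() stands in for item.free().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ShutdownDemo
{
    // Returns true if the waiting consumer was released by cancellation.
    public static bool BlockedWaiterIsReleased()
    {
        var itemsAvailable = new SemaphoreSlim(0); // nothing is ever queued
        var closing = new CancellationTokenSource();
        bool woken = false;

        var consumer = new Thread(() =>
        {
            try { itemsAvailable.Wait(closing.Token); }
            catch (OperationCanceledException) { woken = true; }
        });
        consumer.Start();

        closing.Cancel();  // what a Close() call would trigger in this sketch
        consumer.Join();   // returns because the wait was cancelled
        return woken;
    }

    // Drains leftover items and releases each one; returns the number released.
    public static int DrainAndFree(Queue<IDisposable> leftovers)
    {
        int freed = 0;
        lock (leftovers)
        {
            while (leftovers.Count > 0)
            {
                leftovers.Dequeue().Dispose(); // stands in for item.free()
                freed++;
            }
        }
        return freed;
    }
}
```

In a Unity project, a natural place to call queue.Close() is OnDestroy() or OnApplicationQuit(), so background threads are not left waiting after the scene is gone.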
Runtime Diagnostics
You can query runtime state via:
- queue._CanEnqueue() – returns true if there's space
- queue._CanDequeue() – returns true if data is available
- queue._Count – approx. number of items in queue
- queue._Peek() – get next item without removing it
- queue._PeekTimestamp() – get the timestamp of the next frame (if available)
Best Practices
- Always free memory chunks (item.free()) when they are no longer needed
- Avoid large queue sizes unless strictly necessary
- Use TryEnqueue(timeout) instead of blocking Enqueue() in time-sensitive threads
- Always call Close() on shutdown to release waiting threads and free data
- Prefer lock-free queues only when thread contention becomes a bottleneck
The QueueThreadSafe system is optimized for use in high-throughput pipelines where reliable and ordered data transfer is essential. It enables modular and decoupled multithreaded systems with predictable performance characteristics.