Thread-Safe Queues

In the HoloMIT SDK, high-performance data flow between producer and consumer threads is handled using a custom QueueThreadSafe class. This system ensures thread-safe access to shared data buffers (e.g., frames, audio chunks, messages) without blocking the Unity main thread.


Purpose

QueueThreadSafe is a bounded, thread-safe queue with support for:

  • Multiple producers/consumers
  • Blocking and non-blocking operations
  • Optional item dropping when full
  • Clean shutdown handling

It is the primary communication mechanism used between Worker threads and the main thread or between parallel background systems.


Design Overview

  • Uses SemaphoreSlim for producer/consumer signaling
  • Uses a shared Queue<BaseMemoryChunk> for storing data
  • Lock-based synchronization for queue access
  • Graceful closing via CancellationTokenSource
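
The four building blocks listed above can be combined roughly as follows. This is a minimal sketch under stated assumptions, not the SDK's implementation: the class name BoundedQueueSketch, its generic parameter, and the millisecond timeouts are all hypothetical and chosen for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for illustration; not the SDK's QueueThreadSafe.
public sealed class BoundedQueueSketch<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly SemaphoreSlim _freeSlots;   // counts empty slots (producers wait here)
    private readonly SemaphoreSlim _usedSlots;   // counts queued items (consumers wait here)
    private readonly CancellationTokenSource _closed = new CancellationTokenSource();

    public BoundedQueueSketch(int size)
    {
        _freeSlots = new SemaphoreSlim(size, size);
        _usedSlots = new SemaphoreSlim(0, size);
    }

    public bool IsClosed { get { return _closed.IsCancellationRequested; } }

    // Returns false if the wait timed out or the queue was closed.
    public bool TryEnqueue(T item, int timeoutMs)
    {
        try
        {
            if (!_freeSlots.Wait(timeoutMs, _closed.Token)) return false;
        }
        catch (OperationCanceledException) { return false; }

        lock (_items) { _items.Enqueue(item); }  // the lock guards the shared Queue<T>
        _usedSlots.Release();                    // signal one waiting consumer
        return true;
    }

    // Returns null if the wait timed out or the queue was closed.
    public T TryDequeue(int timeoutMs)
    {
        try
        {
            if (!_usedSlots.Wait(timeoutMs, _closed.Token)) return null;
        }
        catch (OperationCanceledException) { return null; }

        T item;
        lock (_items) { item = _items.Dequeue(); }
        _freeSlots.Release();                    // signal one waiting producer
        return item;
    }

    // Cancels the token so that every thread blocked in Wait() returns.
    public void Close() { _closed.Cancel(); }
}
```

Using two semaphores keeps the waiting logic out of the lock: the lock is held only for the brief Enqueue or Dequeue call on the underlying queue, so producers and consumers never sleep while holding it.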

Key Features

Feature                  Description
Fixed Capacity           Defined at construction (_size), e.g. new QueueThreadSafe("MyQueue", 8)
Drop When Full           If true, new data is dropped when full (non-blocking); if false, enqueue waits
Blocking Dequeue         Consumers block until data is available
Timed Enqueue/Dequeue    TryEnqueue() and TryDequeue() allow timeout-based operations
Shutdown Awareness       When Close() is called, all waiting threads are notified
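
To make the Drop When Full behavior concrete, the sketch below shows one way such a policy could work. The class DropWhenFullSketch and its Dropped counter are hypothetical names, not SDK members. Whether the SDK frees a dropped chunk itself or leaves that to the caller is not stated here, so the sketch only reports the drop through its return value.

```csharp
using System.Collections.Generic;

// Hypothetical illustration of a drop-when-full policy; not SDK code.
public sealed class DropWhenFullSketch<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _size;
    private int _dropped;

    public DropWhenFullSketch(int size) { _size = size; }

    // Number of items rejected so far because the queue was full.
    public int Dropped { get { lock (_items) { return _dropped; } } }

    // Never blocks: returns false and counts a drop when the queue is full.
    public bool Enqueue(T item)
    {
        lock (_items)
        {
            if (_items.Count >= _size) { _dropped++; return false; }
            _items.Enqueue(item);
            return true;
        }
    }

    // Returns false when the queue is empty.
    public bool TryDequeue(out T item)
    {
        lock (_items)
        {
            if (_items.Count == 0) { item = default(T); return false; }
            item = _items.Dequeue();
            return true;
        }
    }
}
```

Dropping the newest item keeps the producer from stalling, at the cost of gaps in the stream. That trade-off usually suits live video or audio, where a late frame is worth less than a missing one.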

Common Usage Pattern

Producer (e.g. inside a Worker):

BaseMemoryChunk frame = AllocateFrame();
// fill data into frame
queue.Enqueue(frame); // blocks if full (unless dropWhenFull = true)

Consumer (e.g. in Unity Update loop or another thread):

BaseMemoryChunk frame = queue.TryDequeue(10);
if (frame != null) {
    // process frame
    frame.free(); // release memory
}
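
For readers who want to try the producer/consumer pattern outside the SDK, here is a self-contained sketch. It swaps in the standard .NET BlockingCollection<T> as a stand-in for QueueThreadSafe, so the method names (Add, TryTake, CompleteAdding) belong to that .NET type and not to the SDK. ProducerConsumerSketch, Run, itemCount, and capacity are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class ProducerConsumerSketch
{
    // Runs one producer thread and a polling consumer; returns the items consumed, in order.
    public static int[] Run(int itemCount, int capacity)
    {
        // Bounded BlockingCollection used as a stand-in for QueueThreadSafe.
        using (var queue = new BlockingCollection<int>(capacity))
        {
            var producer = new Thread(() =>
            {
                for (int i = 0; i < itemCount; i++)
                    queue.Add(i);           // blocks while the queue is full
                queue.CompleteAdding();     // no more items will arrive
            });
            producer.Start();

            var consumed = new int[itemCount];
            int n = 0;
            // Poll with a short timeout, as an Update-style loop would.
            while (!queue.IsCompleted)
            {
                int item;
                if (queue.TryTake(out item, 10))
                    consumed[n++] = item;
            }

            producer.Join();
            return consumed;
        }
    }
}
```

The short timeout on the consumer side bounds how long each poll can stall the caller. That bound is the reason a timed dequeue is usually preferred over a blocking one on a frame-driven thread.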

Graceful Shutdown

Call queue.Close() during application shutdown or scene unload to:

  • Cancel all pending enqueue/dequeue operations
  • Automatically free any unprocessed items

queue.Close();
if (queue.IsClosed()) { ... }
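
The two shutdown effects described above (waking blocked threads and freeing leftover items) could be implemented along these lines. This is a sketch under assumptions, not the SDK source: ChunkSketch stands in for BaseMemoryChunk, ClosableQueueSketch is a hypothetical unbounded queue, and freeing items enqueued after Close() is a choice made for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical chunk type standing in for BaseMemoryChunk.
public sealed class ChunkSketch
{
    public bool Freed { get; private set; }
    public void free() { Freed = true; }
}

// Hypothetical, unbounded queue that only illustrates close-and-drain.
public sealed class ClosableQueueSketch
{
    private readonly Queue<ChunkSketch> _items = new Queue<ChunkSketch>();
    private readonly SemaphoreSlim _available = new SemaphoreSlim(0);
    private readonly CancellationTokenSource _closed = new CancellationTokenSource();

    public bool IsClosed() { return _closed.IsCancellationRequested; }

    public bool Enqueue(ChunkSketch item)
    {
        lock (_items)
        {
            // Sketch's choice: a closed queue frees the chunk instead of storing it.
            if (IsClosed()) { item.free(); return false; }
            _items.Enqueue(item);
        }
        _available.Release();
        return true;
    }

    // Blocks until an item arrives; returns null once the queue is closed.
    public ChunkSketch Dequeue()
    {
        try { _available.Wait(_closed.Token); }
        catch (OperationCanceledException) { return null; }

        lock (_items)
        {
            // Close() may have drained the queue after the wait succeeded.
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }

    public void Close()
    {
        lock (_items)
        {
            _closed.Cancel();                // wake every thread blocked in Dequeue()
            while (_items.Count > 0)
                _items.Dequeue().free();     // release chunks nobody will consume
        }
    }
}
```

Because a closed queue returns null from Dequeue(), worker loops can treat null as the signal to exit instead of needing a separate stop flag.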

Runtime Diagnostics

You can query runtime state via:

  • queue._CanEnqueue() – returns true if there's space
  • queue._CanDequeue() – returns true if data is available
  • queue._Count – approx. number of items in queue
  • queue._Peek() – get next item without removing it
  • queue._PeekTimestamp() – get the timestamp of the next frame (if available)

Best Practices

  • Always free memory chunks (item.free()) when they are no longer needed
  • Avoid large queue sizes unless strictly necessary
  • Use TryEnqueue(timeout) instead of blocking Enqueue() in time-sensitive threads
  • Always call Close() on shutdown to release waiting threads and free data
  • Consider switching to a lock-free queue only when lock contention has been measured as a bottleneck
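
The first practice, always freeing chunks, is easy to violate when processing code throws. One way to guard against that is a try/finally wrapper, sketched below. FrameSketch stands in for BaseMemoryChunk, and FreeOnAllPathsSketch.Consume is a hypothetical helper, not part of the SDK.

```csharp
using System;

// Hypothetical chunk type standing in for BaseMemoryChunk.
public sealed class FrameSketch
{
    public bool Freed { get; private set; }
    public void free() { Freed = true; }
}

public static class FreeOnAllPathsSketch
{
    // Runs `process` on the frame and frees it afterwards, even if processing throws.
    public static void Consume(FrameSketch frame, Action<FrameSketch> process)
    {
        if (frame == null) return;   // nothing was dequeued (timeout or closed queue)
        try
        {
            process(frame);
        }
        finally
        {
            frame.free();            // runs on success and on exception
        }
    }
}
```

A consumer would pass the result of its dequeue call straight into such a helper, so that no code path can skip the free() call.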

The QueueThreadSafe system is optimized for use in high-throughput pipelines where reliable and ordered data transfer is essential. It enables modular and decoupled multithreaded systems with predictable performance characteristics.