
Thread Pool API ​

The thread pool module provides parallel log processing capabilities with work stealing, priority queues, and concurrent sink writing.

Quick Reference: Method Aliases ​

| Full Method | Alias(es) | Description |
| --- | --- | --- |
| `init()` | `create()` | Initialize thread pool |
| `deinit()` | `destroy()` | Deinitialize thread pool |
| `start()` | `begin()` | Start worker threads |
| `shutdown()` | `stop()`, `halt()` | Stop worker threads |
| `submit()` | `push()`, `enqueue()`, `add()` | Submit task to pool |
| `submitFn()` | `run()` | Submit function task |
| `submitBatchWithRetry()` | `submitBatchRetry()` | Batch submit with retry budget |
| `waitAll()` | `await()`, `join()` | Wait for all tasks |
| `waitAllTimeout()` | `waitForAll()` | Wait for completion with timeout |
| `pendingTasks()` | `queueDepth()`, `size()` | Get pending task count |
| `pendingTasksByQueue()` | `queueBreakdown()` | Get global/local queue depth snapshot |
| `queueCapacity()` | `totalQueueCapacity()` | Get total queue capacity |
| `availableQueueCapacity()` | `freeQueueCapacity()`, `availableCapacity()` | Get free queue capacity |
| `canAcceptTasks()` | `hasCapacityFor()` | Check if queue can accept N tasks |
| `queueUtilization()` | `queueLoad()` | Get queue utilization |
| `isSaturated()` | `saturated()` | Check if queue load is above threshold |
| `waitUntilQueueBelow()` | `waitForQueueBelow()` | Wait until pending queue depth <= threshold |
| `activeThreads()` | `workerCount()` | Get active thread count |
| `clear()` | `discard()`, `flush()` | Clear pending tasks |
| `getStats()` | `statistics()` | Get pool statistics |

Overview ​

zig
const logly = @import("logly");
const ThreadPool = logly.ThreadPool;
const ThreadPoolPresets = logly.ThreadPoolPresets;

Centralized Configuration ​

The thread pool can be enabled through the central Config struct:

zig
var config = logly.Config.default();
config.thread_pool = .{
    .enabled = true,
    .thread_count = 4,  // 0 = auto-detect
    .queue_size = 10000,
    .work_stealing = true,
};
const logger = try logly.Logger.initWithConfig(allocator, config);

Or use the fluent API:

zig
const config = logly.Config.default().withThreadPool(.{ .thread_count = 4 });

Types ​

ThreadPool ​

The core thread pool implementation that manages worker threads and task distribution. It employs a work-stealing algorithm to balance load across multiple worker threads, ensuring high throughput and low latency.

Fields:

  • allocator: The memory allocator used for internal structures.
  • config: The active configuration for the thread pool.
  • workers: Slice of worker threads.
  • work_queue: The global task queue for incoming tasks.
  • stats: Performance statistics (submitted, completed, stolen tasks).
  • running: Atomic flag indicating the pool's operational state.
zig
pub const ThreadPool = struct {
    allocator: std.mem.Allocator,
    config: ThreadPoolConfig,
    workers: []Worker,
    work_queue: WorkQueue,
    stats: ThreadPoolStats,
    running: std.atomic.Value(bool),
};

ThreadPoolConfig ​

Configuration available through Config.ThreadPoolConfig. This struct controls the initialization and behavior of the thread pool.

Fields:

  • enabled: Master switch to enable/disable the thread pool.
  • thread_count: Number of worker threads to spawn. Set to 0 to automatically detect and use the number of available CPU cores.
  • queue_size: Capacity of the global task queue. If the queue is full, submission may block or fail depending on policy.
  • stack_size: Stack size allocated for each worker thread (in bytes). Default is 1MB.
  • work_stealing: Enables the work-stealing algorithm, allowing idle workers to take tasks from busy workers' local queues.
  • thread_name_prefix: Prefix for worker thread names (default: "logly-worker").
  • keep_alive_ms: Keep-alive time for idle threads in milliseconds.
  • thread_affinity: Enable thread affinity (pin threads to CPUs).
zig
pub const ThreadPoolConfig = struct {
    /// Enable thread pool for parallel processing.
    enabled: bool = false,
    /// Number of worker threads (0 = auto-detect based on CPU cores).
    thread_count: usize = 0,
    /// Maximum queue size for pending tasks.
    queue_size: usize = 10000,
    /// Stack size per thread in bytes.
    stack_size: usize = 1024 * 1024,
    /// Enable work stealing between threads.
    work_stealing: bool = true,
    /// Enable per-worker arena allocator.
    enable_arena: bool = false,
    /// Thread naming prefix.
    thread_name_prefix: []const u8 = "logly-worker",
    /// Keep alive time for idle threads (milliseconds).
    keep_alive_ms: u64 = 60000,
    /// Enable thread affinity (pin threads to CPUs).
    thread_affinity: bool = false,
};
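For illustration, a pool can be built directly from a custom configuration. This sketch assumes `ThreadPoolConfig` is re-exported at the top level of `logly`, like the `ThreadPool` and `ThreadPoolPresets` imports shown in the Overview; the field values are arbitrary examples, not recommended settings.

```zig
const std = @import("std");
const logly = @import("logly");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Pin the worker count and deepen the queue for bursty log traffic.
    const config = logly.ThreadPoolConfig{
        .enabled = true,
        .thread_count = 8,
        .queue_size = 50_000,
        .work_stealing = true,
        .thread_name_prefix = "app-log",
    };

    var pool = try logly.ThreadPool.initWithConfig(allocator, config);
    defer pool.deinit();

    try pool.start();
    defer pool.stop();
}
```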

ThreadPoolPresets ​

Helper functions to create common thread pool configurations. All presets use Constants.ThreadDefaults for consistent defaults.

zig
pub const ThreadPoolPresets = struct {
    /// Default configuration: auto-detect threads, standard queue size.
    pub fn default() ThreadPoolConfig { ... }

    /// High-throughput configuration: larger queues, work stealing enabled.
    pub fn highThroughput() ThreadPoolConfig { ... }

    /// Low-resource configuration: minimal threads, small queues.
    pub fn lowResource() ThreadPoolConfig { ... }

    /// I/O-bound configuration: optimized for disk/network workloads.
    /// Uses 2x CPU cores for better I/O parallelism.
    pub fn ioBound() ThreadPoolConfig { ... }

    /// CPU-bound configuration: optimized for compute-heavy tasks.
    /// Uses exactly CPU core count with work stealing disabled.
    pub fn cpuBound() ThreadPoolConfig { ... }
};
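Because each preset returns a plain `ThreadPoolConfig`, individual fields can be overridden before constructing the pool. A minimal sketch (the override value is illustrative):

```zig
const std = @import("std");
const logly = @import("logly");

fn makePool(allocator: std.mem.Allocator) !*logly.ThreadPool {
    // Start from the I/O-bound preset, then deepen the queue for bursts.
    var cfg = logly.ThreadPoolPresets.ioBound();
    cfg.queue_size = 100_000;
    return logly.ThreadPool.initWithConfig(allocator, cfg);
}
```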

TaskPriority ​

Priority levels for tasks.

zig
pub const TaskPriority = enum(u8) {
    low = 0,
    normal = 1,
    high = 2,
    critical = 3,
};
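Priorities are most conveniently used through the shortcut submit methods documented below (`submitCallback` uses normal priority). A hedged sketch, with a no-op task body standing in for real work:

```zig
const std = @import("std");
const logly = @import("logly");

fn task(_: *anyopaque, _: ?std.mem.Allocator) void {
    // Real work (formatting, sink writes, ...) would go here.
}

fn enqueueMixed(pool: *logly.ThreadPool) void {
    var ctx: u8 = 0;
    _ = pool.submitCallback(task, &ctx); // normal priority
    _ = pool.submitHighPriority(task, &ctx); // runs ahead of routine work
    _ = pool.submitCritical(task, &ctx); // e.g. flush-before-shutdown
}
```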

ThreadPoolStats ​

Statistics for the thread pool.

Note: Atomic counters are architecture-dependent. On 64-bit targets these use 64-bit atomics (u64); on 32-bit targets they use 32-bit atomics (u32).

zig
pub const ThreadPoolStats = struct {
    tasks_submitted: std.atomic.Value(u64),
    tasks_completed: std.atomic.Value(u64),
    tasks_dropped: std.atomic.Value(u64),
    tasks_stolen: std.atomic.Value(u64),
    total_wait_time_ns: std.atomic.Value(u64),
    total_exec_time_ns: std.atomic.Value(u64),
    active_threads: std.atomic.Value(u32),

    pub fn avgWaitTimeNs(self: *const ThreadPoolStats) u64;
    pub fn avgExecTimeNs(self: *const ThreadPoolStats) u64;
    pub fn throughput(self: *const ThreadPoolStats) f64;
};
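A monitoring sketch built from the getter and rate helpers listed under "ThreadPoolStats Methods" below; the log format is illustrative:

```zig
const std = @import("std");
const logly = @import("logly");

fn reportHealth(pool: *const logly.ThreadPool) void {
    const stats = pool.getStats();
    if (stats.hasDropped()) {
        std.debug.print("dropped {d} tasks ({d:.1}% of submitted)\n", .{
            stats.getDropped(),
            stats.dropRate() * 100.0,
        });
    }
    std.debug.print("avg wait {d:.3} ms, avg exec {d:.3} ms\n", .{
        stats.avgWaitTimeMs(),
        stats.avgExecTimeMs(),
    });
}
```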

ParallelSinkWriter ​

Writes to multiple sinks in parallel with full configuration support, buffering, retry logic, and statistics.

zig
pub const ParallelSinkWriter = struct {
    allocator: std.mem.Allocator,
    pool: *ThreadPool,
    config: ParallelConfig,
    sinks: std.ArrayList(SinkHandle),
    buffer: std.ArrayList([]const u8),
    mutex: std.Thread.Mutex,
    stats: ParallelStats,

    pub const SinkHandle = struct {
        write_fn: *const fn (data: []const u8) void,
        flush_fn: ?*const fn () void,
        name: []const u8,
        enabled: bool,
    };

    pub const ParallelStats = struct {
        writes_submitted: std.atomic.Value(u64),
        writes_completed: std.atomic.Value(u64),
        writes_failed: std.atomic.Value(u64),
        retries: std.atomic.Value(u64),
        bytes_written: std.atomic.Value(u64),

        pub fn successRate(self: *const ParallelStats) f64;
    };
};

ParallelConfig ​

Configuration for parallel sink writing operations. Available through Config.ParallelConfig.

zig
pub const ParallelConfig = struct {
    /// Maximum concurrent writes allowed at once.
    max_concurrent: usize = 8,
    /// Timeout for each write operation (ms).
    write_timeout_ms: u64 = 1000,
    /// Retry failed writes automatically.
    retry_on_failure: bool = true,
    /// Maximum number of retry attempts.
    max_retries: u3 = 3,
    /// Fail-fast mode: abort on any sink error.
    fail_fast: bool = false,
    /// Buffer writes before parallel dispatch.
    buffered: bool = true,
    /// Buffer size for buffered writes.
    buffer_size: usize = 64,

    // Presets
    pub fn default() ParallelConfig;
    pub fn highThroughput() ParallelConfig;
    pub fn lowLatency() ParallelConfig;
    pub fn reliable() ParallelConfig;
};

ThreadPool Methods ​

init ​

Create a new thread pool with default configuration.

Alias: create

zig
pub fn init(allocator: std.mem.Allocator) !*ThreadPool

initWithConfig ​

Create a new thread pool with custom configuration.

zig
pub fn initWithConfig(allocator: std.mem.Allocator, config: ThreadPoolConfig) !*ThreadPool

Parameters:

  • allocator: Memory allocator
  • config: Thread pool configuration

Returns: A pointer to the new ThreadPool instance

deinit ​

Clean up resources and stop all workers.

Alias: destroy

zig
pub fn deinit(self: *ThreadPool) void

start ​

Start all worker threads.

zig
pub fn start(self: *ThreadPool) !void

stop ​

Stop all workers gracefully.

zig
pub fn stop(self: *ThreadPool) void

submit ​

Submits a task to the pool.

zig
pub fn submit(self: *ThreadPool, task: Task, priority: WorkItem.Priority) bool

submitFn ​

Submits a function for execution with normal priority.

zig
pub fn submitFn(self: *ThreadPool, func: *const fn (?std.mem.Allocator) void) bool

submitCallback ​

Submits a callback with context with normal priority.

zig
pub fn submitCallback(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) bool

submitHighPriority ​

Shortcut for submitting a high priority callback.

zig
pub fn submitHighPriority(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) bool

submitCritical ​

Shortcut for submitting a critical priority callback.

zig
pub fn submitCritical(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) bool

submitBatch ​

Submits multiple tasks at once. Returns the number of successfully submitted tasks.

zig
pub fn submitBatch(self: *ThreadPool, tasks: []const Task, priority: WorkItem.Priority) usize

submitBatchWithRetry ​

Submits tasks with bounded retries for transient queue pressure.

zig
pub fn submitBatchWithRetry(self: *ThreadPool, tasks: []const Task, priority: WorkItem.Priority, max_attempts: u8, retry_delay_us: u32) usize

trySubmit ​

Attempts to submit without blocking. Returns true if successful.

zig
pub fn trySubmit(self: *ThreadPool, task: Task, priority: WorkItem.Priority) bool

submitToWorker ​

Submits to a specific worker's local queue.

zig
pub fn submitToWorker(self: *ThreadPool, worker_id: usize, task: Task, priority: WorkItem.Priority) bool

waitAll ​

Wait until all submitted tasks are completed.

zig
pub fn waitAll(self: *ThreadPool) void

waitAllTimeout ​

Wait for all submitted tasks until timeout.

zig
pub fn waitAllTimeout(self: *ThreadPool, timeout_ms: u64) bool

waitUntilQueueBelow ​

Wait until pending queue depth is below or equal to a threshold.

zig
pub fn waitUntilQueueBelow(self: *ThreadPool, threshold: usize, timeout_ms: u64) bool

Returns true if the queue depth dropped to or below the threshold before the timeout expired.
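Combined with `canAcceptTasks`, this yields a simple producer-side backpressure helper; the half-capacity threshold and 100 ms timeout in this sketch are arbitrary choices, not library defaults.

```zig
const std = @import("std");
const logly = @import("logly");

/// Block the producer briefly when the pool cannot absorb `n` more tasks,
/// waiting for the queue to drain to half of its total capacity.
fn throttle(pool: *logly.ThreadPool, n: usize) void {
    if (!pool.canAcceptTasks(n)) {
        _ = pool.waitUntilQueueBelow(pool.queueCapacity() / 2, 100);
    }
}
```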

queueCapacity ​

Get total queue capacity across global and worker-local queues.

zig
pub fn queueCapacity(self: *const ThreadPool) usize

pendingTasksByQueue ​

Returns queue depth split by global and worker-local queues.

zig
pub fn pendingTasksByQueue(self: *ThreadPool) QueueDepth

QueueDepth fields:

  • global: tasks in global queue
  • local: total tasks across all worker local queues
  • total: global + local

availableQueueCapacity ​

Get currently available queue slots.

zig
pub fn availableQueueCapacity(self: *ThreadPool) usize

canAcceptTasks ​

Checks whether the pool has enough free queue slots for required_slots.

zig
pub fn canAcceptTasks(self: *ThreadPool, required_slots: usize) bool

queueUtilization ​

Get queue load ratio in [0.0, 1.0].

zig
pub fn queueUtilization(self: *ThreadPool) f64

isSaturated ​

Returns true if queue utilization is above a threshold.

zig
pub fn isSaturated(self: *ThreadPool, threshold: f64) bool
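One use for this check is load shedding: skip low-value work when the queue is nearly full. A sketch (the 0.9 threshold is an assumption, not a library default):

```zig
const std = @import("std");
const logly = @import("logly");

/// Returns false without submitting when queue utilization exceeds 90%.
fn maybeSubmit(
    pool: *logly.ThreadPool,
    func: *const fn (*anyopaque, ?std.mem.Allocator) void,
    ctx: *anyopaque,
) bool {
    if (pool.isSaturated(0.9)) return false; // shed low-value work
    return pool.submitCallback(func, ctx);
}
```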

getStats ​

Get current pool statistics.

zig
pub fn getStats(self: *const ThreadPool) ThreadPoolStats

ThreadPoolStats Methods ​

Getter Methods ​

| Method | Return | Description |
| --- | --- | --- |
| `getSubmitted()` | `u64` | Get total tasks submitted |
| `getCompleted()` | `u64` | Get total tasks completed |
| `getDropped()` | `u64` | Get total tasks dropped |
| `getStolen()` | `u64` | Get total tasks stolen (work stealing) |
| `getTotalWaitTimeNs()` | `u64` | Get total wait time in nanoseconds |
| `getTotalExecTimeNs()` | `u64` | Get total execution time in nanoseconds |
| `getActiveThreads()` | `u32` | Get currently active threads |

Boolean Checks ​

| Method | Return | Description |
| --- | --- | --- |
| `hasSubmitted()` | `bool` | Check if any tasks have been submitted |
| `hasCompleted()` | `bool` | Check if any tasks have been completed |
| `hasDropped()` | `bool` | Check if any tasks have been dropped |
| `hasStolen()` | `bool` | Check if any tasks have been stolen |

Rate Calculations ​

| Method | Return | Description |
| --- | --- | --- |
| `completionRate()` | `f64` | Calculate task completion rate (0.0 - 1.0) |
| `dropRate()` | `f64` | Calculate task drop rate (0.0 - 1.0) |
| `stealRate()` | `f64` | Calculate work stealing rate (0.0 - 1.0) |
| `throughput()` | `f64` | Calculate tasks per second |
| `avgWaitTimeNs()` | `u64` | Get average task wait time in nanoseconds |
| `avgWaitTimeMs()` | `f64` | Get average task wait time in milliseconds |
| `avgExecTimeNs()` | `u64` | Get average task execution time in nanoseconds |
| `avgExecTimeMs()` | `f64` | Get average task execution time in milliseconds |

Reset ​

| Method | Description |
| --- | --- |
| `reset()` | Reset all statistics to initial state |

Legacy Methods ​

zig
pub fn throughput(self: *const ThreadPoolStats) f64
pub fn avgWaitTimeNs(self: *const ThreadPoolStats) u64
pub fn avgExecTimeNs(self: *const ThreadPoolStats) u64

ParallelSinkWriter Methods ​

init ​

Create a new parallel sink writer with default configuration.

zig
pub fn init(allocator: std.mem.Allocator, pool: *ThreadPool) !*ParallelSinkWriter

initWithConfig ​

Create a new parallel sink writer with custom configuration.

zig
pub fn initWithConfig(allocator: std.mem.Allocator, pool: *ThreadPool, config: ParallelConfig) !*ParallelSinkWriter

deinit ​

Clean up resources and flush any buffered data.

zig
pub fn deinit(self: *ParallelSinkWriter) void

addSink ​

Add a sink for parallel writing.

zig
pub fn addSink(self: *ParallelSinkWriter, handle: SinkHandle) !void

removeSink ​

Remove a sink by name.

zig
pub fn removeSink(self: *ParallelSinkWriter, name: []const u8) void

setSinkEnabled ​

Enable or disable a sink by name.

zig
pub fn setSinkEnabled(self: *ParallelSinkWriter, name: []const u8, enabled: bool) void

writeParallel / write ​

Write to all enabled sinks in parallel.

zig
pub fn writeParallel(self: *ParallelSinkWriter, data: []const u8) void

flushBuffer ​

Flush any buffered writes immediately.

zig
pub fn flushBuffer(self: *ParallelSinkWriter) void

flushAll / flush ​

Flush buffer and all sinks.

zig
pub fn flushAll(self: *ParallelSinkWriter) void

getStats ​

Get current parallel write statistics.

zig
pub fn getStats(self: *const ParallelSinkWriter) ParallelStats

sinkCount ​

Get the number of registered sinks.

zig
pub fn sinkCount(self: *const ParallelSinkWriter) usize

hasEnabledSinks ​

Check if any sinks are enabled.

zig
pub fn hasEnabledSinks(self: *ParallelSinkWriter) bool
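Putting the pieces together, an end-to-end sketch of parallel sink writing. It assumes `ParallelSinkWriter` and `ParallelConfig` are re-exported from `logly` (the Overview does not show these imports explicitly), and uses a stderr writer as a stand-in sink.

```zig
const std = @import("std");
const logly = @import("logly");

fn writeStderr(data: []const u8) void {
    std.io.getStdErr().writeAll(data) catch {};
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var pool = try logly.ThreadPool.init(allocator);
    defer pool.deinit();
    try pool.start();
    defer pool.stop();

    // Reliable preset: retries enabled, fail-fast disabled.
    var writer = try logly.ParallelSinkWriter.initWithConfig(
        allocator,
        pool,
        logly.ParallelConfig.reliable(),
    );
    defer writer.deinit();

    try writer.addSink(.{
        .write_fn = writeStderr,
        .flush_fn = null,
        .name = "stderr",
        .enabled = true,
    });

    writer.writeParallel("hello from the pool\n");
    writer.flushAll();
}
```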

Presets ​

singleThread ​

Single-threaded pool for testing or simple use cases.

zig
pub fn singleThread() ThreadPoolConfig {
    return .{
        .thread_count = 1,
        .work_stealing = false,
        .queue_size = 256,
    };
}

cpuBound ​

Optimized for CPU-intensive tasks. Uses Constants.ThreadDefaults.cpuBoundThreadCount().

zig
pub fn cpuBound() ThreadPoolConfig {
    return .{
        .thread_count = Constants.ThreadDefaults.cpuBoundThreadCount(),
        .queue_size = Constants.ThreadDefaults.queue_size,
        .work_stealing = false,
        .stack_size = Constants.ThreadDefaults.stack_size,
    };
}

ioBound ​

Optimized for I/O-intensive tasks. Uses Constants.ThreadDefaults.ioBoundThreadCount().

zig
pub fn ioBound() ThreadPoolConfig {
    return .{
        .thread_count = Constants.ThreadDefaults.ioBoundThreadCount(),
        .queue_size = Constants.ThreadDefaults.queue_size * 2,
        .work_stealing = true,
        .stack_size = Constants.ThreadDefaults.stack_size,
    };
}

highThroughput ​

Maximum throughput configuration. Uses Constants.ThreadDefaults.max_tasks for larger queue.

zig
pub fn highThroughput() ThreadPoolConfig {
    return .{
        .thread_count = 0, // Auto-detect
        .queue_size = Constants.ThreadDefaults.max_tasks,
        .work_stealing = true,
        .stack_size = 2 * Constants.ThreadDefaults.stack_size,
    };
}

Usage Example ​

zig
const std = @import("std");
const logly = @import("logly");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Create thread pool with CPU-bound preset
    var pool = try logly.ThreadPool.initWithConfig(
        allocator,
        logly.ThreadPoolPresets.cpuBound(),
    );
    defer pool.deinit();

    // Start workers
    try pool.start();
    defer pool.stop();

    // Submit tasks
    var counter: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
    
    for (0..100) |_| {
        _ = pool.submitCallback(incrementTask, &counter);
    }

    // Wait for completion
    pool.waitAll();

    // Check stats
    const stats = pool.getStats();
    std.debug.print("Completed: {d}, Throughput: {d:.2} tasks/sec\n", .{
        stats.getCompleted(),
        stats.throughput(),
    });
}

fn incrementTask(ctx: *anyopaque, _: ?std.mem.Allocator) void {
    const counter: *std.atomic.Value(u32) = @alignCast(@ptrCast(ctx));
    _ = counter.fetchAdd(1, .monotonic);
}

Aliases ​

The ThreadPool module provides convenience aliases:

| Alias | Method |
| --- | --- |
| `flush` | `clear` |
| `statistics` | `getStats` |
| `stop` | `shutdown` |
| `halt` | `shutdown` |
| `begin` | `start` |
| `add` | `submit` |

Additional Methods ​

  • isEmpty() bool - Returns true if no pending tasks
  • isFull() bool - Returns true if queue is at capacity
  • utilization() f64 - Returns thread pool utilization ratio (0.0 - 1.0)
  • resetStats() void - Resets all statistics
