Thread Pool API
The thread pool module provides parallel log processing capabilities with work stealing, priority queues, and concurrent sink writing.
Quick Reference: Method Aliases
| Full Method | Alias(es) | Description |
|---|---|---|
| init() | create() | Initialize thread pool |
| deinit() | destroy() | Deinitialize thread pool |
| start() | begin() | Start worker threads |
| shutdown() | stop(), halt() | Stop worker threads |
| submit() | push(), enqueue(), add() | Submit task to pool |
| submitFn() | run() | Submit function task |
| submitBatchWithRetry() | submitBatchRetry() | Batch submit with retry budget |
| waitAll() | await(), join() | Wait for all tasks |
| waitAllTimeout() | waitForAll() | Wait for completion with timeout |
| pendingTasks() | queueDepth(), size() | Get pending task count |
| pendingTasksByQueue() | queueBreakdown() | Get global/local queue depth snapshot |
| queueCapacity() | totalQueueCapacity() | Get total queue capacity |
| availableQueueCapacity() | freeQueueCapacity(), availableCapacity() | Get free queue capacity |
| canAcceptTasks() | hasCapacityFor() | Check if queue can accept N tasks |
| queueUtilization() | queueLoad() | Get queue utilization |
| isSaturated() | saturated() | Check if queue load is above threshold |
| waitUntilQueueBelow() | waitForQueueBelow() | Wait until pending queue depth <= threshold |
| activeThreads() | workerCount() | Get active thread count |
| clear() | discard(), flush() | Clear pending tasks |
| getStats() | statistics() | Get pool statistics |
Overview

const logly = @import("logly");
const ThreadPool = logly.ThreadPool;
const ThreadPoolPresets = logly.ThreadPoolPresets;

Centralized Configuration
The thread pool can be enabled through the central Config struct:

var config = logly.Config.default();
config.thread_pool = .{
    .enabled = true,
    .thread_count = 4, // 0 = auto-detect
    .queue_size = 10000,
    .work_stealing = true,
};
const logger = try logly.Logger.initWithConfig(allocator, config);

Or use the fluent API:

const config = logly.Config.default().withThreadPool(.{ .thread_count = 4 });

Types
ThreadPool
The core thread pool implementation, which manages worker threads and task distribution. It uses a work-stealing algorithm to balance load across worker threads, with the aim of keeping throughput high and latency low.
Fields:
- allocator: The memory allocator used for internal structures.
- config: The active configuration for the thread pool.
- workers: Slice of worker threads.
- work_queue: The global task queue for incoming tasks.
- stats: Performance statistics (submitted, completed, stolen tasks).
- running: Atomic flag indicating the pool's operational state.

pub const ThreadPool = struct {
    allocator: std.mem.Allocator,
    config: ThreadPoolConfig,
    workers: []Worker,
    work_queue: WorkQueue,
    stats: ThreadPoolStats,
    running: std.atomic.Value(bool),
};

ThreadPoolConfig
Configuration available through Config.ThreadPoolConfig. This struct controls the initialization and behavior of the thread pool.
Fields:
- enabled: Master switch to enable/disable the thread pool.
- thread_count: Number of worker threads to spawn. Set to 0 to automatically detect and use the number of available CPU cores.
- queue_size: Capacity of the global task queue. If the queue is full, submission may block or fail depending on policy.
- stack_size: Stack size allocated for each worker thread, in bytes. Default is 1 MiB (1024 * 1024).
- work_stealing: Enables the work-stealing algorithm, allowing idle workers to take tasks from busy workers' local queues.
- enable_arena: Enables a per-worker arena allocator.
- thread_name_prefix: Prefix for worker thread names (default: "logly-worker").
- keep_alive_ms: Keep-alive time for idle threads, in milliseconds (default: 60000).
- thread_affinity: Enables thread affinity (pins threads to CPUs).

pub const ThreadPoolConfig = struct {
    /// Enable thread pool for parallel processing.
    enabled: bool = false,
    /// Number of worker threads (0 = auto-detect based on CPU cores).
    thread_count: usize = 0,
    /// Maximum queue size for pending tasks.
    queue_size: usize = 10000,
    /// Stack size per thread in bytes.
    stack_size: usize = 1024 * 1024,
    /// Enable work stealing between threads.
    work_stealing: bool = true,
    /// Enable per-worker arena allocator.
    enable_arena: bool = false,
    /// Thread naming prefix.
    thread_name_prefix: []const u8 = "logly-worker",
    /// Keep alive time for idle threads (milliseconds).
    keep_alive_ms: u64 = 60000,
    /// Enable thread affinity (pin threads to CPUs).
    thread_affinity: bool = false,
};

ThreadPoolPresets
Helper functions to create common thread pool configurations. All presets use Constants.ThreadDefaults for consistent defaults.

pub const ThreadPoolPresets = struct {
    /// Default configuration: auto-detect threads, standard queue size.
    pub fn default() ThreadPoolConfig { ... }
    /// High-throughput configuration: larger queues, work stealing enabled.
    pub fn highThroughput() ThreadPoolConfig { ... }
    /// Low-resource configuration: minimal threads, small queues.
    pub fn lowResource() ThreadPoolConfig { ... }
    /// I/O-bound configuration: optimized for disk/network workloads.
    /// Uses 2x CPU cores for better I/O parallelism.
    pub fn ioBound() ThreadPoolConfig { ... }
    /// CPU-bound configuration: optimized for compute-heavy tasks.
    /// Uses exactly CPU core count with work stealing disabled.
    pub fn cpuBound() ThreadPoolConfig { ... }
};

TaskPriority
Priority levels for tasks, from lowest (low) to highest (critical).

pub const TaskPriority = enum(u8) {
    low = 0,
    normal = 1,
    high = 2,
    critical = 3,
};

ThreadPoolStats
Statistics for the thread pool.
Note: Atomic counters are architecture-dependent. On 64-bit targets these use 64-bit atomics (u64); on 32-bit targets they use 32-bit atomics (u32). The definition below shows the 64-bit layout.

pub const ThreadPoolStats = struct {
    tasks_submitted: std.atomic.Value(u64),
    tasks_completed: std.atomic.Value(u64),
    tasks_dropped: std.atomic.Value(u64),
    tasks_stolen: std.atomic.Value(u64),
    total_wait_time_ns: std.atomic.Value(u64),
    total_exec_time_ns: std.atomic.Value(u64),
    active_threads: std.atomic.Value(u32),

    pub fn avgWaitTimeNs(self: *const ThreadPoolStats) u64;
    pub fn avgExecTimeNs(self: *const ThreadPoolStats) u64;
    pub fn throughput(self: *const ThreadPoolStats) f64;
};

ParallelSinkWriter
Writes to multiple sinks in parallel, with support for configuration, buffering, retry logic, and statistics.

pub const ParallelSinkWriter = struct {
    allocator: std.mem.Allocator,
    pool: *ThreadPool,
    config: ParallelConfig,
    sinks: std.ArrayList(SinkHandle),
    buffer: std.ArrayList([]const u8),
    mutex: std.Thread.Mutex,
    stats: ParallelStats,

    pub const SinkHandle = struct {
        write_fn: *const fn (data: []const u8) void,
        flush_fn: ?*const fn () void,
        name: []const u8,
        enabled: bool,
    };

    pub const ParallelStats = struct {
        writes_submitted: std.atomic.Value(u64),
        writes_completed: std.atomic.Value(u64),
        writes_failed: std.atomic.Value(u64),
        retries: std.atomic.Value(u64),
        bytes_written: std.atomic.Value(u64),

        pub fn successRate(self: *const ParallelStats) f64;
    };
};

ParallelConfig
Configuration for parallel sink writing operations. Available through Config.ParallelConfig.

pub const ParallelConfig = struct {
    /// Maximum concurrent writes allowed at once.
    max_concurrent: usize = 8,
    /// Timeout for each write operation (ms).
    write_timeout_ms: u64 = 1000,
    /// Retry failed writes automatically.
    retry_on_failure: bool = true,
    /// Maximum number of retry attempts.
    max_retries: u3 = 3,
    /// Fail-fast mode: abort on any sink error.
    fail_fast: bool = false,
    /// Buffer writes before parallel dispatch.
    buffered: bool = true,
    /// Buffer size for buffered writes.
    buffer_size: usize = 64,

    // Presets
    pub fn default() ParallelConfig;
    pub fn highThroughput() ParallelConfig;
    pub fn lowLatency() ParallelConfig;
    pub fn reliable() ParallelConfig;
};

ThreadPool Methods
init
Create a new thread pool with default configuration.
Alias: create

pub fn init(allocator: std.mem.Allocator) !*ThreadPool

initWithConfig
Create a new thread pool with custom configuration.

pub fn initWithConfig(allocator: std.mem.Allocator, config: ThreadPoolConfig) !*ThreadPool

Parameters:
- allocator: Memory allocator
- config: Thread pool configuration
Returns: A pointer to the new ThreadPool instance
deinit
Clean up resources and stop all workers.
Alias: destroy

pub fn deinit(self: *ThreadPool) void

start
Start all worker threads.

pub fn start(self: *ThreadPool) !void

stop
Stop all workers gracefully.

pub fn stop(self: *ThreadPool) void

submit
Submits a task to the pool.

pub fn submit(self: *ThreadPool, task: Task, priority: WorkItem.Priority) bool

submitFn
Submits a function for execution with normal priority.

pub fn submitFn(self: *ThreadPool, func: *const fn (?std.mem.Allocator) void) bool

submitCallback
Submits a callback and its context with normal priority.

pub fn submitCallback(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) bool

submitHighPriority
Shortcut for submitting a high priority callback.

pub fn submitHighPriority(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) bool

submitCritical
Shortcut for submitting a critical priority callback.

pub fn submitCritical(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) bool

submitBatch
Submits multiple tasks at once. Returns the number of successfully submitted tasks.

pub fn submitBatch(self: *ThreadPool, tasks: []const Task, priority: WorkItem.Priority) usize

submitBatchWithRetry
Submits multiple tasks with a bounded number of retries to absorb transient queue pressure. The retry budget is set by max_attempts and retry_delay_us.

pub fn submitBatchWithRetry(self: *ThreadPool, tasks: []const Task, priority: WorkItem.Priority, max_attempts: u8, retry_delay_us: u32) usize

trySubmit
Attempts to submit without blocking. Returns true if successful.

pub fn trySubmit(self: *ThreadPool, task: Task, priority: WorkItem.Priority) bool

submitToWorker
Submits to a specific worker's local queue.

pub fn submitToWorker(self: *ThreadPool, worker_id: usize, task: Task, priority: WorkItem.Priority) bool

waitAll
Wait until all submitted tasks are completed.

pub fn waitAll(self: *ThreadPool) void

waitAllTimeout
Wait for all submitted tasks to complete, giving up after timeout_ms milliseconds.

pub fn waitAllTimeout(self: *ThreadPool, timeout_ms: u64) bool

waitUntilQueueBelow
Wait until the pending queue depth is at or below a threshold.

pub fn waitUntilQueueBelow(self: *ThreadPool, threshold: usize, timeout_ms: u64) bool

Returns true when the queue depth reaches the threshold before the timeout.
queueCapacity
Get total queue capacity across global and worker-local queues.

pub fn queueCapacity(self: *const ThreadPool) usize

pendingTasksByQueue
Returns queue depth split by global and worker-local queues.

pub fn pendingTasksByQueue(self: *ThreadPool) QueueDepth

QueueDepth fields:
- global: tasks in the global queue
- local: total tasks across all worker-local queues
- total: global + local
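
As a rough illustration of how such a snapshot could be assembled, here is a minimal sketch. The QueueDepth struct is a hypothetical stand-in with the three fields listed above, and snapshotDepth is an illustrative helper, not part of logly's API.

```zig
const std = @import("std");

/// Hypothetical stand-in for the QueueDepth snapshot (assumed field types).
const QueueDepth = struct {
    global: usize,
    local: usize,
    total: usize,
};

/// Illustrative helper: sums the per-worker queue lengths and combines
/// them with the global queue length into one snapshot.
fn snapshotDepth(global_len: usize, local_lens: []const usize) QueueDepth {
    var local: usize = 0;
    for (local_lens) |n| local += n;
    return .{ .global = global_len, .local = local, .total = global_len + local };
}

test "total is global plus local" {
    const depth = snapshotDepth(3, &[_]usize{ 1, 2, 4 });
    try std.testing.expectEqual(@as(usize, 3), depth.global);
    try std.testing.expectEqual(@as(usize, 7), depth.local);
    try std.testing.expectEqual(@as(usize, 10), depth.total);
}
```

Comparing global against local in a snapshot like this gives a quick hint about whether work is piling up before distribution or inside individual workers.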
availableQueueCapacity
Get currently available queue slots.

pub fn availableQueueCapacity(self: *ThreadPool) usize

canAcceptTasks
Checks whether the pool has enough free queue slots for required_slots.

pub fn canAcceptTasks(self: *ThreadPool, required_slots: usize) bool

queueUtilization
Get queue load ratio in [0.0, 1.0].

pub fn queueUtilization(self: *ThreadPool) f64

isSaturated
Returns true if queue utilization is above the given threshold.

pub fn isSaturated(self: *ThreadPool, threshold: f64) bool

getStats
Get current pool statistics.

pub fn getStats(self: *const ThreadPool) ThreadPoolStats

ThreadPoolStats Methods
Getter Methods
| Method | Return | Description |
|---|---|---|
| getSubmitted() | u64 | Get total tasks submitted |
| getCompleted() | u64 | Get total tasks completed |
| getDropped() | u64 | Get total tasks dropped |
| getStolen() | u64 | Get total tasks stolen (work stealing) |
| getTotalWaitTimeNs() | u64 | Get total wait time in nanoseconds |
| getTotalExecTimeNs() | u64 | Get total execution time in nanoseconds |
| getActiveThreads() | u32 | Get currently active threads |
Boolean Checks
| Method | Return | Description |
|---|---|---|
| hasSubmitted() | bool | Check if any tasks have been submitted |
| hasCompleted() | bool | Check if any tasks have been completed |
| hasDropped() | bool | Check if any tasks have been dropped |
| hasStolen() | bool | Check if any tasks have been stolen |
Rate Calculations
| Method | Return | Description |
|---|---|---|
| completionRate() | f64 | Calculate task completion rate (0.0 - 1.0) |
| dropRate() | f64 | Calculate task drop rate (0.0 - 1.0) |
| stealRate() | f64 | Calculate work stealing rate (0.0 - 1.0) |
| throughput() | f64 | Calculate tasks per second |
| avgWaitTimeNs() | u64 | Get average task wait time in nanoseconds |
| avgWaitTimeMs() | f64 | Get average task wait time in milliseconds |
| avgExecTimeNs() | u64 | Get average task execution time in nanoseconds |
| avgExecTimeMs() | f64 | Get average task execution time in milliseconds |
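
To make the arithmetic behind these rates concrete, the sketch below computes ratios, averages, and a per-second rate from raw counter values. It is a sketch under stated assumptions: the helper names (ratio, averageNs, nsToMs, perSecond) are hypothetical, and the choice of denominators and the zero-division handling are guesses rather than logly's confirmed behavior.

```zig
const std = @import("std");

const ns_per_ms: f64 = 1_000_000.0;
const ns_per_s: f64 = 1_000_000_000.0;

/// Hypothetical helper: part / whole as a ratio, or 0.0 when whole is zero.
fn ratio(part: u64, whole: u64) f64 {
    if (whole == 0) return 0.0;
    return @as(f64, @floatFromInt(part)) / @as(f64, @floatFromInt(whole));
}

/// Hypothetical helper: integer average in nanoseconds, or 0 with no samples.
fn averageNs(total_ns: u64, count: u64) u64 {
    return if (count == 0) 0 else total_ns / count;
}

/// Hypothetical helper: converts nanoseconds to fractional milliseconds.
fn nsToMs(ns: u64) f64 {
    return @as(f64, @floatFromInt(ns)) / ns_per_ms;
}

/// Hypothetical helper: events per second over an elapsed time in nanoseconds.
fn perSecond(count: u64, elapsed_ns: u64) f64 {
    if (elapsed_ns == 0) return 0.0;
    const seconds = @as(f64, @floatFromInt(elapsed_ns)) / ns_per_s;
    return @as(f64, @floatFromInt(count)) / seconds;
}

test "rates from raw counters" {
    // 50 of 100 submitted tasks completed.
    try std.testing.expectEqual(@as(f64, 0.5), ratio(50, 100));
    // 1000 ns of total wait time spread over 4 tasks.
    try std.testing.expectEqual(@as(u64, 250), averageNs(1000, 4));
    try std.testing.expectEqual(@as(f64, 2.5), nsToMs(2_500_000));
    // 500 tasks in 2 seconds.
    try std.testing.expectEqual(@as(f64, 250.0), perSecond(500, 2_000_000_000));
}
```

Guarding the zero-denominator case matters here because a freshly created pool has submitted and completed nothing, and a rate method is likely to be called before any work has run.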
Reset
| Method | Description |
|---|---|
| reset() | Reset all statistics to initial state |
Legacy Methods

pub fn throughput(self: *const ThreadPoolStats) f64
pub fn avgWaitTimeNs(self: *const ThreadPoolStats) u64
pub fn avgExecTimeNs(self: *const ThreadPoolStats) u64

ParallelSinkWriter Methods
init
Create a new parallel sink writer with default configuration.

pub fn init(allocator: std.mem.Allocator, pool: *ThreadPool) !*ParallelSinkWriter

initWithConfig
Create a new parallel sink writer with custom configuration.

pub fn initWithConfig(allocator: std.mem.Allocator, pool: *ThreadPool, config: ParallelConfig) !*ParallelSinkWriter

deinit
Clean up resources and flush any buffered data.

pub fn deinit(self: *ParallelSinkWriter) void

addSink
Add a sink for parallel writing.

pub fn addSink(self: *ParallelSinkWriter, handle: SinkHandle) !void

removeSink
Remove a sink by name.

pub fn removeSink(self: *ParallelSinkWriter, name: []const u8) void

setSinkEnabled
Enable or disable a sink by name.

pub fn setSinkEnabled(self: *ParallelSinkWriter, name: []const u8, enabled: bool) void

writeParallel / write
Write to all enabled sinks in parallel.

pub fn writeParallel(self: *ParallelSinkWriter, data: []const u8) void

flushBuffer
Flush any buffered writes immediately.

pub fn flushBuffer(self: *ParallelSinkWriter) void

flushAll / flush
Flush buffer and all sinks.

pub fn flushAll(self: *ParallelSinkWriter) void

getStats
Get current parallel write statistics.

pub fn getStats(self: *const ParallelSinkWriter) ParallelStats

sinkCount
Get the number of registered sinks.

pub fn sinkCount(self: *const ParallelSinkWriter) usize

hasEnabledSinks
Check if any sinks are enabled.

pub fn hasEnabledSinks(self: *ParallelSinkWriter) bool

Presets
singleThread
Single-threaded pool for testing or simple use cases.

pub fn singleThread() ThreadPoolConfig {
    return .{
        .thread_count = 1,
        .work_stealing = false,
        .queue_size = 256,
    };
}

cpuBound
Optimized for CPU-intensive tasks. Uses Constants.ThreadDefaults.cpuBoundThreadCount().

pub fn cpuBound() ThreadPoolConfig {
    return .{
        .thread_count = Constants.ThreadDefaults.cpuBoundThreadCount(),
        .queue_size = Constants.ThreadDefaults.queue_size,
        .work_stealing = false,
        .stack_size = Constants.ThreadDefaults.stack_size,
    };
}

ioBound
Optimized for I/O-intensive tasks. Uses Constants.ThreadDefaults.ioBoundThreadCount().

pub fn ioBound() ThreadPoolConfig {
    return .{
        .thread_count = Constants.ThreadDefaults.ioBoundThreadCount(),
        .queue_size = Constants.ThreadDefaults.queue_size * 2,
        .work_stealing = true,
        .stack_size = Constants.ThreadDefaults.stack_size,
    };
}

highThroughput
Maximum throughput configuration. Uses Constants.ThreadDefaults.max_tasks for a larger queue.

pub fn highThroughput() ThreadPoolConfig {
    return .{
        .thread_count = 0, // Auto-detect
        .queue_size = Constants.ThreadDefaults.max_tasks,
        .work_stealing = true,
        .stack_size = 2 * Constants.ThreadDefaults.stack_size,
    };
}

Usage Example
const std = @import("std");
const logly = @import("logly");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Create thread pool with CPU-bound preset
    // (init() takes only an allocator, so a preset goes through initWithConfig()).
    const pool = try logly.ThreadPool.initWithConfig(
        allocator,
        logly.ThreadPoolPresets.cpuBound(),
    );
    defer pool.deinit();

    // Start workers
    try pool.start();
    defer pool.stop();

    // Submit tasks
    var counter: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
    for (0..100) |_| {
        _ = pool.submitCallback(incrementTask, &counter);
    }

    // Wait for completion
    pool.waitAll();

    // Check stats
    const stats = pool.getStats();
    std.debug.print("Completed: {d}, Throughput: {d:.2} tasks/sec\n", .{
        stats.getCompleted(),
        stats.throughput(),
    });
}

// Task body: recovers the typed counter from the opaque context and increments it.
fn incrementTask(ctx: *anyopaque, _: ?std.mem.Allocator) void {
    const counter: *std.atomic.Value(u32) = @alignCast(@ptrCast(ctx));
    _ = counter.fetchAdd(1, .monotonic);
}

Aliases
The ThreadPool module provides convenience aliases:
| Alias | Method |
|---|---|
| flush | clear |
| statistics | getStats |
| stop | shutdown |
| halt | shutdown |
| begin | start |
| add | submit |
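
In Zig, an alias of this kind is commonly declared by binding an existing function to a second name with pub const. The sketch below shows the pattern on a hypothetical Counter type; whether logly declares its aliases exactly this way is an assumption.

```zig
const std = @import("std");

/// Hypothetical type used only to demonstrate the alias pattern.
const Counter = struct {
    value: u32 = 0,

    pub fn increment(self: *Counter) void {
        self.value += 1;
    }

    /// Alias: binds the same function to a second name, so both
    /// names resolve to one implementation.
    pub const bump = increment;
};

test "alias and original call the same function" {
    var counter = Counter{};
    counter.increment();
    counter.bump();
    try std.testing.expectEqual(@as(u32, 2), counter.value);
}
```

Because the alias is just another name for the same declaration, it adds no wrapper call and cannot drift out of sync with the method it points to.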
Additional Methods
- isEmpty() bool - Returns true if there are no pending tasks
- isFull() bool - Returns true if the queue is at capacity
- utilization() f64 - Returns thread pool utilization ratio (0.0 - 1.0)
- resetStats() void - Resets all statistics
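
The emptiness and capacity checks above, along with queueUtilization() and isSaturated() from the ThreadPool Methods section, can be read as simple functions of the pending-task count and the queue capacity. The sketch below models that relationship with free functions. The function names, the clamping, and the zero-capacity convention are assumptions for illustration; logly's actual implementation may differ.

```zig
const std = @import("std");

/// Hypothetical model: true when nothing is pending.
fn isEmpty(pending: usize) bool {
    return pending == 0;
}

/// Hypothetical model: true when pending tasks fill the queue.
fn isFull(pending: usize, capacity: usize) bool {
    return pending >= capacity;
}

/// Hypothetical model: load ratio clamped to [0.0, 1.0].
/// A zero-capacity queue is treated as fully loaded (assumed convention).
fn queueLoad(pending: usize, capacity: usize) f64 {
    if (capacity == 0) return 1.0;
    const load = @as(f64, @floatFromInt(pending)) / @as(f64, @floatFromInt(capacity));
    return @min(load, 1.0);
}

/// Hypothetical model: true when the load ratio is strictly above threshold.
fn isSaturated(pending: usize, capacity: usize, threshold: f64) bool {
    return queueLoad(pending, capacity) > threshold;
}

test "queue state from pending count and capacity" {
    try std.testing.expect(isEmpty(0));
    try std.testing.expect(isFull(10, 10));
    try std.testing.expectEqual(@as(f64, 0.5), queueLoad(5, 10));
    try std.testing.expect(isSaturated(9, 10, 0.8));
    try std.testing.expect(!isSaturated(5, 10, 0.8));
}
```

A producer could use checks like these to shed or delay low-priority work once the load passes a chosen threshold, rather than waiting for submissions to fail.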
See Also
- Thread Pool Guide - Usage patterns
- Async API - Async logging with ring buffers
- Scheduler API - Scheduled tasks
- Configuration Guide - Full configuration options
