Thread Pool API ​
The thread pool module provides parallel log processing capabilities with work stealing, priority queues, and concurrent sink writing.
Quick Reference: Method Aliases ​
| Full Method | Alias(es) | Description |
|---|---|---|
init() | create() | Initialize thread pool |
deinit() | destroy() | Deinitialize thread pool |
start() | begin() | Start worker threads |
shutdown() | stop(), halt() | Stop worker threads |
submit() | add() | Submit task to pool |
getStats() | statistics() | Get pool statistics |
Overview ​
const logly = @import("logly");
const ThreadPool = logly.ThreadPool;
const ThreadPoolPresets = logly.ThreadPoolPresets;Centralized Configuration ​
Thread pool can be enabled through the central Config struct:
var config = logly.Config.default();
config.thread_pool = .{
.enabled = true,
.thread_count = 4, // 0 = auto-detect
.queue_size = 10000,
.work_stealing = true,
};
const logger = try logly.Logger.initWithConfig(allocator, config);Or use the fluent API:
const config = logly.Config.default().withThreadPool(.{ .thread_count = 4 });Types ​
ThreadPool ​
The core thread pool implementation that manages worker threads and task distribution. It employs a work-stealing algorithm to balance load across multiple worker threads, ensuring high throughput and low latency.
Fields:
allocator: The memory allocator used for internal structures.config: The active configuration for the thread pool.workers: Slice of worker threads.work_queue: The global task queue for incoming tasks.stats: Performance statistics (submitted, completed, stolen tasks).running: Atomic flag indicating the pool's operational state.
pub const ThreadPool = struct {
allocator: std.mem.Allocator,
config: ThreadPoolConfig,
workers: []Worker,
work_queue: WorkQueue,
stats: ThreadPoolStats,
running: std.atomic.Value(bool),
};ThreadPoolConfig ​
Configuration available through Config.ThreadPoolConfig. This struct controls the initialization and behavior of the thread pool.
Fields:
enabled: Master switch to enable/disable the thread pool.thread_count: Number of worker threads to spawn. Set to 0 to automatically detect and use the number of available CPU cores.queue_size: Capacity of the global task queue. If the queue is full, submission may block or fail depending on policy.stack_size: Stack size allocated for each worker thread (in bytes). Default is 1MB.work_stealing: Enables the work-stealing algorithm, allowing idle workers to take tasks from busy workers' local queues.thread_name_prefix: Prefix for worker thread names (default: "logly-worker").keep_alive_ms: Keep-alive time for idle threads in milliseconds.thread_affinity: Enable thread affinity (pin threads to CPUs).
pub const ThreadPoolConfig = struct {
/// Enable thread pool for parallel processing.
enabled: bool = false,
/// Number of worker threads (0 = auto-detect based on CPU cores).
thread_count: usize = 0,
/// Maximum queue size for pending tasks.
queue_size: usize = 10000,
/// Stack size per thread in bytes.
stack_size: usize = 1024 * 1024,
/// Enable work stealing between threads.
work_stealing: bool = true,
/// Enable per-worker arena allocator.
enable_arena: bool = false,
/// Thread naming prefix.
thread_name_prefix: []const u8 = "logly-worker",
/// Keep alive time for idle threads (milliseconds).
keep_alive_ms: u64 = 60000,
/// Enable thread affinity (pin threads to CPUs).
thread_affinity: bool = false,
};ThreadPoolPresets ​
Helper functions to create common thread pool configurations.
pub const ThreadPoolPresets = struct {
/// Default configuration: auto-detect threads, standard queue size.
pub fn default() ThreadPoolConfig { ... }
/// High-throughput configuration: larger queues, work stealing enabled.
pub fn highThroughput() ThreadPoolConfig { ... }
/// Low-resource configuration: minimal threads, small queues.
pub fn lowResource() ThreadPoolConfig { ... }
};TaskPriority ​
Priority levels for tasks.
pub const TaskPriority = enum(u8) {
low = 0,
normal = 1,
high = 2,
critical = 3,
};ThreadPoolStats ​
Statistics for the thread pool.
Note: Atomic counters are architecture-dependent. On 64-bit targets these use 64-bit atomics (u64); on 32-bit targets they use 32-bit atomics (u32).
pub const ThreadPoolStats = struct {
tasks_submitted: std.atomic.Value(u64),
tasks_completed: std.atomic.Value(u64),
tasks_dropped: std.atomic.Value(u64),
tasks_stolen: std.atomic.Value(u64),
total_wait_time_ns: std.atomic.Value(u64),
total_exec_time_ns: std.atomic.Value(u64),
active_threads: std.atomic.Value(u32),
pub fn avgWaitTimeNs(self: *const ThreadPoolStats) u64;
pub fn avgExecTimeNs(self: *const ThreadPoolStats) u64;
pub fn throughput(self: *const ThreadPoolStats) f64;
};ParallelSinkWriter ​
Writes to multiple sinks in parallel with full configuration support, buffering, retry logic, and statistics.
pub const ParallelSinkWriter = struct {
allocator: std.mem.Allocator,
pool: *ThreadPool,
config: ParallelConfig,
sinks: std.ArrayList(SinkHandle),
buffer: std.ArrayList([]const u8),
mutex: std.Thread.Mutex,
stats: ParallelStats,
pub const SinkHandle = struct {
write_fn: *const fn (data: []const u8) void,
flush_fn: ?*const fn () void,
name: []const u8,
enabled: bool,
};
pub const ParallelStats = struct {
writes_submitted: std.atomic.Value(u64),
writes_completed: std.atomic.Value(u64),
writes_failed: std.atomic.Value(u64),
retries: std.atomic.Value(u64),
bytes_written: std.atomic.Value(u64),
pub fn successRate(self: *const ParallelStats) f64;
};
};ParallelConfig ​
Configuration for parallel sink writing operations. Available through Config.ParallelConfig.
pub const ParallelConfig = struct {
/// Maximum concurrent writes allowed at once.
max_concurrent: usize = 8,
/// Timeout for each write operation (ms).
write_timeout_ms: u64 = 1000,
/// Retry failed writes automatically.
retry_on_failure: bool = true,
/// Maximum number of retry attempts.
max_retries: u3 = 3,
/// Fail-fast mode: abort on any sink error.
fail_fast: bool = false,
/// Buffer writes before parallel dispatch.
buffered: bool = true,
/// Buffer size for buffered writes.
buffer_size: usize = 64,
// Presets
pub fn default() ParallelConfig;
pub fn highThroughput() ParallelConfig;
pub fn lowLatency() ParallelConfig;
pub fn reliable() ParallelConfig;
};ThreadPool Methods ​
init ​
Create a new thread pool with default configuration.
Alias: create
pub fn init(allocator: std.mem.Allocator) !*ThreadPoolinitWithConfig ​
Create a new thread pool with custom configuration.
pub fn initWithConfig(allocator: std.mem.Allocator, config: ThreadPoolConfig) !*ThreadPoolParameters:
allocator: Memory allocatorconfig: Thread pool configuration
Returns: A pointer to the new ThreadPool instance
deinit ​
Clean up resources and stop all workers.
Alias: destroy
pub fn deinit(self: *ThreadPool) voidstart ​
Start all worker threads.
pub fn start(self: *ThreadPool) !voidstop ​
Stop all workers gracefully.
pub fn stop(self: *ThreadPool) voidsubmit ​
Submits a task to the pool.
pub fn submit(self: *ThreadPool, task: Task, priority: WorkItem.Priority) boolsubmitFn ​
Submits a function for execution with normal priority.
pub fn submitFn(self: *ThreadPool, func: *const fn (?std.mem.Allocator) void) boolsubmitCallback ​
Submits a callback with context with normal priority.
pub fn submitCallback(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) boolsubmitHighPriority ​
Shortcut for submitting a high priority callback.
pub fn submitHighPriority(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) boolsubmitCritical ​
Shortcut for submitting a critical priority callback.
pub fn submitCritical(self: *ThreadPool, func: *const fn (*anyopaque, ?std.mem.Allocator) void, context: *anyopaque) boolsubmitBatch ​
Submits multiple tasks at once. Returns the number of successfully submitted tasks.
pub fn submitBatch(self: *ThreadPool, tasks: []const Task, priority: WorkItem.Priority) usizetrySubmit ​
Attempts to submit without blocking. Returns true if successful.
pub fn trySubmit(self: *ThreadPool, task: Task, priority: WorkItem.Priority) boolsubmitToWorker ​
Submits to a specific worker's local queue.
pub fn submitToWorker(self: *ThreadPool, worker_id: usize, task: Task, priority: WorkItem.Priority) boolwaitAll ​
Wait until all submitted tasks are completed.
pub fn waitAll(self: *ThreadPool) voidgetStats ​
Get current pool statistics.
pub fn getStats(self: *const ThreadPool) ThreadPoolStatsThreadPoolStats Methods ​
throughput ​
Calculate tasks per second.
pub fn throughput(self: *const ThreadPoolStats) f64avgWaitTimeNs ​
Get average task wait time.
pub fn avgWaitTimeNs(self: *const ThreadPoolStats) u64avgExecTimeNs ​
Get average task execution time.
pub fn avgExecTimeNs(self: *const ThreadPoolStats) u64ParallelSinkWriter Methods ​
init ​
Create a new parallel sink writer with default configuration.
pub fn init(allocator: std.mem.Allocator, pool: *ThreadPool) !*ParallelSinkWriterinitWithConfig ​
Create a new parallel sink writer with custom configuration.
pub fn initWithConfig(allocator: std.mem.Allocator, pool: *ThreadPool, config: ParallelConfig) !*ParallelSinkWriterdeinit ​
Clean up resources and flush any buffered data.
pub fn deinit(self: *ParallelSinkWriter) voidaddSink ​
Add a sink for parallel writing.
pub fn addSink(self: *ParallelSinkWriter, handle: SinkHandle) !voidremoveSink ​
Remove a sink by name.
pub fn removeSink(self: *ParallelSinkWriter, name: []const u8) voidsetSinkEnabled ​
Enable or disable a sink by name.
pub fn setSinkEnabled(self: *ParallelSinkWriter, name: []const u8, enabled: bool) voidwriteParallel / write ​
Write to all enabled sinks in parallel.
pub fn writeParallel(self: *ParallelSinkWriter, data: []const u8) voidflushBuffer ​
Flush any buffered writes immediately.
pub fn flushBuffer(self: *ParallelSinkWriter) voidflushAll / flush ​
Flush buffer and all sinks.
pub fn flushAll(self: *ParallelSinkWriter) voidgetStats ​
Get current parallel write statistics.
pub fn getStats(self: *const ParallelSinkWriter) ParallelStatssinkCount ​
Get the number of registered sinks.
pub fn sinkCount(self: *const ParallelSinkWriter) usizehasEnabledSinks ​
Check if any sinks are enabled.
pub fn hasEnabledSinks(self: *ParallelSinkWriter) boolPresets ​
singleThread ​
Single-threaded pool for testing or simple use cases.
pub fn singleThread() ThreadPoolConfig {
return .{
.thread_count = 1,
.work_stealing = false,
.queue_size = 256,
};
}cpuBound ​
Optimized for CPU-intensive tasks.
pub fn cpuBound() ThreadPoolConfig {
const cpu_count = std.Thread.getCpuCount() catch 4;
return .{
.thread_count = cpu_count,
.work_stealing = true,
};
}ioBound ​
Optimized for I/O-intensive tasks.
pub fn ioBound() ThreadPoolConfig {
const cpu_count = std.Thread.getCpuCount() catch 4;
return .{
.thread_count = cpu_count * 2,
.queue_size = 2048,
.work_stealing = true,
};
}highThroughput ​
Maximum throughput configuration.
pub fn highThroughput() ThreadPoolConfig {
return .{
.thread_count = 0, // Auto
.queue_size = 4096,
.work_stealing = true,
};
}Usage Example ​
const std = @import("std");
const logly = @import("logly");
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// Create thread pool with CPU-bound preset
var pool = try logly.ThreadPool.init(
allocator,
logly.ThreadPoolPresets.cpuBound(),
);
defer pool.deinit();
// Start workers
try pool.start();
defer pool.stop();
// Submit tasks
var counter: std.atomic.Value(u32) = std.atomic.Value(u32).init(0);
for (0..100) |i| {
_ = pool.submitCallback(incrementTask, &counter);
}
// Wait for completion
pool.waitAll();
// Check stats
const stats = pool.getStats();
std.debug.print("Completed: {d}, Throughput: {d:.2} tasks/sec\n", .{
stats.tasks_completed.load(.monotonic),
stats.throughput(),
});
}
fn incrementTask(ctx: *anyopaque, _: ?std.mem.Allocator) void {
const counter: *std.atomic.Value(u32) = @alignCast(@ptrCast(ctx));
_ = counter.fetchAdd(1, .monotonic);
}Aliases ​
The ThreadPool module provides convenience aliases:
| Alias | Method |
|---|---|
flush | clear |
statistics | getStats |
stop | shutdown |
halt | shutdown |
begin | start |
add | submit |
Additional Methods ​
isEmpty() bool- Returns true if no pending tasksisFull() bool- Returns true if queue is at capacityutilization() f64- Returns thread pool utilization ratio (0.0 - 1.0)resetStats() void- Resets all statistics
See Also ​
- Thread Pool Guide - Usage patterns
- Async API - Async logging with ring buffers
- Scheduler API - Scheduled tasks
- Configuration Guide - Full configuration options
