# Thread Pool Guide

This guide covers parallel log processing in Logly using thread pools, including configuration, task submission, work stealing, and best practices.

## Overview

The thread pool module enables parallel log processing for high-throughput scenarios. It provides configurable worker threads, priority queues, work stealing, and parallel sink writing.

## Logger Configuration

Configure thread pool settings through the logger's `Config`:
```zig
const logly = @import("logly");

var config = logly.Config.default();
config.thread_pool = .{
    .enabled = true,           // Enable thread pool
    .thread_count = 8,         // Number of worker threads
    .queue_size = 2048,        // Max queued tasks
    .stack_size = 1024 * 1024, // 1 MB stack per thread
    .work_stealing = true,     // Enable work stealing
};

// Or use the helper method
var config2 = logly.Config.default().withThreadPool(4); // 4 worker threads
```

## Quick Start
```zig
const std = @import("std");
const logly = @import("logly");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Create a thread pool with default settings
    var pool = try logly.ThreadPool.init(allocator, .{
        .thread_count = 4,
        .work_stealing = true,
    });
    defer pool.deinit();

    // Start workers
    try pool.start();
    defer pool.stop();

    // Submit tasks
    // pool.submit(...);
}
```

## Configuration
### Thread Count

```zig
// Specific number of threads
.thread_count = 8

// Auto-detect (0 = CPU core count)
.thread_count = 0
```

### Queue Size
```zig
// Per-thread queue size
.queue_size = 1024

// Large queue for bursty workloads
.queue_size = 4096
```

### Work Stealing
Enable threads to steal work from other threads' queues:

```zig
.work_stealing = true
```

This improves load balancing when some threads finish faster than others.
### Arena Allocation

Enable per-worker arena allocation for efficient memory usage:

```zig
.enable_arena = true
```

When enabled, each worker thread maintains its own arena allocator. This is particularly useful for formatting operations, as it reduces contention on the global allocator and improves cache locality. The arena is automatically reset after each task.
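
In standalone terms, the per-task arena pattern looks roughly like this (a minimal sketch built on `std.heap.ArenaAllocator`; it illustrates the idea, not Logly's internals):

```zig
const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    // One arena per worker; reused across tasks.
    var arena = std.heap.ArenaAllocator.init(gpa.allocator());
    defer arena.deinit();

    for (0..3) |i| {
        const alloc = arena.allocator();
        // Per-task work: formatting draws from the arena, not the GPA.
        const msg = try std.fmt.allocPrint(alloc, "task {d} formatted", .{i});
        std.debug.print("{s}\n", .{msg});
        // One cheap reset frees everything the task allocated.
        _ = arena.reset(.retain_capacity);
    }
}
```

`reset(.retain_capacity)` keeps the arena's backing buffers around, so steady-state tasks can typically allocate without touching the underlying allocator at all.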
### Priority Queues

Enable task prioritization:

```zig
.enable_priorities = true
```

### Presets
Use built-in presets for common scenarios:

```zig
// Single thread (for testing/debugging)
const single = logly.ThreadPoolPresets.singleThread();

// CPU-bound tasks (N threads, work stealing)
const cpu = logly.ThreadPoolPresets.cpuBound();

// I/O-bound tasks (2N threads, large queues)
const io = logly.ThreadPoolPresets.ioBound();

// Maximum throughput
const high = logly.ThreadPoolPresets.highThroughput();
```

## Submitting Tasks
### Basic Task Submission

```zig
try pool.submit(.{
    .func = myTaskFunction,
    .context = @ptrCast(&myData),
    .priority = .normal,
    .submitted_at = std.time.milliTimestamp(),
});

fn myTaskFunction(ctx: *anyopaque) void {
    const data: *MyData = @alignCast(@ptrCast(ctx));
    // Process data...
}
```
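
Because `context` is a type-erased pointer, the data it points to must outlive the task. A complete sketch using only the calls shown in this guide (the `Item` struct is illustrative):

```zig
const std = @import("std");
const logly = @import("logly");

const Item = struct { id: u32 };

fn processItem(ctx: *anyopaque) void {
    const item: *Item = @alignCast(@ptrCast(ctx));
    std.debug.print("processing item {d}\n", .{item.id});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var pool = try logly.ThreadPool.init(gpa.allocator(), .{ .thread_count = 2 });
    defer pool.deinit();
    try pool.start();
    defer pool.stop();

    // `item` must stay alive until the task has run.
    var item = Item{ .id = 7 };
    try pool.submit(.{
        .func = processItem,
        .context = @ptrCast(&item),
        .priority = .normal,
        .submitted_at = std.time.milliTimestamp(),
    });

    // Block until the queues drain, so `item` outlives its task.
    pool.waitIdle();
}
```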
### Priority Levels

```zig
pub const TaskPriority = enum(u8) {
    low = 0,      // Background tasks
    normal = 1,   // Regular logging
    high = 2,     // Important logs
    critical = 3, // Error/alert logs
};
```

### Submit with Priority
```zig
try pool.submitWithPriority(
    myFunction,
    @ptrCast(&data),
    .critical,
);
```

## Parallel Sink Writing
Write to multiple sinks concurrently:

```zig
var writer = try logly.ParallelSinkWriter.init(allocator, .{
    .max_concurrent = 4,
    .retry_on_failure = true,
    .fail_fast = false,
});
defer writer.deinit();

// Add sinks
try writer.addSink(&file_sink);
try writer.addSink(&console_sink);
try writer.addSink(&network_sink);

// Write to all sinks in parallel
try writer.write(&record);
```

### Configuration Options
```zig
pub const ParallelConfig = struct {
    max_concurrent: usize = 8,     // Max parallel writes
    write_timeout_ms: u64 = 1000,  // Timeout per write
    retry_on_failure: bool = true, // Retry failed writes
    max_retries: u3 = 3,           // Max retry attempts
    fail_fast: bool = false,       // Stop on first error
    buffered: bool = true,         // Buffer before dispatch
    buffer_size: usize = 64,       // Buffer size
};
```
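
For example, a deployment with slow or flaky network sinks might bound per-write latency while keeping healthy sinks flowing (the field values here are illustrative; the fields are from `ParallelConfig` above):

```zig
// Tolerate slow network sinks without stalling the whole write
var writer = try logly.ParallelSinkWriter.init(allocator, .{
    .max_concurrent = 8,
    .write_timeout_ms = 250,   // bound per-sink latency
    .retry_on_failure = true,
    .max_retries = 2,
    .fail_fast = false,        // keep writing to the healthy sinks
});
```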
## Statistics

Monitor pool performance:

```zig
const stats = pool.getStats();

std.debug.print("Tasks completed: {d}\n", .{
    stats.tasks_completed.load(.monotonic),
});
std.debug.print("Tasks stolen: {d}\n", .{
    stats.tasks_stolen.load(.monotonic),
});
std.debug.print("Throughput: {d:.2} tasks/sec\n", .{
    stats.throughput(),
});
std.debug.print("Avg wait time: {d} ns\n", .{
    stats.averageWaitTimeNs(),
});
std.debug.print("Avg exec time: {d} ns\n", .{
    stats.averageExecTimeNs(),
});
```
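
The counters also combine into useful derived metrics. For instance, the fraction of tasks that had to be stolen is a rough signal of load imbalance (a sketch using only the counters shown above):

```zig
// A high steal ratio suggests uneven task sizes or an uneven
// distribution of work across the per-thread queues.
const completed = stats.tasks_completed.load(.monotonic);
const stolen = stats.tasks_stolen.load(.monotonic);
if (completed > 0) {
    const ratio = @as(f64, @floatFromInt(stolen)) /
        @as(f64, @floatFromInt(completed));
    std.debug.print("steal ratio: {d:.2}\n", .{ratio});
}
```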
## Use Cases

### 1. High-Volume Logging

```zig
// Use the high-throughput preset
var pool = try logly.ThreadPool.init(
    allocator,
    logly.ThreadPoolPresets.highThroughput(),
);
```

### 2. Multiple Log Destinations
```zig
// Write to file, console, and network simultaneously
var writer = try logly.ParallelSinkWriter.init(allocator, .{
    .max_concurrent = 3,
});
try writer.addSink(&file_sink);
try writer.addSink(&console_sink);
try writer.addSink(&network_sink);
```

### 3. Batch Processing
```zig
// Process log batches in parallel. Capture each batch by pointer so the
// context still points at valid memory when the task eventually runs.
for (log_batches) |*batch| {
    try pool.submit(.{
        .func = processBatch,
        .context = @ptrCast(batch),
    });
}
pool.waitIdle();
```

### 4. Priority-Based Logging
```zig
// Critical logs get processed first
try pool.submitWithPriority(logError, &error_data, .critical);

// Regular logs are processed normally
try pool.submitWithPriority(logInfo, &info_data, .normal);

// Debug logs are processed last
try pool.submitWithPriority(logDebug, &debug_data, .low);
```

## Work Stealing
Work stealing improves efficiency when:

- Tasks have variable execution times
- Some threads finish faster than others
- You want better CPU utilization

```zig
var pool = try logly.ThreadPool.init(allocator, .{
    .work_stealing = true, // Enable work stealing
    .thread_count = 8,
});
```

### How It Works

- Each thread has its own work queue
- When a thread's queue is empty, it "steals" from others
- Stealing is done from the back of other queues
- This balances work across all threads
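
The choice of ends matters: because the owner and the thieves operate on opposite ends of a queue, they rarely contend for the same task. A single-threaded sketch of that discipline (illustrative only; the real per-thread queues are synchronized):

```zig
const std = @import("std");

pub fn main() void {
    // One worker's queue, oldest task at the front.
    const tasks = [_]u32{ 1, 2, 3, 4, 5 };
    var front: usize = 0;
    var back: usize = tasks.len;

    // The owner takes its next task from the front of its own queue.
    const own = tasks[front];
    front += 1;

    // An idle thread steals from the back, away from the owner's end.
    back -= 1;
    const stolen = tasks[back];

    std.debug.print("owner ran {d}, thief stole {d}\n", .{ own, stolen });
}
```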
## Integration with Async Logger

Combine thread pools with async logging:

```zig
var async_logger = try logly.AsyncLogger.init(allocator, .{
    .buffer_size = 8192,
});

var pool = try logly.ThreadPool.init(allocator, .{
    .thread_count = 4,
});

// Use the pool for parallel sink writing
var parallel_writer = try logly.ParallelSinkWriter.init(allocator, .{});

// Connect components...
```
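
One possible wiring (a sketch only; the `WriteJob` glue type is hypothetical, and the record type depends on your setup): submit a pool task whose only job is to fan one record out through the parallel writer.

```zig
// Hypothetical glue type, not part of Logly's API.
const WriteJob = struct {
    writer: *logly.ParallelSinkWriter,
    record: *logly.Record, // assumed record type; adjust to your setup
};

fn writeTask(ctx: *anyopaque) void {
    const job: *WriteJob = @alignCast(@ptrCast(ctx));
    job.writer.write(job.record) catch {}; // sinks retry per ParallelConfig
}

// One job per record; `job` must outlive the task.
var job = WriteJob{ .writer = &parallel_writer, .record = &record };
try pool.submit(.{
    .func = writeTask,
    .context = @ptrCast(&job),
    .priority = .normal,
    .submitted_at = std.time.milliTimestamp(),
});
```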
## Best Practices

### 1. Choose an Appropriate Thread Count

```zig
// CPU-bound: use the CPU core count
.thread_count = std.Thread.getCpuCount() catch 4

// I/O-bound: use 2x the CPU core count
.thread_count = (std.Thread.getCpuCount() catch 4) * 2
```

### 2. Size Queues Appropriately
```zig
// For bursty workloads, use larger queues
.queue_size = 4096

// For steady workloads, smaller is fine
.queue_size = 256
```

### 3. Handle a Full Queue
```zig
pool.submit(task) catch |err| {
    if (err == error.QueueFull) {
        // Handle backpressure:
        // - Wait and retry
        // - Drop the task
        // - Expand the queue
    }
};
```
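
A concrete take on the wait-and-retry option (a sketch; the retry budget and sleep interval are arbitrary):

```zig
// Retry with a short backoff, then give up and drop the task.
var attempts: usize = 0;
const max_attempts = 3;
while (attempts < max_attempts) : (attempts += 1) {
    pool.submit(task) catch |err| {
        if (err == error.QueueFull) {
            std.Thread.sleep(1 * std.time.ns_per_ms); // brief backoff
            continue;
        }
        return err; // other errors propagate
    };
    break; // submitted successfully
}
```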
### 4. Graceful Shutdown

```zig
// Stop accepting new tasks
pool.stop();

// Or with a timeout
pool.stopWithTimeout(5000); // 5-second timeout
```
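
For a clean drain, `waitIdle` (used under Batch Processing above) pairs naturally with `stop`:

```zig
// Drain-then-stop: let queued tasks finish before stopping the workers.
pool.waitIdle();
pool.stop();
```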
### 5. Monitor Performance

Regularly check statistics to identify bottlenecks:

```zig
const stats = pool.getStats();
if (stats.tasks_dropped.load(.monotonic) > 0) {
    // Queue overflow: increase the queue size or thread count
}
```
## Error Handling

```zig
// Note: use either `try` or `catch` on a call, not both.
pool.submit(task) catch |err| {
    switch (err) {
        error.PoolNotRunning => {
            // The pool hasn't started or has already stopped
        },
        error.QueueFull => {
            // All queues are full
        },
        error.OutOfMemory => {
            // Memory allocation failed
        },
    }
};
```
## Performance Considerations

- **Thread overhead**: each worker reserves its own stack (`stack_size`, 1 MB in the configuration above) plus scheduler bookkeeping
- **Context switching**: too many threads adds scheduling overhead
- **Cache locality**: work stealing can hurt cache performance
- **Lock contention**: minimize shared state between tasks

## Example: Production Setup
```zig
const std = @import("std");
const logly = @import("logly");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // Production thread pool config
    const cpu_count = std.Thread.getCpuCount() catch 4;
    var pool = try logly.ThreadPool.init(allocator, .{
        .thread_count = cpu_count,
        .queue_size = 2048,
        .work_stealing = true,
        .enable_priorities = true,
        .shutdown_timeout_ms = 10000,
    });
    defer pool.deinit();

    // Parallel sink writer
    var writer = try logly.ParallelSinkWriter.init(allocator, .{
        .max_concurrent = 4,
        .retry_on_failure = true,
        .max_retries = 3,
    });
    defer writer.deinit();

    try pool.start();
    defer pool.stop();

    // Application runs...

    // Check final stats
    const stats = pool.getStats();
    std.debug.print("Total processed: {d}\n", .{
        stats.tasks_completed.load(.monotonic),
    });
}
```