Callbacks¶
Methods for managing log event handlers.
Overview¶
Logly supports callbacks that are invoked for each log event. Callbacks enable:
- ✅ Custom Backends: Send logs to external services (Slack, PagerDuty, Sentry)
- ✅ Metrics: Track log counts, error rates
- ✅ Alerting: Trigger alerts on ERROR/CRITICAL logs
- ✅ Filtering: Custom filtering logic
- ✅ Aggregation: Collect logs for batch processing
Callback Execution:
- Callbacks are regular functions (run in background threads)
- Callbacks do not block log operations
- Callbacks receive log record dictionary
- Callbacks can be added or removed dynamically
logger.add_callback()¶
Register a callback function to be invoked for each log event.
Signature¶
Parameters¶
Parameter | Type | Default | Description |
---|---|---|---|
callback | Callable[[dict], None] | required | Function that receives log record dictionary |
Returns¶
int: Callback ID (use with remove_callback())
Log Record Structure¶
The callback receives a dictionary with this structure:
{
    "timestamp": "2025-01-15T10:30:45.123Z",  # ISO 8601 timestamp
    "level": "INFO",                          # Log level name
    "message": "User logged in",              # Log message text
    "filename": "/path/to/file.py",           # Source file path
    "lineno": "42",                           # Line number (as string)
    "function": "login_handler",              # Function name
    # Additional fields from bind(), contextualize(), or kwargs
    "user_id": "12345",
    "action": "login"
}
Location Information
Every log record includes filename, lineno, and function fields that indicate where the log was called from. This is useful for debugging and tracing log origins.
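As a small illustration of the record layout above, the sketch below builds a short origin string from the location fields. The helper name `describe_origin` is hypothetical and not part of Logly. It assumes `lineno` arrives as a string, as in the sample record, and converts it defensively.

```python
import os


def describe_origin(record: dict) -> str:
    """Return a string like 'file.py:42 in login_handler' from the location fields."""
    filename = os.path.basename(record.get("filename", "<unknown>"))
    # lineno is documented as a string, so convert it before any numeric use.
    try:
        lineno = int(record.get("lineno", 0))
    except (TypeError, ValueError):
        lineno = 0
    function = record.get("function", "<unknown>")
    return f"{filename}:{lineno} in {function}"


# Sample record copied from the structure shown above.
sample = {
    "timestamp": "2025-01-15T10:30:45.123Z",
    "level": "INFO",
    "message": "User logged in",
    "filename": "/path/to/file.py",
    "lineno": "42",
    "function": "login_handler",
}
print(describe_origin(sample))  # file.py:42 in login_handler
```

A callback could call such a helper to tag outgoing alerts with the place the log came from.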
Examples¶
from logly import logger
import requests
def slack_callback(record: dict):
    """Send ERROR/CRITICAL logs to Slack"""
    if record["level"] in ["ERROR", "CRITICAL"]:
        requests.post(
            "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
            json={
                "text": f"🚨 {record['level']}: {record['message']}",
                "attachments": [{
                    "color": "danger",
                    "fields": [
                        {"title": k, "value": str(v), "short": True}
                        for k, v in record.items()
                        if k not in ["timestamp", "level", "message"]
                    ]
                }]
            },
            timeout=5,  # Avoid hanging the callback thread if Slack is unreachable
        )
# Register callback
callback_id = logger.add_callback(slack_callback)
# This will trigger Slack notification
logger.error("Payment failed", order_id=1234, amount=99.99)
from logly import logger
from collections import Counter
log_counts = Counter()
def metrics_callback(record: dict):
    """Count logs by level"""
    log_counts[record["level"]] += 1
    # Every 100 logs, print summary
    if sum(log_counts.values()) % 100 == 0:
        print(f"Log counts: {dict(log_counts)}")
logger.add_callback(metrics_callback, name="metrics")
# Log some events
for i in range(150):
    logger.info(f"Event {i}")
# Prints once, after the 100th log: Log counts: {'INFO': 100}
# (150 is not a multiple of 100, so no second summary is printed)
from logly import logger
error_count = 0
error_threshold = 10
def alert_callback(record: dict):
    """Alert if too many errors"""
    global error_count
    if record["level"] in ["ERROR", "CRITICAL"]:
        error_count += 1
        if error_count >= error_threshold:
            send_alert(f"High error rate: {error_count} errors")
            error_count = 0  # Reset counter
def send_alert(message: str):
    print(f"🚨 ALERT: {message}")
    # Send to PagerDuty, email, etc.
logger.add_callback(alert_callback, name="error_alerting")
# Trigger alerts
for i in range(12):
    logger.error(f"Error {i}")
# Prints: 🚨 ALERT: High error rate: 10 errors
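The alerting example above counts errors without regard to time, so ten errors spread over a week would still fire. If an actual rate is wanted, one possible variation is a sliding time window. The sketch below is only an illustration: the class name `ErrorRateAlert`, its parameters, and the injectable `clock` are all hypothetical, and it assumes Logly accepts any callable (here, an object with `__call__`) as a callback.

```python
import threading
import time
from collections import deque


class ErrorRateAlert:
    """Call `on_alert` when `threshold` errors occur within `window_seconds`."""

    def __init__(self, threshold, window_seconds, on_alert, clock=time.monotonic):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.on_alert = on_alert
        self.clock = clock            # injectable so the window can be tested
        self._times = deque()         # arrival times of recent errors
        self._lock = threading.Lock() # callbacks may run on several threads

    def __call__(self, record: dict) -> None:
        if record["level"] not in ("ERROR", "CRITICAL"):
            return
        now = self.clock()
        with self._lock:
            self._times.append(now)
            # Drop errors that have fallen out of the window.
            while self._times and now - self._times[0] > self.window_seconds:
                self._times.popleft()
            count = len(self._times)
            fire = count >= self.threshold
            if fire:
                self._times.clear()   # reset, like the counter reset above
        if fire:
            self.on_alert(f"High error rate: {count} errors in {self.window_seconds}s")


# Direct invocation for demonstration; with Logly this would presumably be
# registered via logger.add_callback(alert).
alerts = []
alert = ErrorRateAlert(threshold=3, window_seconds=60, on_alert=alerts.append)
for i in range(3):
    alert({"level": "ERROR", "message": f"Error {i}"})
print(alerts)  # ['High error rate: 3 errors in 60s']
```

The alert handler is called outside the lock so that a slow notification does not hold up other callback invocations.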
from logly import logger
def filter_callback(record: dict):
    """Write ERROR logs to a separate file"""
    if record["level"] == "ERROR":
        # Re-log to separate file
        with open("critical_errors.log", "a") as f:
            f.write(f"{record['timestamp']} | {record['message']}\n")
logger.add_callback(filter_callback, name="critical_filter")
from logly import logger
log_buffer = []
def batch_callback(record: dict):
    """Collect logs and process in batches"""
    log_buffer.append(record)
    # Process every 50 logs
    if len(log_buffer) >= 50:
        process_batch(log_buffer.copy())
        log_buffer.clear()
def process_batch(records: list):
    print(f"Processing batch of {len(records)} logs")
    # Send to database, analytics service, etc.
logger.add_callback(batch_callback, name="batch_processor")
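The batching example above leaves any records short of a full batch in the buffer, and it mutates a shared list from callbacks that may run on background threads. A possible refinement is sketched below. `BatchCollector` and its `flush()` method are hypothetical names, not part of Logly, and the sketch assumes a callable object can be registered as a callback.

```python
import threading
from typing import Callable, List


class BatchCollector:
    """Collect records and hand them to `process` in batches of `batch_size`."""

    def __init__(self, batch_size: int, process: Callable[[List[dict]], None]):
        self.batch_size = batch_size
        self.process = process
        self._buffer: List[dict] = []
        self._lock = threading.Lock()

    def __call__(self, record: dict) -> None:
        batch = None
        with self._lock:
            self._buffer.append(record)
            if len(self._buffer) >= self.batch_size:
                # Swap in a fresh list so processing happens outside the lock.
                batch, self._buffer = self._buffer, []
        if batch is not None:
            self.process(batch)

    def flush(self) -> None:
        """Process whatever is left, e.g. at shutdown."""
        with self._lock:
            batch, self._buffer = self._buffer, []
        if batch:
            self.process(batch)


# Direct invocation for demonstration.
sizes = []
collector = BatchCollector(batch_size=50, process=lambda batch: sizes.append(len(batch)))
for i in range(120):
    collector({"level": "INFO", "message": f"Event {i}"})
collector.flush()
print(sizes)  # [50, 50, 20]
```

Calling `flush()` before removing the callback ensures the final partial batch is not lost.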
Notes¶
When to Use Callbacks
- External Services: Send logs to Slack, PagerDuty, Sentry
- Metrics: Track log volume, error rates
- Alerting: Trigger alerts on specific conditions
- Custom Storage: Write to databases, cloud storage
- Real-time Processing: Stream logs to analytics
Regular Functions
Callbacks can be regular functions (not async):
# ✅ Regular function works fine
def my_callback(record: dict):
    print(f"Log: {record['level']} - {record['message']}")
logger.add_callback(my_callback)
Callbacks run in background threads, so they don't block the main application even if they perform synchronous I/O.
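Because callbacks run in background threads, state shared between invocations may be touched concurrently. A cautious approach is to guard it with a lock. The sketch below shows one way to do that for a per-level counter. The names `counting_callback` and `snapshot` are illustrative, and the threads here only simulate concurrent invocations rather than reproduce how Logly actually schedules callbacks.

```python
import threading
from collections import Counter

_counts = Counter()
_counts_lock = threading.Lock()


def counting_callback(record: dict) -> None:
    """Count records by level; safe to call from several threads at once."""
    with _counts_lock:
        _counts[record["level"]] += 1


def snapshot() -> dict:
    """Return a consistent copy of the counts."""
    with _counts_lock:
        return dict(_counts)


def _worker() -> None:
    for _ in range(1000):
        counting_callback({"level": "INFO", "message": "x"})


# Simulate four threads invoking the callback concurrently.
threads = [threading.Thread(target=_worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(snapshot())  # {'INFO': 4000}
```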
Non-Blocking
Callbacks run in the background and do not block logging.
Exception Handling
Exceptions in callbacks are caught and logged.
logger.remove_callback()¶
Unregister a callback function.
Signature¶
Parameters¶
callback_id (int): Callback ID returned by add_callback()
Returns¶
None
Examples¶
from logly import logger
def my_callback(record: dict):
    print(f"Log: {record['message']}")
# Add callback
callback_id = logger.add_callback(my_callback, name="printer")
# Log some events
logger.info("Event 1")  # Callback invoked
logger.info("Event 2")  # Callback invoked
# Remove callback
logger.remove_callback(callback_id)
# Log more events
logger.info("Event 3")  # Callback NOT invoked
# Add multiple callbacks
# (callback1, callback2, callback3 are callback functions defined elsewhere)
id1 = logger.add_callback(callback1, name="slack")
id2 = logger.add_callback(callback2, name="metrics")
id3 = logger.add_callback(callback3, name="alerting")
# Remove specific callback
logger.remove_callback(id2)  # Remove metrics callback
# Other callbacks still active
logger.info("Test")  # Triggers slack and alerting callbacks
Notes¶
When to Remove Callbacks
- Cleanup: Remove callbacks at shutdown
- Testing: Remove callbacks between tests
- Dynamic: Remove callbacks based on configuration changes
Invalid ID
Removing a non-existent callback ID is a no-op (does not raise an error).
Complete Example¶
from logly import logger
# Configure
logger.configure(level="INFO", json=True)
logger.add("console")
logger.add("logs/app.log")
# Callback 1: Slack notifications for errors
def slack_callback(record: dict):
    if record["level"] in ["ERROR", "CRITICAL"]:
        # Note: In real code, use requests or similar for HTTP calls
        print(f"Would send to Slack: 🚨 {record['level']}: {record['message']}")
# Callback 2: Metrics collection
log_counts = {"INFO": 0, "ERROR": 0}
def metrics_callback(record: dict):
    level = record["level"]
    if level in log_counts:
        log_counts[level] += 1
# Callback 3: Custom alerting (reads the counts kept by metrics_callback)
error_threshold = 5
def alert_callback(record: dict):
    if record["level"] == "ERROR":
        if log_counts["ERROR"] >= error_threshold:
            print(f"🚨 ALERT: {error_threshold} errors detected!")
# Register callbacks
slack_id = logger.add_callback(slack_callback)
metrics_id = logger.add_callback(metrics_callback)
alert_id = logger.add_callback(alert_callback, name="alerting")
# Application code
def main():
    logger.info("Application started")
    # Simulate errors
    for i in range(10):
        logger.error(f"Error {i}", error_code=i)
    # Print metrics
    print(f"Final counts: {log_counts}")
    # Cleanup
    logger.remove_callback(slack_id)
    logger.remove_callback(metrics_id)
    logger.remove_callback(alert_id)
    logger.complete()
# Run
main()
Output:
2025-01-15 10:30:45 | INFO | Application started
2025-01-15 10:30:45 | ERROR | Error 0 error_code=0
...
2025-01-15 10:30:46 | ERROR | Error 4 error_code=4
🚨 ALERT: 5 errors detected!
...
Final counts: {'INFO': 1, 'ERROR': 10}
Best Practices¶
✅ DO¶
# 1. Keep callbacks simple and fast
def good_callback(record: dict):
    process_sync(record)
logger.add_callback(good_callback)
# 2. Handle exceptions in callbacks
def safe_callback(record: dict):
    try:
        risky_operation(record)
    except Exception as e:
        print(f"Callback error: {e}")
# 3. Store callback IDs for cleanup
callback_id = logger.add_callback(my_callback)
# Later...
logger.remove_callback(callback_id)
# 4. Use named callbacks for clarity
logger.add_callback(slack_callback, name="slack_alerts")
logger.add_callback(metrics_callback, name="metrics_collector")
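The try/except in item 2 above can be factored into a reusable decorator so that each callback does not repeat it. The sketch below is one way to do this; `guarded` is a hypothetical name, not a Logly feature. It reports failures to stderr rather than through the logger, since logging from inside a callback would trigger callbacks again.

```python
import functools
import sys
import traceback


def guarded(callback):
    """Wrap a callback so that any exception is reported instead of propagating."""
    name = getattr(callback, "__name__", repr(callback))

    @functools.wraps(callback)
    def wrapper(record: dict) -> None:
        try:
            callback(record)
        except Exception:
            # Report to stderr, not the logger, to avoid re-triggering callbacks.
            print(f"Callback {name!r} failed:", file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

    return wrapper


@guarded
def risky_callback(record: dict) -> None:
    raise ValueError("boom")


# The error is written to stderr; nothing is raised to the caller.
risky_callback({"level": "ERROR", "message": "test"})
```

The wrapped function keeps its original name through `functools.wraps`, which helps when callbacks are identified in diagnostics.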
❌ DON'T¶
# 1. Don't perform slow blocking I/O
def blocking_callback(record: dict):
    with open("file.log", "a") as f:  # ❌ Blocking I/O
        f.write(str(record))
# Use async I/O instead (assumes coroutine callbacks are supported in your setup):
import aiofiles
async def async_callback(record: dict):
    async with aiofiles.open("file.log", "a") as f:  # ✅ Async I/O
        await f.write(str(record))
# 2. Don't forget to remove callbacks
logger.add_callback(temp_callback)
# ... use logger ...
# ❌ Callback still active (memory leak)
# ✅ Always cleanup:
callback_id = logger.add_callback(temp_callback)
# ... use logger ...
logger.remove_callback(callback_id)
# 3. Don't log inside callbacks (infinite loop)
def bad_callback(record: dict):
    logger.info("Processing log")  # ❌ Triggers callback again!
Performance¶
Callback Overhead¶
- Minimal: Callbacks run in background
- Non-Blocking: Logging continues immediately
- Concurrent: Multiple callbacks run in parallel
Best Performance¶
# 1. Use batch processing
log_buffer = []
def batch_callback(record: dict):
    log_buffer.append(record)
    if len(log_buffer) >= 100:
        process_batch(log_buffer.copy())
        log_buffer.clear()
# 2. Filter early
def filtered_callback(record: dict):
    if record["level"] != "ERROR":
        return  # Skip non-errors
    process_error(record)
# 3. Use async I/O when needed
import aiohttp
async def http_callback(record: dict):
    # `url` is a placeholder for your endpoint
    async with aiohttp.ClientSession() as session:
        await session.post(url, json=record)