Background Jobs & Task Scheduling¶
Netforce provides a comprehensive background job system for handling asynchronous tasks, scheduled operations, and long-running processes. The system supports job queuing, scheduling, retry logic, and monitoring.
Overview¶
The background job system consists of several key models:
- bg.task - Background task definitions and scheduling
- bg.task.log - Task execution logs and results
- bg.process - Background worker processes
- Job queues - Priority-based task queuing system
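Before diving into the models, here is a minimal end-to-end sketch of the flow: define a method on a model, then enqueue it as a bg.task record so a worker process (described later in this page) picks it up. The stock.sync model and its method are hypothetical; only the bg.task fields come from the definition below.

import json
import time

from netforce.model import Model, fields, get_model

class StockSync(Model):
    _name = "stock.sync"  # hypothetical example model

    def refresh_levels(self, ids, context={}):
        # ...the actual work would go here...
        return {"synced": len(ids)}

StockSync.register()

# Enqueue the method as a background task; a worker process
# (see "Worker Processes" below) will pick it up and run it.
get_model("bg.task").create({
    "name": "Refresh Stock Levels",
    "model": "stock.sync",
    "method": "refresh_levels",
    "args": json.dumps([1, 2, 3]),
    "date_run": time.strftime("%Y-%m-%d %H:%M:%S"),
})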
Core Models¶
1. Background Task (bg.task)¶
The main model for defining and scheduling background tasks:
from netforce.model import Model, fields

class BackgroundTask(Model):
    _name = "bg.task"
    _fields = {
        "name": fields.Char("Task Name", required=True),
        "model": fields.Char("Model Name", required=True),
        "method": fields.Char("Method Name", required=True),
        "args": fields.Text("Arguments (JSON)"),
        "kwargs": fields.Text("Keyword Arguments (JSON)"),
        "user_id": fields.Many2One("base.user", "User"),
        "company_id": fields.Many2One("company", "Company"),
        "date_create": fields.DateTime("Created Date"),
        "date_run": fields.DateTime("Scheduled Run Date"),
        "date_done": fields.DateTime("Completed Date"),
        "state": fields.Selection([
            ["waiting", "Waiting"],
            ["running", "Running"],
            ["done", "Completed"],
            ["error", "Error"],
            ["cancelled", "Cancelled"],
        ], "Status"),
        "priority": fields.Integer("Priority"),
        "repeat": fields.Selection([
            ["once", "Run Once"],
            ["hourly", "Hourly"],
            ["daily", "Daily"],
            ["weekly", "Weekly"],
            ["monthly", "Monthly"],
        ], "Repeat"),
        "repeat_interval": fields.Integer("Repeat Interval"),
        "max_retries": fields.Integer("Max Retries"),
        "retry_count": fields.Integer("Current Retry Count"),
        "timeout": fields.Integer("Timeout (seconds)"),
        "logs": fields.One2Many("bg.task.log", "task_id", "Execution Logs"),
    }
    _defaults = {
        "state": "waiting",
        "priority": 100,  # lower number = higher priority
        "max_retries": 3,
        "timeout": 3600,  # 1 hour
    }

BackgroundTask.register()
2. Task Execution Log (bg.task.log)¶
Tracks execution history and results:
class BackgroundTaskLog(Model):
    _name = "bg.task.log"
    _fields = {
        "task_id": fields.Many2One("bg.task", "Task"),
        "date_start": fields.DateTime("Start Time"),
        "date_end": fields.DateTime("End Time"),
        "duration": fields.Decimal("Duration (seconds)"),
        "state": fields.Selection([
            ["success", "Success"],
            ["error", "Error"],
            ["timeout", "Timeout"],
        ], "Result"),
        "result": fields.Text("Result Data"),
        "error_message": fields.Text("Error Message"),
        "traceback": fields.Text("Full Traceback"),
        "pid": fields.Integer("Process ID"),
        "memory_usage": fields.Integer("Memory Usage (MB)"),
    }

BackgroundTaskLog.register()
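As a quick usage sketch (the helper name is ours, and it assumes search accepts order and limit arguments), recent failures for a task can be read straight off bg.task.log:

def get_recent_failures(task_id, limit=10):
    """Return the newest error log entries for one task (sketch)."""
    Log = get_model("bg.task.log")
    log_ids = Log.search([
        ["task_id", "=", task_id],
        ["state", "=", "error"],
    ], order="date_start desc", limit=limit)
    return Log.read(log_ids, ["date_start", "error_message", "traceback"])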
Creating Background Tasks¶
1. Simple One-time Tasks¶
Create and schedule simple background tasks:
import json
import time
from datetime import datetime, timedelta

from netforce.model import get_model

def create_invoice_processing_task(invoice_ids):
    """Create a task to process multiple invoices"""
    task_id = get_model("bg.task").create({
        "name": "Process Invoices",
        "model": "account.invoice",
        "method": "process_batch",
        "args": json.dumps(invoice_ids),
        "date_run": time.strftime("%Y-%m-%d %H:%M:%S"),
        "priority": 100,
    })
    return task_id

def create_email_send_task(email_ids, delay_minutes=0):
    """Create a task to send emails with an optional delay"""
    run_time = datetime.now() + timedelta(minutes=delay_minutes)
    return get_model("bg.task").create({
        "name": "Send Email Batch",
        "model": "email.message",
        "method": "send_batch",
        "args": json.dumps(email_ids),
        "date_run": run_time.strftime("%Y-%m-%d %H:%M:%S"),
        "priority": 50,  # higher priority for emails (lower number)
    })

def create_report_generation_task(report_id, params, user_id):
    """Create a task to generate a large report"""
    return get_model("bg.task").create({
        "name": f"Generate Report {report_id}",
        "model": "report",
        "method": "generate_async",
        "args": json.dumps([report_id]),
        "kwargs": json.dumps({"params": params}),
        "user_id": user_id,
        "timeout": 7200,  # 2 hours for large reports
        "priority": 200,  # lower priority than the default 100
    })
2. Recurring Tasks¶
Create scheduled recurring tasks:
def create_daily_backup_task():
    """Create daily database backup task"""
    return get_model("bg.task").create({
        "name": "Daily Database Backup",
        "model": "database",
        "method": "create_backup",
        "date_run": "2023-01-01 02:00:00",  # first run; repeats daily at 2 AM
        "repeat": "daily",
        "repeat_interval": 1,
        "priority": 10,  # very high priority
    })

def create_weekly_cleanup_task():
    """Create weekly cleanup task"""
    return get_model("bg.task").create({
        "name": "Weekly Log Cleanup",
        "model": "log",
        "method": "cleanup_old_logs",
        "args": json.dumps([90]),  # keep 90 days
        "date_run": "2023-01-01 03:00:00",  # Sunday 3 AM
        "repeat": "weekly",
        "repeat_interval": 1,
        "priority": 300,
    })

def create_monthly_report_task():
    """Create monthly financial report"""
    return get_model("bg.task").create({
        "name": "Monthly Financial Report",
        "model": "report",
        "method": "generate_monthly_financial",
        "date_run": "2023-01-01 01:00:00",  # first day of month
        "repeat": "monthly",
        "repeat_interval": 1,
        "priority": 150,
    })
3. Conditional Tasks¶
Create tasks that run only when a condition is met:
def create_conditional_task(condition_model, condition_method, task_args):
    """Create a task that runs only if a condition is met"""
    return get_model("bg.task").create({
        "name": "Conditional Processing Task",
        "model": "custom.processor",
        "method": "process_if_condition",
        # Wrap the config dict in a list so it arrives as a single
        # positional argument when the worker calls the method
        "args": json.dumps([{
            "condition_model": condition_model,
            "condition_method": condition_method,
            "task_args": task_args,
        }]),
        "priority": 150,
    })

def process_if_condition(self, args, context={}):
    """Execute the task only if the condition is met"""
    # Check the condition by calling the configured model method
    condition_model = get_model(args["condition_model"])
    condition_result = getattr(condition_model, args["condition_method"])()
    if not condition_result:
        return {"result": "skipped", "reason": "condition not met"}
    # Condition met: execute the actual task
    return self.execute_main_task(args["task_args"])
Task Execution Methods¶
1. Implementing Task Methods¶
Create methods that can be executed as background tasks:
class AccountInvoice(Model):
    _name = "account.invoice"

    def process_batch(self, ids, context={}):
        """Process multiple invoices in the background"""
        results = []
        for inv_id in ids:
            try:
                invoice = self.browse(inv_id)
                # Validate and post draft invoices
                if invoice.state == "draft":
                    invoice.validate()
                    invoice.post()
                results.append({
                    "id": inv_id,
                    "status": "success",
                    "number": invoice.number,
                })
            except Exception as e:
                results.append({
                    "id": inv_id,
                    "status": "error",
                    "error": str(e),
                })
        return {
            "processed": len(results),
            "success_count": len([r for r in results if r["status"] == "success"]),
            "error_count": len([r for r in results if r["status"] == "error"]),
            "results": results,
        }

    def send_reminder_emails(self, ids, context={}):
        """Send reminder emails for overdue invoices"""
        overdue_days = context.get("overdue_days", 30)
        overdue_invoices = self.search_browse([
            ["id", "in", ids],
            ["state", "=", "open"],
            ["due_date", "<", (datetime.now() - timedelta(days=overdue_days)).strftime("%Y-%m-%d")],
        ])
        sent_count = 0
        for invoice in overdue_invoices:
            try:
                self.send_reminder_email(invoice.id)
                sent_count += 1
            except Exception as e:
                print(f"Failed to send reminder for invoice {invoice.number}: {e}")
        return {"sent_count": sent_count, "total": len(overdue_invoices)}
2. Long-running Tasks with Progress¶
Implement tasks that report progress:
def process_large_dataset(self, ids, context={}):
    """Process a large dataset with progress reporting"""
    dataset_size = len(ids)
    processed = 0
    batch_size = 100
    # The worker passes its own task ID in the context so the
    # task can report progress back to the bg.task record
    task_id = context.get("bg_task_id")
    for i in range(0, dataset_size, batch_size):
        batch_ids = ids[i:i + batch_size]
        # Process one batch
        self.process_batch_items(batch_ids)
        processed += len(batch_ids)
        progress = (processed / dataset_size) * 100
        # Update progress ("progress" and "status_message" are custom
        # fields on bg.task; see the extension sketch after this section)
        if task_id:
            get_model("bg.task").browse(task_id).write({
                "progress": progress,
                "status_message": f"Processed {processed}/{dataset_size} items",
            })
        # Yield control to prevent blocking
        time.sleep(0.1)
    return {"processed": processed, "total": dataset_size}

def cleanup_old_data(self, ids, context={}):
    """Clean up old data with progress tracking"""
    days_to_keep = context.get("days_to_keep", 365)
    cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).strftime("%Y-%m-%d")
    models_to_clean = [
        "log", "bg.task.log", "audit.log", "email.message",
    ]
    total_deleted = 0
    task_id = context.get("bg_task_id")
    for i, model_name in enumerate(models_to_clean):
        model = get_model(model_name)
        # Find records older than the cutoff
        old_ids = model.search([["date_create", "<", cutoff_date]])
        if old_ids:
            # Delete in batches to keep transactions small
            batch_size = 1000
            for j in range(0, len(old_ids), batch_size):
                batch = old_ids[j:j + batch_size]
                model.delete(batch)
                total_deleted += len(batch)
        # Update progress after each model
        if task_id:
            progress = ((i + 1) / len(models_to_clean)) * 100
            get_model("bg.task").browse(task_id).write({
                "progress": progress,
                "status_message": f"Cleaned {model_name}: {total_deleted} records deleted",
            })
    return {"deleted": total_deleted, "models_cleaned": len(models_to_clean)}
Task Queue Management¶
1. Priority-based Queuing¶
Implement priority-based task execution:
import os
import traceback

def get_next_task():
    """Get the next task to execute based on priority"""
    # Get due waiting tasks ordered by priority
    # (lower number = higher priority), then by creation date
    tasks = get_model("bg.task").search_browse([
        ["state", "=", "waiting"],
        ["date_run", "<=", time.strftime("%Y-%m-%d %H:%M:%S")],
    ], order="priority,date_create")
    return tasks[0] if tasks else None

def execute_task(task_id):
    """Execute a background task"""
    task = get_model("bg.task").browse(task_id)
    # Mark as running
    task.write({"state": "running"})
    # Create a log entry
    log_id = get_model("bg.task.log").create({
        "task_id": task_id,
        "date_start": time.strftime("%Y-%m-%d %H:%M:%S"),
        "pid": os.getpid(),
    })
    try:
        # Resolve the model method to execute
        model = get_model(task.model)
        method = getattr(model, task.method)
        # Prepare arguments; the task ID goes into the context
        # so the method can report progress (see above)
        args = json.loads(task.args or "[]")
        kwargs = json.loads(task.kwargs or "{}")
        kwargs.setdefault("context", {})["bg_task_id"] = task_id
        # Execute with timeout (see the sketch below)
        result = execute_with_timeout(method, args, kwargs, task.timeout)
        # Mark as completed
        task.write({
            "state": "done",
            "date_done": time.strftime("%Y-%m-%d %H:%M:%S"),
            "retry_count": 0,
        })
        # Update the log
        get_model("bg.task.log").browse(log_id).write({
            "date_end": time.strftime("%Y-%m-%d %H:%M:%S"),
            "state": "success",
            "result": json.dumps(result) if result else None,
        })
        # Schedule the next run if recurring
        if task.repeat != "once":
            schedule_next_run(task_id)
    except Exception as e:
        # Handle the error (retry or mark as failed)
        handle_task_error(task_id, log_id, e)

def handle_task_error(task_id, log_id, error):
    """Handle a task execution error"""
    task = get_model("bg.task").browse(task_id)
    # Update the log with error details
    get_model("bg.task.log").browse(log_id).write({
        "date_end": time.strftime("%Y-%m-%d %H:%M:%S"),
        "state": "error",
        "error_message": str(error),
        "traceback": traceback.format_exc(),
    })
    # Retry if max retries not yet exceeded
    if task.retry_count < task.max_retries:
        # Schedule a retry with exponential backoff
        retry_delay = (2 ** task.retry_count) * 60  # 1, 2, 4 minutes, ...
        next_run = datetime.now() + timedelta(seconds=retry_delay)
        task.write({
            "state": "waiting",
            "retry_count": task.retry_count + 1,
            "date_run": next_run.strftime("%Y-%m-%d %H:%M:%S"),
        })
    else:
        # Max retries exceeded
        task.write({"state": "error"})
2. Worker Processes¶
Implement background worker processes:
import multiprocessing

class BackgroundWorker:

    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.running = False
        self.current_task = None

    def start(self):
        """Start the background worker loop"""
        self.running = True
        # Register the worker process
        process_id = get_model("bg.process").create({
            "worker_id": self.worker_id,
            "pid": os.getpid(),
            "date_start": time.strftime("%Y-%m-%d %H:%M:%S"),
            "state": "running",
        })
        print(f"Background worker {self.worker_id} started (PID: {os.getpid()})")
        try:
            while self.running:
                # Get the next due task
                task = get_next_task()
                if task:
                    self.current_task = task.id
                    print(f"Worker {self.worker_id} executing task: {task.name}")
                    execute_task(task.id)
                    self.current_task = None
                else:
                    # No tasks available; sleep before polling again
                    time.sleep(5)
        except KeyboardInterrupt:
            print(f"Worker {self.worker_id} shutting down...")
        finally:
            # Mark the worker process as stopped
            get_model("bg.process").browse(process_id).write({
                "date_end": time.strftime("%Y-%m-%d %H:%M:%S"),
                "state": "stopped",
            })

    def stop(self):
        """Stop the background worker"""
        self.running = False
        # Cancel the current task, if any
        if self.current_task:
            get_model("bg.task").browse(self.current_task).write({
                "state": "cancelled",
            })

# Usage
def start_background_workers(num_workers=4):
    """Start multiple background workers in separate processes"""
    workers = []
    for i in range(num_workers):
        worker = BackgroundWorker(f"worker_{i}")
        process = multiprocessing.Process(target=worker.start)
        process.start()
        workers.append((worker, process))
    return workers
Advanced Features¶
1. Job Dependencies¶
Implement task dependencies:
def create_dependent_tasks():
    """Create tasks with dependencies"""
    # Parent task
    parent_task = get_model("bg.task").create({
        "name": "Process Orders",
        "model": "sale.order",
        "method": "process_batch",
        "args": json.dumps([1, 2, 3, 4, 5]),
    })
    # Dependent tasks ("depends_on" and the "depends" state are
    # extensions of the base model; see the sketch below)
    invoice_task = get_model("bg.task").create({
        "name": "Generate Invoices",
        "model": "account.invoice",
        "method": "generate_from_orders",
        "depends_on": parent_task,
        "state": "depends",  # wait for the dependency
    })
    email_task = get_model("bg.task").create({
        "name": "Send Order Confirmations",
        "model": "email.template",
        "method": "send_order_confirmations",
        "depends_on": parent_task,
        "state": "depends",
    })
    return parent_task, [invoice_task, email_task]

def check_dependencies():
    """Check and activate dependent tasks"""
    dependent_tasks = get_model("bg.task").search_browse([
        ["state", "=", "depends"],
    ])
    for task in dependent_tasks:
        if task.depends_on.state == "done":
            # Dependency completed: activate the task
            task.write({
                "state": "waiting",
                "date_run": time.strftime("%Y-%m-%d %H:%M:%S"),
            })
        elif task.depends_on.state == "error":
            # Dependency failed: cancel the task
            task.write({"state": "cancelled"})
2. Task Chaining¶
Create chains of sequential tasks:
def create_task_chain(chain_config):
    """Create a chain of sequential tasks"""
    previous_task = None
    tasks = []
    for step in chain_config:
        # Each task after the first depends on the previous one,
        # so the steps execute strictly in order
        task_id = get_model("bg.task").create({
            "name": step["name"],
            "model": step["model"],
            "method": step["method"],
            "args": json.dumps(step.get("args", [])),
            "kwargs": json.dumps(step.get("kwargs", {})),
            "state": "depends" if previous_task else "waiting",
            "depends_on": previous_task,
            "priority": step.get("priority", 100),
        })
        tasks.append(task_id)
        previous_task = task_id
    return tasks

# Example usage: a simple extract-transform-load chain
chain = create_task_chain([
    {
        "name": "Extract Data",
        "model": "data.extractor",
        "method": "extract_from_source",
        "args": ["source_id"],
    },
    {
        "name": "Transform Data",
        "model": "data.transformer",
        "method": "transform_dataset",
    },
    {
        "name": "Load Data",
        "model": "data.loader",
        "method": "load_to_target",
    },
])
3. Task Monitoring¶
Implement comprehensive monitoring:
from netforce import database

def create_monitoring_dashboard():
    """Collect data for a task monitoring dashboard"""
    dashboard_data = {
        "active_tasks": get_active_task_count(),
        "failed_tasks": get_failed_task_count(),
        "avg_execution_time": get_avg_execution_time(),
        "worker_status": get_worker_status(),
        "queue_size": get_queue_size(),
        "recent_errors": get_recent_errors(),
    }
    return dashboard_data

def get_task_statistics(days=7):
    """Get task execution statistics for the last N days"""
    cutoff_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    db = database.get_connection()
    stats = db.query("""
        SELECT
            DATE(t.date_create) AS date,
            COUNT(*) AS total_tasks,
            SUM(CASE WHEN t.state = 'done' THEN 1 ELSE 0 END) AS completed,
            SUM(CASE WHEN t.state = 'error' THEN 1 ELSE 0 END) AS failed,
            AVG(l.duration) AS avg_duration
        FROM bg_task t
        LEFT JOIN bg_task_log l ON t.id = l.task_id
        WHERE t.date_create >= %s
        GROUP BY DATE(t.date_create)
        ORDER BY date
    """, cutoff_date)
    return stats

def send_monitoring_alerts():
    """Send alerts based on task monitoring checks"""
    # Check for a high failure rate
    failure_rate = get_failure_rate()
    if failure_rate > 0.1:  # more than 10% of tasks failing
        send_alert(f"High task failure rate: {failure_rate:.1%}")
    # Check for stuck tasks
    stuck_tasks = get_stuck_tasks()
    if stuck_tasks:
        send_alert(f"Found {len(stuck_tasks)} stuck tasks")
    # Check worker health
    unhealthy_workers = get_unhealthy_workers()
    if unhealthy_workers:
        send_alert(f"Unhealthy workers: {unhealthy_workers}")

def send_alert(message):
    """Send a monitoring alert by email"""
    get_model("email.message").create({
        "to": "admin@company.com",
        "subject": "Background Task Alert",
        "body": f"<p>{message}</p><p>Time: {datetime.now()}</p>",
        "type": "out",
    })
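Helpers such as get_stuck_tasks, get_failure_rate, and get_queue_size are referenced above without definitions; plausible minimal versions (sketches, with thresholds of our own choosing) are:

def get_stuck_tasks(max_minutes=60):
    """Tasks stuck in the running state longer than max_minutes (sketch)."""
    cutoff = (datetime.now() - timedelta(minutes=max_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    return get_model("bg.task").search([
        ["state", "=", "running"],
        ["date_run", "<", cutoff],
    ])

def get_failure_rate(days=1):
    """Fraction of tasks created in the last N days that ended in error (sketch)."""
    cutoff = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    Task = get_model("bg.task")
    total = len(Task.search([["date_create", ">=", cutoff]]))
    failed = len(Task.search([
        ["date_create", ">=", cutoff],
        ["state", "=", "error"],
    ]))
    return failed / total if total else 0.0

def get_queue_size():
    """Number of tasks currently waiting to run (sketch)."""
    return len(get_model("bg.task").search([["state", "=", "waiting"]]))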
Best Practices¶
1. Error Recovery¶
Implement robust error recovery:
def create_resilient_task(task_config):
    """Create a task with comprehensive error handling"""
    # "retry_strategy", "error_handling" and "notification_on_failure"
    # are illustrative custom fields, not part of the base bg.task model
    return get_model("bg.task").create({
        **task_config,
        "max_retries": 5,
        "retry_strategy": "exponential_backoff",
        "timeout": 3600,
        "error_handling": "graceful",
        "notification_on_failure": True,
    })

def graceful_error_handling(self, task_id, error):
    """Handle errors gracefully based on error category"""
    task = get_model("bg.task").browse(task_id)
    # Categorize the error (see the classify_error sketch below)
    error_type = classify_error(error)
    if error_type == "temporary":
        # Temporary error: retry after a short backoff
        schedule_retry(task_id, delay_seconds=300)
    elif error_type == "resource":
        # Resource constraint: retry later
        schedule_retry(task_id, delay_seconds=1800)
    elif error_type == "data":
        # Data issue: notify and require manual intervention
        # ("requires_attention" would need adding to the state selection)
        notify_admin(task_id, error)
        task.write({"state": "requires_attention"})
    else:
        # Permanent error: mark as failed
        task.write({"state": "error"})
2. Performance Optimization¶
Optimize background task performance:
def optimize_task_execution():
    """Optimize task execution performance"""
    # Use connection pooling
    setup_connection_pool()
    # Combine small tasks into batches
    batch_small_tasks()
    # Use memory-efficient streaming processing
    enable_streaming_processing()
    # Monitor resource usage
    setup_resource_monitoring()

def batch_small_tasks():
    """Combine small email tasks into a single batch task"""
    small_tasks = get_model("bg.task").search_browse([
        ["state", "=", "waiting"],
        ["model", "=", "email.message"],
        ["method", "=", "send_single"],
    ])
    if len(small_tasks) > 10:
        # Create one batch task covering all pending emails
        email_ids = [json.loads(task.args)[0] for task in small_tasks]
        get_model("bg.task").create({
            "name": "Send Email Batch",
            "model": "email.message",
            "method": "send_batch",
            "args": json.dumps(email_ids),
            "priority": 50,
        })
        # Cancel the individual tasks
        for task in small_tasks:
            task.write({"state": "cancelled"})
3. Resource Management¶
Manage system resources effectively:
def monitor_system_resources():
    """Monitor and manage system resources"""
    import psutil
    # Check memory usage
    memory = psutil.virtual_memory()
    if memory.percent > 80:
        # Pause low-priority tasks
        pause_low_priority_tasks()
    # Check CPU usage
    cpu_usage = psutil.cpu_percent(interval=1)
    if cpu_usage > 90:
        # Throttle task execution
        throttle_task_execution()
    # Check disk space
    disk = psutil.disk_usage('/')
    if disk.percent > 90:
        # Clean up temporary files
        cleanup_temp_files()

def auto_scale_workers():
    """Automatically scale worker processes to match the queue"""
    queue_size = get_queue_size()
    active_workers = get_active_worker_count()
    if queue_size > active_workers * 5:
        # Queue is deep: scale up
        scale_workers(active_workers + 2)
    elif queue_size < active_workers * 2 and active_workers > 2:
        # Queue is shallow: scale down
        scale_workers(active_workers - 1)
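pause_low_priority_tasks and scale_workers are again referenced without definitions; under the conventions above (treating priority >= 200 as "low" is our own threshold), minimal sketches could be:

def pause_low_priority_tasks(threshold=200, delay_minutes=15):
    """Push back low-priority waiting tasks while resources are tight (sketch)."""
    next_run = (datetime.now() + timedelta(minutes=delay_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    tasks = get_model("bg.task").search_browse([
        ["state", "=", "waiting"],
        ["priority", ">=", threshold],  # higher number = lower priority
    ])
    for task in tasks:
        task.write({"date_run": next_run})

def scale_workers(target_count):
    """Start additional workers up to target_count (sketch; scaling down would
    instead signal excess workers to stop after their current task)."""
    current = get_active_worker_count()
    for i in range(current, target_count):
        worker = BackgroundWorker(f"worker_{i}")
        multiprocessing.Process(target=worker.start).start()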
Next Steps¶
- Learn about Reports for scheduled report generation
- Explore Security for task access control and security
- Check Multi-Company for company-specific background tasks
- Review Advanced Patterns for complex job orchestration