Background Jobs & Task Scheduling

Netforce provides a comprehensive background job system for handling asynchronous tasks, scheduled operations, and long-running processes. The system supports job queuing, scheduling, retry logic, and monitoring.

Overview

The background job system consists of several key models:

  • bg.task - Background task definitions and scheduling
  • bg.task.log - Task execution logs and results
  • bg.process - Background worker processes
  • Job queues - Priority-based task queuing system
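
As a quick orientation, queuing work is just a matter of creating a bg.task record; a worker picks it up once its scheduled run date passes. A minimal sketch (the model and method names here are hypothetical placeholders):

task_id = get_model("bg.task").create({
    "name": "Recompute Stock Levels",   # hypothetical example task
    "model": "stock.balance",           # hypothetical model providing the method
    "method": "recompute_all",
    "date_run": time.strftime("%Y-%m-%d %H:%M:%S"),  # run as soon as possible
})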

Core Models

1. Background Task (bg.task)

The main model for defining and scheduling background tasks. The code examples throughout this document assume the following imports:

import json
import multiprocessing
import os
import time
import traceback
from datetime import datetime, timedelta

from netforce import database
from netforce.model import Model, fields, get_model

class BackgroundTask(Model):
    _name = "bg.task"
    _fields = {
        "name": fields.Char("Task Name", required=True),
        "model": fields.Char("Model Name", required=True),
        "method": fields.Char("Method Name", required=True),
        "args": fields.Text("Arguments (JSON)"),
        "kwargs": fields.Text("Keyword Arguments (JSON)"),
        "user_id": fields.Many2One("base.user", "User"),
        "company_id": fields.Many2One("company", "Company"),
        "date_create": fields.DateTime("Created Date"),
        "date_run": fields.DateTime("Scheduled Run Date"),
        "date_done": fields.DateTime("Completed Date"),
        "state": fields.Selection([
            ["waiting", "Waiting"],
            ["running", "Running"],
            ["done", "Completed"],
            ["error", "Error"],
            ["cancelled", "Cancelled"]
        ], "Status"),
        "priority": fields.Integer("Priority"),
        "repeat": fields.Selection([
            ["once", "Run Once"],
            ["hourly", "Hourly"],
            ["daily", "Daily"], 
            ["weekly", "Weekly"],
            ["monthly", "Monthly"]
        ], "Repeat"),
        "repeat_interval": fields.Integer("Repeat Interval"),
        "max_retries": fields.Integer("Max Retries"),
        "retry_count": fields.Integer("Current Retry Count"),
        "timeout": fields.Integer("Timeout (seconds)"),
        "logs": fields.One2Many("bg.task.log", "task_id", "Execution Logs"),
    }

    _defaults = {
        "state": "waiting",
        "priority": 100,
        "max_retries": 3,
        "timeout": 3600,  # 1 hour
    }

2. Task Execution Log (bg.task.log)

Tracks execution history and results:

class BackgroundTaskLog(Model):
    _name = "bg.task.log"
    _fields = {
        "task_id": fields.Many2One("bg.task", "Task"),
        "date_start": fields.DateTime("Start Time"),
        "date_end": fields.DateTime("End Time"),
        "duration": fields.Decimal("Duration (seconds)"),
        "state": fields.Selection([
            ["success", "Success"],
            ["error", "Error"],
            ["timeout", "Timeout"]
        ], "Result"),
        "result": fields.Text("Result Data"),
        "error_message": fields.Text("Error Message"),
        "traceback": fields.Text("Full Traceback"),
        "pid": fields.Integer("Process ID"),
        "memory_usage": fields.Integer("Memory Usage (MB)"),
    }
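
For example, the most recent failures for a task can be read straight from these logs (a minimal sketch, assuming search accepts order and limit as elsewhere in Netforce):

def get_recent_failures(task_id, limit=5):
    """Return the most recent error logs for a given task."""
    log_ids = get_model("bg.task.log").search(
        [["task_id", "=", task_id], ["state", "=", "error"]],
        order="date_start desc", limit=limit)
    return get_model("bg.task.log").browse(log_ids)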

Creating Background Tasks

1. Simple One-time Tasks

Create and schedule simple background tasks:

def create_invoice_processing_task(invoice_ids):
    """Create task to process multiple invoices"""
    task_id = get_model("bg.task").create({
        "name": "Process Invoices",
        "model": "account.invoice",
        "method": "process_batch",
        "args": json.dumps(invoice_ids),
        "date_run": time.strftime("%Y-%m-%d %H:%M:%S"),
        "priority": 100,
    })
    return task_id

def create_email_send_task(email_ids, delay_minutes=0):
    """Create task to send emails with optional delay"""
    run_time = datetime.now() + timedelta(minutes=delay_minutes)

    return get_model("bg.task").create({
        "name": "Send Email Batch",
        "model": "email.message",
        "method": "send_batch",
        "args": json.dumps(email_ids),
        "date_run": run_time.strftime("%Y-%m-%d %H:%M:%S"),
        "priority": 50,  # Higher priority for emails
    })

def create_report_generation_task(report_id, params, user_id):
    """Create task to generate large report"""
    return get_model("bg.task").create({
        "name": f"Generate Report {report_id}",
        "model": "report",
        "method": "generate_async",
        "args": json.dumps([report_id]),
        "kwargs": json.dumps({"params": params}),
        "user_id": user_id,
        "timeout": 7200,  # 2 hours for large reports
        "priority": 200,  # Lower priority
    })

2. Recurring Tasks

Create scheduled recurring tasks:

def create_daily_backup_task():
    """Create daily database backup task"""
    return get_model("bg.task").create({
        "name": "Daily Database Backup",
        "model": "database",
        "method": "create_backup",
        "date_run": "2023-01-01 02:00:00",  # Start date
        "repeat": "daily",
        "repeat_interval": 1,
        "priority": 10,  # Very high priority
    })

def create_weekly_cleanup_task():
    """Create weekly cleanup task"""
    return get_model("bg.task").create({
        "name": "Weekly Log Cleanup",
        "model": "log",
        "method": "cleanup_old_logs",
        "args": json.dumps([90]),  # Keep 90 days
        "date_run": "2023-01-01 03:00:00",  # Sunday 3 AM
        "repeat": "weekly",
        "repeat_interval": 1,
        "priority": 300,
    })

def create_monthly_report_task():
    """Create monthly financial report"""
    return get_model("bg.task").create({
        "name": "Monthly Financial Report",
        "model": "report",
        "method": "generate_monthly_financial",
        "date_run": "2023-01-01 01:00:00",  # First day of month
        "repeat": "monthly", 
        "repeat_interval": 1,
        "priority": 150,
    })

3. Conditional Tasks

Create tasks with dependencies and conditions:

def create_conditional_task(condition_model, condition_method, task_args):
    """Create task that runs only if condition is met"""
    return get_model("bg.task").create({
        "name": "Conditional Processing Task",
        "model": "custom.processor",
        "method": "process_if_condition",
        # Wrap the config dict in a list so it arrives as a single
        # positional argument when the worker calls the method
        "args": json.dumps([{
            "condition_model": condition_model,
            "condition_method": condition_method,
            "task_args": task_args
        }]),
        "priority": 150,
    })

def process_if_condition(self, config, context={}):
    """Execute task only if the configured condition is met"""
    # Check condition
    condition_model = get_model(config["condition_model"])
    condition_result = getattr(condition_model, config["condition_method"])()

    if not condition_result:
        return {"result": "skipped", "reason": "condition not met"}

    # Execute actual task
    return self.execute_main_task(config["task_args"])

Task Execution Methods

1. Implementing Task Methods

Create methods that can be executed as background tasks:

class AccountInvoice(Model):
    _name = "account.invoice"

    def process_batch(self, ids, context={}):
        """Process multiple invoices in background"""
        results = []

        for inv_id in ids:
            try:
                invoice = self.browse(inv_id)

                # Process invoice
                if invoice.state == "draft":
                    invoice.validate()
                    invoice.post()

                results.append({
                    "id": inv_id,
                    "status": "success",
                    "number": invoice.number
                })

            except Exception as e:
                results.append({
                    "id": inv_id,
                    "status": "error",
                    "error": str(e)
                })

        return {
            "processed": len(results),
            "success_count": len([r for r in results if r["status"] == "success"]),
            "error_count": len([r for r in results if r["status"] == "error"]),
            "results": results
        }

    def send_reminder_emails(self, ids, context={}):
        """Send reminder emails for overdue invoices"""
        overdue_days = context.get("overdue_days", 30)

        overdue_invoices = self.search_browse([
            ["id", "in", ids],
            ["state", "=", "open"],
            ["due_date", "<", (datetime.now() - timedelta(days=overdue_days)).strftime("%Y-%m-%d")]
        ])

        sent_count = 0
        for invoice in overdue_invoices:
            try:
                self.send_reminder_email(invoice.id)
                sent_count += 1
            except Exception as e:
                print(f"Failed to send reminder for invoice {invoice.number}: {e}")

        return {"sent_count": sent_count, "total": len(overdue_invoices)}

2. Long-running Tasks with Progress

Implement tasks that report progress:

def process_large_dataset(self, ids, context={}):
    """Process large dataset with progress reporting"""
    dataset_size = len(ids)
    processed = 0
    batch_size = 100

    # Update task progress
    task_id = context.get("bg_task_id")

    for i in range(0, dataset_size, batch_size):
        batch_ids = ids[i:i + batch_size]

        # Process batch
        self.process_batch_items(batch_ids)

        processed += len(batch_ids)
        progress = (processed / dataset_size) * 100

        # Update progress
        if task_id:
            get_model("bg.task").browse(task_id).write({
                "progress": progress,
                "status_message": f"Processed {processed}/{dataset_size} items"
            })

        # Yield control to prevent blocking
        time.sleep(0.1)

    return {"processed": processed, "total": dataset_size}

def cleanup_old_data(self, ids, context={}):
    """Cleanup old data with progress tracking"""
    days_to_keep = context.get("days_to_keep", 365)
    cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).strftime("%Y-%m-%d")

    models_to_clean = [
        "log", "bg.task.log", "audit.log", "email.message"
    ]

    total_deleted = 0
    task_id = context.get("bg_task_id")

    for i, model_name in enumerate(models_to_clean):
        model = get_model(model_name)

        # Find old records
        old_ids = model.search([["date_create", "<", cutoff_date]])

        if old_ids:
            # Delete in batches
            batch_size = 1000
            for j in range(0, len(old_ids), batch_size):
                batch = old_ids[j:j + batch_size]
                model.delete(batch)
                total_deleted += len(batch)

                # Update progress
                if task_id:
                    progress = ((i + 1) / len(models_to_clean)) * 100
                    get_model("bg.task").browse(task_id).write({
                        "progress": progress,
                        "status_message": f"Cleaned {model_name}: {total_deleted} records deleted"
                    })

    return {"deleted": total_deleted, "models_cleaned": len(models_to_clean)}

Task Queue Management

1. Priority-based Queuing

Implement priority-based task execution:

def get_next_task():
    """Get next task to execute based on priority"""
    # Get waiting tasks ordered by priority (lower number = higher priority)
    tasks = get_model("bg.task").search_browse([
        ["state", "=", "waiting"],
        ["date_run", "<=", time.strftime("%Y-%m-%d %H:%M:%S")]
    ], order="priority,date_create")

    return tasks[0] if tasks else None

def execute_task(task_id):
    """Execute background task"""
    task = get_model("bg.task").browse(task_id)

    # Mark as running
    # NOTE: with multiple workers, this claim should be made atomically
    # (e.g. under a row-level lock) so two workers cannot run the same task
    task.write({
        "state": "running",
        "date_start": time.strftime("%Y-%m-%d %H:%M:%S")
    })

    # Create log entry
    log_id = get_model("bg.task.log").create({
        "task_id": task_id,
        "date_start": time.strftime("%Y-%m-%d %H:%M:%S"),
        "pid": os.getpid()
    })

    t0 = time.time()  # wall-clock start, used for the duration log below
    try:
        # Execute task
        model = get_model(task.model)
        method = getattr(model, task.method)

        # Prepare arguments; the task id is passed through the context
        # so methods can report progress (see process_large_dataset above)
        args = json.loads(task.args or "[]")
        kwargs = json.loads(task.kwargs or "{}")
        kwargs.setdefault("context", {})["bg_task_id"] = task_id

        # Execute with timeout
        result = execute_with_timeout(method, args, kwargs, task.timeout)

        # Mark as completed
        task.write({
            "state": "done",
            "date_done": time.strftime("%Y-%m-%d %H:%M:%S"),
            "retry_count": 0
        })

        # Update log with result and duration
        get_model("bg.task.log").browse(log_id).write({
            "date_end": time.strftime("%Y-%m-%d %H:%M:%S"),
            "duration": round(time.time() - t0, 2),
            "state": "success",
            "result": json.dumps(result) if result else None
        })

        # Schedule next run if recurring (repeat may be unset for one-off tasks)
        if task.repeat and task.repeat != "once":
            schedule_next_run(task_id)

    except Exception as e:
        # Handle error
        handle_task_error(task_id, log_id, e)
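
execute_with_timeout is referenced above but not defined in this document. A minimal sketch using SIGALRM (an assumption; this approach only works in the main thread of a Unix worker process):

import signal

class TaskTimeout(Exception):
    pass

def execute_with_timeout(method, args, kwargs, timeout):
    """Run method(*args, **kwargs), raising TaskTimeout after timeout seconds."""
    def on_alarm(signum, frame):
        raise TaskTimeout(f"Task exceeded {timeout}s timeout")
    old_handler = signal.signal(signal.SIGALRM, on_alarm)
    signal.alarm(timeout or 0)  # 0 disables the alarm
    try:
        return method(*args, **kwargs)
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)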

def handle_task_error(task_id, log_id, error):
    """Handle task execution error"""
    task = get_model("bg.task").browse(task_id)

    # Update log with error
    get_model("bg.task.log").browse(log_id).write({
        "date_end": time.strftime("%Y-%m-%d %H:%M:%S"),
        "state": "error",
        "error_message": str(error),
        "traceback": traceback.format_exc()
    })

    # Retry if not exceeded max retries
    if task.retry_count < task.max_retries:
        # Schedule retry with exponential backoff
        retry_delay = (2 ** task.retry_count) * 60  # 1, 2, 4 minutes
        next_run = datetime.now() + timedelta(seconds=retry_delay)

        task.write({
            "state": "waiting",
            "retry_count": task.retry_count + 1,
            "date_run": next_run.strftime("%Y-%m-%d %H:%M:%S")
        })
    else:
        # Max retries exceeded
        task.write({"state": "error"})

2. Worker Processes

Implement background worker processes:

class BackgroundWorker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.running = False
        self.current_task = None

    def start(self):
        """Start background worker"""
        self.running = True

        # Register worker process
        process_id = get_model("bg.process").create({
            "worker_id": self.worker_id,
            "pid": os.getpid(),
            "date_start": time.strftime("%Y-%m-%d %H:%M:%S"),
            "state": "running"
        })

        print(f"Background worker {self.worker_id} started (PID: {os.getpid()})")

        try:
            while self.running:
                # Get next task
                task = get_next_task()

                if task:
                    self.current_task = task.id
                    print(f"Worker {self.worker_id} executing task: {task.name}")

                    # Execute task
                    execute_task(task.id)

                    self.current_task = None
                else:
                    # No tasks available, sleep
                    time.sleep(5)

        except KeyboardInterrupt:
            print(f"Worker {self.worker_id} shutting down...")
        finally:
            # Clean up
            get_model("bg.process").browse(process_id).write({
                "date_end": time.strftime("%Y-%m-%d %H:%M:%S"),
                "state": "stopped"
            })

    def stop(self):
        """Stop background worker"""
        self.running = False

        # Cancel current task if any
        if self.current_task:
            get_model("bg.task").browse(self.current_task).write({
                "state": "cancelled"
            })

# Usage
def start_background_workers(num_workers=4):
    """Start multiple background workers"""
    workers = []

    for i in range(num_workers):
        worker = BackgroundWorker(f"worker_{i}")
        # Each worker runs in its own OS process; stop() only takes effect
        # inside that process (see the signal-based shutdown sketch below)
        process = multiprocessing.Process(target=worker.start)
        process.start()
        workers.append((worker, process))

    return workers
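
Because stop() must be called inside the worker's own process, one way to shut workers down cleanly is a SIGTERM handler in the child process (a minimal sketch):

import signal

def run_worker(worker_id):
    """Entry point for a worker process with graceful shutdown on SIGTERM."""
    worker = BackgroundWorker(worker_id)
    # Exit the polling loop after the current iteration
    signal.signal(signal.SIGTERM, lambda signum, frame: worker.stop())
    worker.start()

# process = multiprocessing.Process(target=run_worker, args=("worker_0",))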

Advanced Features

1. Job Dependencies

Implement task dependencies:

def create_dependent_tasks():
    """Create tasks with dependencies"""
    # Parent task
    parent_task = get_model("bg.task").create({
        "name": "Process Orders",
        "model": "sale.order", 
        "method": "process_batch",
        "args": json.dumps([1, 2, 3, 4, 5])
    })

    # Dependent tasks
    invoice_task = get_model("bg.task").create({
        "name": "Generate Invoices",
        "model": "account.invoice",
        "method": "generate_from_orders", 
        "depends_on": parent_task,
        "state": "depends",  # Wait for dependency
    })

    email_task = get_model("bg.task").create({
        "name": "Send Order Confirmations",
        "model": "email.template",
        "method": "send_order_confirmations",
        "depends_on": parent_task,
        "state": "depends",
    })

    return parent_task, [invoice_task, email_task]

def check_dependencies():
    """Check and activate dependent tasks"""
    dependent_tasks = get_model("bg.task").search_browse([
        ["state", "=", "depends"]
    ])

    for task in dependent_tasks:
        if task.depends_on.state == "done":
            # Dependency completed, activate task
            task.write({
                "state": "waiting",
                "date_run": time.strftime("%Y-%m-%d %H:%M:%S")
            })
        elif task.depends_on.state == "error":
            # Dependency failed, cancel task
            task.write({"state": "cancelled"})

2. Task Chaining

Create chains of sequential tasks:

def create_task_chain(chain_config):
    """Create chain of sequential tasks"""
    previous_task = None
    tasks = []

    for step in chain_config:
        task_id = get_model("bg.task").create({
            "name": step["name"],
            "model": step["model"],
            "method": step["method"],
            "args": json.dumps(step.get("args", [])),
            "kwargs": json.dumps(step.get("kwargs", {})),
            "state": "depends" if previous_task else "waiting",
            "depends_on": previous_task,
            "priority": step.get("priority", 100)
        })

        tasks.append(task_id)
        previous_task = task_id

    return tasks

# Example usage: create_task_chain wires each step to wait on the previous one
chain = create_task_chain([
    {
        "name": "Extract Data",
        "model": "data.extractor",
        "method": "extract_from_source",
        "args": ["source_id"]
    },
    {
        "name": "Transform Data",
        "model": "data.transformer",
        "method": "transform_dataset"
    },
    {
        "name": "Load Data",
        "model": "data.loader",
        "method": "load_to_target"
    }
])

3. Task Monitoring

Implement comprehensive monitoring:

def create_monitoring_dashboard():
    """Create task monitoring dashboard"""
    dashboard_data = {
        "active_tasks": get_active_task_count(),
        "failed_tasks": get_failed_task_count(),
        "avg_execution_time": get_avg_execution_time(),
        "worker_status": get_worker_status(),
        "queue_size": get_queue_size(),
        "recent_errors": get_recent_errors()
    }

    return dashboard_data

def get_task_statistics(days=7):
    """Get task execution statistics"""
    cutoff_date = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")

    db = database.get_connection()
    stats = db.query("""
        SELECT
            DATE(t.date_create) AS date,
            COUNT(*) AS total_tasks,
            SUM(CASE WHEN t.state = 'done' THEN 1 ELSE 0 END) AS completed,
            SUM(CASE WHEN t.state = 'error' THEN 1 ELSE 0 END) AS failed,
            AVG(l.duration) AS avg_duration
        FROM bg_task t
        LEFT JOIN bg_task_log l ON l.task_id = t.id
        WHERE t.date_create >= %s
        GROUP BY DATE(t.date_create)
        ORDER BY date
    """, cutoff_date)

    return stats

def send_monitoring_alerts():
    """Send alerts for task monitoring"""
    # Check for high failure rate
    failure_rate = get_failure_rate()
    if failure_rate > 0.1:  # > 10% failure rate
        send_alert(f"High task failure rate: {failure_rate:.1%}")

    # Check for stuck tasks
    stuck_tasks = get_stuck_tasks()
    if stuck_tasks:
        send_alert(f"Found {len(stuck_tasks)} stuck tasks")

    # Check worker health
    unhealthy_workers = get_unhealthy_workers()
    if unhealthy_workers:
        send_alert(f"Unhealthy workers: {unhealthy_workers}")

def send_alert(message):
    """Send monitoring alert"""
    get_model("email.message").create({
        "to": "admin@company.com",
        "subject": "Background Task Alert",
        "body": f"<p>{message}</p><p>Time: {datetime.now()}</p>",
        "type": "out"
    })
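
The helper functions referenced above (get_stuck_tasks, get_failure_rate, and so on) are left undefined in this document. Minimal sketches of two of them, built on the fields defined earlier:

def get_stuck_tasks(max_runtime_hours=2):
    """Find tasks that have been 'running' longer than expected."""
    cutoff = (datetime.now() - timedelta(hours=max_runtime_hours)).strftime("%Y-%m-%d %H:%M:%S")
    return get_model("bg.task").search([
        ["state", "=", "running"],
        ["date_start", "<", cutoff],
    ])

def get_failure_rate(hours=24):
    """Fraction of recently created tasks that ended in 'error'."""
    cutoff = (datetime.now() - timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
    Task = get_model("bg.task")
    finished = Task.search([["date_create", ">=", cutoff], ["state", "in", ["done", "error"]]])
    if not finished:
        return 0.0
    failed = Task.search([["date_create", ">=", cutoff], ["state", "=", "error"]])
    return len(failed) / len(finished)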

Best Practices

1. Error Recovery

Implement robust error recovery. Note that retry_strategy, error_handling, and notification_on_failure are not fields of the base bg.task model above; they assume custom extensions for site-specific retry policies:

def create_resilient_task(task_config):
    """Create task with comprehensive error handling"""
    return get_model("bg.task").create({
        **task_config,
        "max_retries": 5,
        "retry_strategy": "exponential_backoff",  # assumes a custom field
        "timeout": 3600,
        "error_handling": "graceful",             # assumes a custom field
        "notification_on_failure": True           # assumes a custom field
    })

def graceful_error_handling(self, task_id, error):
    """Handle errors gracefully"""
    task = get_model("bg.task").browse(task_id)

    # Categorize error
    error_type = classify_error(error)

    if error_type == "temporary":
        # Temporary error - retry with backoff
        schedule_retry(task_id, delay_seconds=300)
    elif error_type == "resource":
        # Resource constraint - retry later
        schedule_retry(task_id, delay_seconds=1800)
    elif error_type == "data":
        # Data issue - notify and require manual intervention
        notify_admin(task_id, error)
        task.write({"state": "requires_attention"})
    else:
        # Permanent error - mark as failed
        task.write({"state": "error"})

2. Performance Optimization

Optimize background task performance:

def optimize_task_execution():
    """Optimize task execution performance"""

    # Use connection pooling
    setup_connection_pool()

    # Implement task batching
    batch_small_tasks()

    # Use memory-efficient processing
    enable_streaming_processing()

    # Monitor resource usage
    setup_resource_monitoring()

def batch_small_tasks():
    """Combine small tasks into batches"""
    small_tasks = get_model("bg.task").search_browse([
        ["state", "=", "waiting"],
        ["model", "=", "email.message"],
        ["method", "=", "send_single"]
    ])

    if len(small_tasks) > 10:
        # Create batch task
        email_ids = [json.loads(task.args)[0] for task in small_tasks]

        get_model("bg.task").create({
            "name": "Send Email Batch",
            "model": "email.message",
            "method": "send_batch",
            "args": json.dumps(email_ids),
            "priority": 50
        })

        # Cancel individual tasks
        for task in small_tasks:
            task.write({"state": "cancelled"})

3. Resource Management

Manage system resources effectively:

def monitor_system_resources():
    """Monitor and manage system resources"""
    import psutil

    # Check memory usage
    memory = psutil.virtual_memory()
    if memory.percent > 80:
        # Pause low-priority tasks
        pause_low_priority_tasks()

    # Check CPU usage
    cpu_usage = psutil.cpu_percent(interval=1)
    if cpu_usage > 90:
        # Throttle task execution
        throttle_task_execution()

    # Check disk space
    disk = psutil.disk_usage('/')
    if disk.percent > 90:
        # Clean up temporary files
        cleanup_temp_files()

def auto_scale_workers():
    """Automatically scale worker processes"""
    queue_size = get_queue_size()
    active_workers = get_active_worker_count()

    if queue_size > active_workers * 5:
        # Scale up workers
        scale_workers(active_workers + 2)
    elif queue_size < active_workers * 2 and active_workers > 2:
        # Scale down workers
        scale_workers(active_workers - 1)
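
pause_low_priority_tasks, throttle_task_execution, and scale_workers are assumed helpers. A minimal sketch of the first, which simply pushes back the scheduled run date of low-priority waiting tasks (remember that higher numbers mean lower priority):

def pause_low_priority_tasks(priority_threshold=200, delay_minutes=15):
    """Defer low-priority waiting tasks while the system is under load."""
    resume_at = (datetime.now() + timedelta(minutes=delay_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    tasks = get_model("bg.task").search_browse([
        ["state", "=", "waiting"],
        ["priority", ">=", priority_threshold],
    ])
    for task in tasks:
        task.write({"date_run": resume_at})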

Next Steps