Skip to content

Advanced Patterns & Best Practices

This guide covers advanced development patterns, architectural best practices, and sophisticated techniques for building robust Netforce applications. These patterns help create maintainable, scalable, and performant systems.

Architecture Patterns

1. Domain-Driven Design

Structure your models around business domains:

# Domain: Sales
class SalesDomain:
    """Facade that runs the sales-domain services in sequence to create orders.

    Holds one instance each of ``SalesOrderService``, ``PricingService`` and
    ``CommissionService``.
    """

    def __init__(self):
        self.order_service = SalesOrderService()
        self.pricing_service = PricingService()
        self.commission_service = CommissionService()

    def create_order(self, customer_data, line_items, context=None):
        """Create a sales order and return its ID.

        Steps: validate the customer, price the lines, create the order,
        then calculate commissions for it.

        Args:
            customer_data: Mapping containing a ``"customer_id"`` key.
            line_items: Line mappings passed to the pricing service.
            context: Optional context mapping. Not read by this method; the
                default is ``None`` so no dict is shared between calls.

        Returns:
            The value returned by ``order_service.create_order``.

        Raises:
            ValidationError: If the contact is not a customer or has
                exceeded its credit limit.
        """
        # Validate customer
        customer = self.validate_customer(customer_data)

        # Calculate pricing
        prices = self.pricing_service.calculate_prices(line_items, customer)

        # Create order
        # NOTE(review): build_order_lines is not defined in this class as
        # shown; it must be provided elsewhere (subclass or mixin).
        order_id = self.order_service.create_order({
            "customer_id": customer.id,
            "lines": self.build_order_lines(line_items, prices)
        })

        # Calculate commissions
        self.commission_service.calculate_commissions(order_id)

        return order_id

    def validate_customer(self, customer_data):
        """Load the contact named by ``customer_data["customer_id"]`` and check it.

        Returns:
            The browsed contact record.

        Raises:
            ValidationError: If ``is_customer`` is falsy or
                ``credit_limit_exceeded`` is truthy on the record.
        """
        customer = get_model("contact").browse(customer_data["customer_id"])

        if not customer.is_customer:
            raise ValidationError("Contact is not a customer")

        if customer.credit_limit_exceeded:
            raise ValidationError("Customer credit limit exceeded")

        return customer

class SalesOrderService:
    """Creates sales orders after checking them against business rules."""

    def create_order(self, order_data):
        """Validate ``order_data``, persist it, then trigger order workflows.

        Returns:
            The value returned by ``get_model("sale.order").create``.
        """
        self.validate_order_data(order_data)
        new_order_id = get_model("sale.order").create(order_data)
        self.trigger_order_workflows(new_order_id)
        return new_order_id

    def validate_order_data(self, order_data):
        """Raise ``ValidationError`` when the order breaks a business rule.

        Two rules are applied in order: the summed ``unit_price * qty`` of
        all lines must be at least 100, and every line's product must be
        active.
        """
        # Rule 1: minimum order amount.
        order_total = 0
        for entry in order_data["lines"]:
            order_total = order_total + entry["unit_price"] * entry["qty"]
        if order_total < 100:
            raise ValidationError("Order must be at least $100")

        # Rule 2: every product on the order must still be active.
        product_model_name = "product"
        for entry in order_data["lines"]:
            product = get_model(product_model_name).browse(entry["product_id"])
            if product.active:
                continue
            raise ValidationError(f"Product {product.name} is not active")

class PricingService:
    """Computes per-line prices using customer and volume discounts."""

    def calculate_prices(self, line_items, customer):
        """Return one price mapping per entry in ``line_items``.

        Each result carries ``product_id``, ``base_price``, ``unit_price``
        (the price after both discounts) and ``qty``.
        """
        return [self._price_line(entry, customer) for entry in line_items]

    def _price_line(self, entry, customer):
        """Price a single line: base price, customer discount, volume discount."""
        product_id = entry["product_id"]
        base = self.get_base_price(product_id)
        after_customer = self.apply_customer_discount(base, customer)
        after_volume = self.apply_volume_discount(after_customer, entry["qty"])
        return {
            "product_id": entry["product_id"],
            "base_price": base,
            "unit_price": after_volume,
            "qty": entry["qty"],
        }

2. Repository Pattern

Encapsulate data access logic:

class BaseRepository:
    """Thin wrapper giving one model a uniform find/create/update/delete API."""

    def __init__(self, model_name):
        # Resolve the model once; every method below goes through self.model.
        self.model_name = model_name
        self.model = get_model(model_name)

    def find_by_id(self, record_id):
        """Return the result of browsing ``record_id`` on the model."""
        found = self.model.browse(record_id)
        return found

    def find_by_field(self, field_name, value):
        """Search for ``field_name == value`` and browse the matching IDs."""
        condition = [[field_name, "=", value]]
        matching_ids = self.model.search(condition)
        return self.model.browse(matching_ids)

    def create(self, data):
        """Create a record from ``data`` and return the model's result."""
        created = self.model.create(data)
        return created

    def update(self, record_id, data):
        """Write ``data`` to the single record ``record_id``."""
        target_ids = [record_id]
        return self.model.write(target_ids, data)

    def delete(self, record_id):
        """Delete the single record ``record_id``."""
        target_ids = [record_id]
        return self.model.delete(target_ids)

class CustomerRepository(BaseRepository):
    """Repository over the ``contact`` model with customer-specific queries."""

    def __init__(self):
        super().__init__("contact")

    def find_active_customers(self):
        """Browse every contact flagged as both customer and active."""
        domain = [
            ["is_customer", "=", True],
            ["active", "=", True],
        ]
        return self.model.browse(self.model.search(domain))

    def find_by_credit_status(self, status):
        """Browse customers whose ``credit_status`` equals ``status``."""
        domain = [
            ["is_customer", "=", True],
            ["credit_status", "=", status],
        ]
        return self.model.browse(self.model.search(domain))

    def get_top_customers(self, limit=10):
        """Browse up to ``limit`` customers ordered by ``total_sales`` descending."""
        domain = [["is_customer", "=", True]]
        top_ids = self.model.search(domain, order="total_sales desc", limit=limit)
        return self.model.browse(top_ids)

    def update_credit_limit(self, customer_id, new_limit):
        """Set a customer's credit limit, logging the old and new values.

        Raises:
            ValueError: If ``new_limit`` is negative.
        """
        # The record is loaded before validation, matching the log's need
        # for the previous limit.
        current = self.find_by_id(customer_id)

        if new_limit < 0:
            raise ValueError("Credit limit cannot be negative")

        # Record the change before applying it.
        self.log_credit_change(customer_id, current.credit_limit, new_limit)

        changes = {"credit_limit": new_limit}
        return self.update(customer_id, changes)

    def log_credit_change(self, customer_id, old_limit, new_limit):
        """Create a ``customer.credit.log`` entry for a credit-limit change."""
        entry = {
            "customer_id": customer_id,
            "old_limit": old_limit,
            "new_limit": new_limit,
            "date": time.strftime("%Y-%m-%d %H:%M:%S"),
            "user_id": get_active_user(),
        }
        get_model("customer.credit.log").create(entry)

# Usage
class SalesService:
    """Example service that creates orders through repositories."""

    def __init__(self):
        self.customer_repo = CustomerRepository()
        self.order_repo = SalesOrderRepository()

    def create_order_for_customer(self, customer_code, order_data):
        """Create an order for the customer whose ``code`` is ``customer_code``.

        ``order_data`` is updated in place with the resolved ``customer_id``
        before being handed to the order repository.

        Raises:
            ValueError: If no customer matches ``customer_code``.
        """
        matches = self.customer_repo.find_by_field("code", customer_code)
        if not matches:
            raise ValueError(f"Customer not found: {customer_code}")

        # Use the first match.
        first_match = matches[0]
        order_data["customer_id"] = first_match.id

        return self.order_repo.create(order_data)

3. Factory Pattern

Create objects with complex initialization:

class DocumentFactory:
    """Dispatches invoice creation to the factory registered for a type."""

    @staticmethod
    def create_invoice(invoice_type, customer_id, **kwargs):
        """Create an invoice of ``invoice_type`` for ``customer_id``.

        Extra keyword arguments are forwarded to the selected factory's
        ``create``.

        Raises:
            ValueError: If ``invoice_type`` has no registered factory.
        """
        registry = {
            "standard": StandardInvoiceFactory,
            "recurring": RecurringInvoiceFactory,
            "prepaid": PrepaidInvoiceFactory,
            "credit_note": CreditNoteFactory,
        }

        if invoice_type not in registry:
            raise ValueError(f"Unknown invoice type: {invoice_type}")

        chosen = registry[invoice_type]
        return chosen.create(customer_id, **kwargs)

class StandardInvoiceFactory:
    """Builds standard outgoing invoices."""

    @staticmethod
    def create(customer_id, lines=None, **kwargs):
        """Create an ``account.invoice`` of type ``"out"`` for ``customer_id``.

        Keyword arguments override or extend the default field values.
        ``lines``, when truthy, are normalised with ``prepare_lines``.

        Returns:
            The value returned by ``get_model("account.invoice").create``.
        """
        defaults = {
            "type": "out",
            "contact_id": customer_id,
            "date_invoice": time.strftime("%Y-%m-%d"),
            "payment_term": StandardInvoiceFactory.get_default_payment_term(customer_id),
        }

        # Caller-supplied values win over the defaults.
        payload = {**defaults, **kwargs}

        if lines:
            payload["lines"] = StandardInvoiceFactory.prepare_lines(lines)

        invoice_model = get_model("account.invoice")
        return invoice_model.create(payload)

    @staticmethod
    def get_default_payment_term(customer_id):
        """Return the ID of the customer's payment term, or ``None`` if unset."""
        contact = get_model("contact").browse(customer_id)
        if contact.payment_term_id:
            return contact.payment_term_id.id
        return None

    @staticmethod
    def prepare_lines(lines):
        """Return invoice line mappings with ``qty`` and ``unit_price`` filled in.

        ``qty`` defaults to 1. A missing or zero ``unit_price`` is replaced
        by the product's ``sale_price``.
        """
        normalised = []

        for raw in lines:
            product_id = raw["product_id"]
            quantity = raw.get("qty", 1)
            price = raw.get("unit_price", 0)

            # Zero means "not priced": fall back to the product's sale price.
            if price == 0:
                price = get_model("product").browse(raw["product_id"]).sale_price

            normalised.append({
                "product_id": product_id,
                "qty": quantity,
                "unit_price": price,
            })

        return normalised

class RecurringInvoiceFactory:
    """Builds recurring invoices from an ``invoice.template`` record."""

    @staticmethod
    def create(customer_id, template_id, **kwargs):
        """Create a recurring invoice from a template and schedule the next one.

        Keyword arguments override or extend the template-derived fields.

        Returns:
            The value returned by ``get_model("account.invoice").create``.
        """
        template = get_model("invoice.template").browse(template_id)

        from_template = {
            "type": "out",
            "contact_id": customer_id,
            "date_invoice": time.strftime("%Y-%m-%d"),
            "recurring": True,
            "recurring_interval": template.recurring_interval,
            "recurring_rule": template.recurring_rule,
            "lines": RecurringInvoiceFactory.prepare_template_lines(template),
        }
        payload = {**from_template, **kwargs}

        new_invoice_id = get_model("account.invoice").create(payload)

        # Queue generation of the following invoice.
        RecurringInvoiceFactory.schedule_next_invoice(new_invoice_id, template)

        return new_invoice_id

    @staticmethod
    def prepare_template_lines(template):
        """Return one invoice line mapping per line of ``template``."""
        return [
            {
                "product_id": source.product_id.id,
                "qty": source.qty,
                "unit_price": source.unit_price,
                "description": source.description,
            }
            for source in template.lines
        ]

    @staticmethod
    def schedule_next_invoice(invoice_id, template):
        """Create a ``bg.task`` that calls ``generate_recurring`` for ``invoice_id``.

        The run date comes from ``calculate_next_date`` applied to the
        current time and the template's interval and rule.
        """
        run_at = calculate_next_date(
            datetime.now(),
            template.recurring_interval,
            template.recurring_rule
        )

        task = {
            "name": f"Generate Recurring Invoice {invoice_id}",
            "model": "account.invoice",
            "method": "generate_recurring",
            "args": json.dumps([invoice_id]),
            "date_run": run_at.strftime("%Y-%m-%d %H:%M:%S"),
        }
        get_model("bg.task").create(task)

4. Observer Pattern

Implement event-driven architecture:

class EventManager:
    """Central event manager mapping event names to listener callables."""

    def __init__(self):
        # event name -> list of listeners, in registration order
        self.listeners = {}

    def register(self, event_name, listener):
        """Append ``listener`` to the listeners for ``event_name``.

        Args:
            event_name: Key identifying the event.
            listener: Callable invoked with the event data.
        """
        self.listeners.setdefault(event_name, []).append(listener)

    def trigger(self, event_name, data=None):
        """Call every listener registered for ``event_name`` with ``data``.

        Listeners run in registration order. An exception raised by one
        listener is printed and does not stop the remaining listeners.
        Dispatch runs over a snapshot of the listener list, so a listener
        that registers another listener for the same event does not extend
        the dispatch in progress.
        """
        # Snapshot: listeners may call register() while we iterate.
        for listener in list(self.listeners.get(event_name, ())):
            try:
                listener(data)
            except Exception as e:
                # Best-effort dispatch: report and continue.
                print(f"Error in event listener: {e}")

# Module-level EventManager instance; the listener registrations and the
# Invoice model further down in this file use this object.
event_manager = EventManager()

class InvoiceEventListener:
    """Static handlers reacting to invoice lifecycle events."""

    @staticmethod
    def on_invoice_created(data):
        """React to an invoice being created.

        ``data["invoice_id"]`` identifies the invoice. A notification is
        sent first, then the customer's statistics are refreshed.
        """
        created = get_model("account.invoice").browse(data["invoice_id"])

        # Notify, then refresh the aggregate figures.
        InvoiceEventListener.send_creation_notification(created)
        InvoiceEventListener.update_customer_stats(created.contact_id.id)

    @staticmethod
    def on_invoice_paid(data):
        """React to an invoice being paid.

        Updates the customer's credit status, then processes commission for
        the invoice.
        """
        paid_id = data["invoice_id"]

        paid = get_model("account.invoice").browse(paid_id)
        InvoiceEventListener.update_credit_status(paid.contact_id.id)

        InvoiceEventListener.process_commission(paid_id)

    @staticmethod
    def send_creation_notification(invoice):
        """Send the ``invoice_created`` email when the contact has an email."""
        if not invoice.contact_id.email:
            return

        template_context = {
            "invoice": invoice,
            "customer": invoice.contact_id
        }
        get_model("email.template").send("invoice_created", template_context)

    @staticmethod
    def update_customer_stats(customer_id):
        """Store the sum of the customer's paid invoice totals in ``total_sales``."""
        rows = database.get("""
            SELECT SUM(amount_total) 
            FROM account_invoice 
            WHERE contact_id = %s AND state = 'paid'
        """, [customer_id])
        # A falsy sum (for example no paid invoices) is stored as 0.
        paid_total = rows[0][0] or 0

        get_model("contact").write([customer_id], {
            "total_sales": paid_total
        })

# Register the invoice handlers on the module-level event manager. These
# calls run at import time; the event names match the ones triggered by the
# Invoice model below ("invoice.created" and "invoice.paid").
event_manager.register("invoice.created", InvoiceEventListener.on_invoice_created)
event_manager.register("invoice.paid", InvoiceEventListener.on_invoice_paid)

# Enhanced model with events
class Invoice(Model):
    """``account.invoice`` model that emits events on create and pay."""

    _name = "account.invoice"

    def create(self, vals, context=None):
        """Create an invoice, then trigger the ``invoice.created`` event.

        Args:
            vals: Field values for the new invoice.
            context: Optional context mapping. ``None`` is replaced by a
                fresh dict per call, so no dict is shared between calls.

        Returns:
            The ID returned by the parent ``create``.
        """
        context = {} if context is None else context
        invoice_id = super().create(vals, context)

        # Trigger event
        event_manager.trigger("invoice.created", {
            "invoice_id": invoice_id,
            "data": vals
        })

        return invoice_id

    def pay(self, ids, context=None):
        """Pay invoices, then trigger ``invoice.paid`` once per invoice ID.

        Args:
            ids: Invoice IDs to pay.
            context: Optional context mapping; ``None`` becomes a fresh dict.

        Returns:
            The result of the parent ``pay``.
        """
        context = {} if context is None else context
        result = super().pay(ids, context)

        for invoice_id in ids:
            event_manager.trigger("invoice.paid", {
                "invoice_id": invoice_id
            })

        return result

Performance Patterns

1. Caching Strategies

Implement intelligent caching:

class SmartCache:
    """In-memory cache whose entries can be invalidated per model name.

    Each key may declare the model names it depends on; invalidating a
    model removes every key that currently depends on it.
    """

    def __init__(self):
        self.cache = {}
        self.dependencies = {}  # Cache key -> [models it depends on]
        self.model_dependencies = {}  # Model -> [cache keys]

    def get(self, key, default=None):
        """Return the cached value for ``key``, or ``default`` when absent.

        Pass a private sentinel as ``default`` to tell a cached ``None`` or
        other falsy value apart from a miss.
        """
        return self.cache.get(key, default)

    def set(self, key, value, dependencies=None):
        """Store ``value`` under ``key`` with the given model dependencies.

        Any dependencies recorded by an earlier ``set`` of the same key are
        dropped first, so a key is only invalidated by the models named in
        its most recent ``set``.

        Args:
            key: Cache key.
            value: Value to store.
            dependencies: Optional iterable of model names.
        """
        # Drop links from a previous set() so they cannot go stale.
        self._unlink(key)
        self.cache[key] = value

        if dependencies:
            models = list(dependencies)
            self.dependencies[key] = models
            for model in models:
                keys = self.model_dependencies.setdefault(model, [])
                if key not in keys:
                    keys.append(key)

    def invalidate(self, model_name):
        """Remove every cached key that depends on ``model_name``."""
        # Copy: remove() mutates the list being walked.
        for key in list(self.model_dependencies.get(model_name, ())):
            self.remove(key)

    def remove(self, key):
        """Remove ``key`` and its dependency links; unknown keys are ignored."""
        self.cache.pop(key, None)
        self._unlink(key)

    def _unlink(self, key):
        """Detach ``key`` from every model it is recorded as depending on."""
        for model in self.dependencies.pop(key, ()):
            keys = self.model_dependencies.get(model)
            if keys and key in keys:
                keys.remove(key)

# Module-level SmartCache instance; CachedCustomerService and the Contact
# model further down in this file read and invalidate this object.
cache = SmartCache()

class CachedCustomerService:
    """Customer lookups backed by the module-level ``cache``."""

    def get_customer_info(self, customer_id):
        """Return a summary mapping for a customer, cached per customer ID.

        The mapping has keys ``id``, ``name``, ``credit_limit``,
        ``total_sales`` and ``payment_terms``. The entry is registered as
        depending on the ``contact`` model.
        """
        cache_key = f"customer_info_{customer_id}"

        # Try cache first. Compare with None so only a real miss reloads.
        cached_data = cache.get(cache_key)
        if cached_data is not None:
            return cached_data

        # Load from database
        customer = get_model("contact").browse(customer_id)
        customer_info = {
            "id": customer.id,
            "name": customer.name,
            "credit_limit": customer.credit_limit,
            "total_sales": customer.total_sales,
            "payment_terms": customer.payment_term_id.name if customer.payment_term_id else None
        }

        # Cache with dependencies
        cache.set(cache_key, customer_info, dependencies=["contact"])

        return customer_info

    def get_customer_orders(self, customer_id):
        """Return a list of order summaries for a customer, cached per ID.

        Each summary has keys ``id``, ``number``, ``date``, ``amount`` and
        ``state``. The entry depends on ``sale.order`` and ``contact``.
        """
        cache_key = f"customer_orders_{customer_id}"

        # An empty list is a valid cached answer (customer has no orders),
        # so test for None and not for truthiness.
        cached_orders = cache.get(cache_key)
        if cached_orders is not None:
            return cached_orders

        # Load orders
        order_model = get_model("sale.order")
        order_ids = order_model.search([
            ["contact_id", "=", customer_id]
        ])

        orders = [
            {
                "id": order.id,
                "number": order.number,
                "date": order.date,
                "amount": order.amount_total,
                "state": order.state
            }
            for order in order_model.browse(order_ids)
        ]

        # Cache with dependencies
        cache.set(cache_key, orders, dependencies=["sale.order", "contact"])

        return orders

# Automatic cache invalidation
class Contact(Model):
    """``contact`` model that invalidates contact-dependent cache entries on write."""

    _name = "contact"

    def write(self, ids, vals, context=None):
        """Write ``vals`` to ``ids``, then invalidate the ``contact`` cache.

        Args:
            ids: Record IDs to update.
            vals: Field values to write.
            context: Optional context mapping. ``None`` is replaced by a
                fresh dict per call, so no dict is shared between calls.

        Returns:
            The result of the parent ``write``.
        """
        context = {} if context is None else context
        result = super().write(ids, vals, context)

        # Invalidate cache
        cache.invalidate("contact")

        return result

2. Lazy Loading

Implement lazy loading for performance:

class LazyLoader:
    """Callable that fetches one field of one record on first use.

    The fetched value is memoised; later calls return it without going
    back to the model.
    """

    def __init__(self, model, record_id, field_name):
        self.model = model
        self.record_id = record_id
        self.field_name = field_name
        # Nothing fetched yet.
        self._value = None
        self._loaded = False

    def __call__(self):
        """Return the field value, loading it on the first call only."""
        if self._loaded:
            return self._value

        source = get_model(self.model).browse(self.record_id)
        self._value = getattr(source, self.field_name)
        self._loaded = True
        return self._value

class LazyModel:
    """Record wrapper that fetches field values on attribute access.

    Values of ordinary fields are memoised in ``_loaded_fields``. Fields
    matched by ``_should_lazy_load`` go through a per-field ``LazyLoader``.
    """

    def __init__(self, model_name, record_id):
        self.model_name = model_name
        self.record_id = record_id
        self._loaded_fields = {}
        self._lazy_fields = {}

    def __getattr__(self, name):
        """Resolve ``name`` as a field of the wrapped record.

        Called only when normal attribute lookup fails.

        Raises:
            AttributeError: For any name starting with an underscore. Such
                names are never treated as record fields. Without this
                guard, probing an instance whose ``__init__`` has not run
                (``copy.copy``, unpickling, ``__new__``) would re-enter
                ``__getattr__`` through ``self._loaded_fields`` and recurse
                without end.
        """
        if name.startswith("_"):
            raise AttributeError(name)

        if name in self._loaded_fields:
            return self._loaded_fields[name]

        # Check if it's a lazy field
        model = get_model(self.model_name)
        field_def = model._fields.get(name)

        if field_def and self._should_lazy_load(field_def):
            if name not in self._lazy_fields:
                self._lazy_fields[name] = LazyLoader(self.model_name, self.record_id, name)
            return self._lazy_fields[name]()

        # Load field normally
        record = model.browse(self.record_id)
        value = getattr(record, name)
        self._loaded_fields[name] = value
        return value

    def _should_lazy_load(self, field_def):
        """Return True for text, binary, one2many and function fields.

        ``field_def`` is read with ``.get``, so it is assumed to be
        mapping-like (TODO confirm against the model's ``_fields`` values).
        """
        return (field_def.get("type") in ["text", "binary", "one2many"] or
                field_def.get("function") is not None)

# Usage
def get_customer_lazy(customer_id):
    """Return a LazyModel wrapper for the ``contact`` record ``customer_id``.

    Constructing the wrapper does not fetch any field; values are resolved
    on attribute access.
    """
    return LazyModel("contact", customer_id)

# Example usage: the wrapper is created first, then each attribute access
# below resolves one field through LazyModel.__getattr__.
customer = get_customer_lazy(123)
print(customer.name)  # Resolves only the "name" field on access
print(customer.description)  # Resolved on access; deferred via LazyLoader if the field type qualifies

3. Batch Processing

Implement efficient batch operations:

class BatchProcessor:
    """Runs a per-record function over record IDs in fixed-size batches."""

    def __init__(self, batch_size=1000):
        """Store the batch size.

        Raises:
            ValueError: If ``batch_size`` is less than 1. A zero step would
                fail later in ``range`` and a negative one would silently
                process nothing.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")
        self.batch_size = batch_size

    def process_records(self, model_name, record_ids, process_func, **kwargs):
        """Process ``record_ids`` batch by batch and collect per-record results.

        Args:
            model_name: Model whose records are processed.
            record_ids: Sliceable sequence of record IDs.
            process_func: Callable invoked as ``process_func(record, **kwargs)``.
            **kwargs: Extra keyword arguments for ``process_func``. The
                optional ``progress_callback`` keyword is consumed here and
                is NOT forwarded; it is called as
                ``progress_callback(done, total)`` after each batch that
                completed without a batch-level error.

        Returns:
            A list of result mappings, each with ``id`` and ``status``
            (``"success"`` with ``result``, or ``"error"`` with ``error``).
        """
        # Keep the callback out of the kwargs handed to process_func, which
        # would otherwise receive an unexpected keyword argument.
        progress_callback = kwargs.pop("progress_callback", None)
        total = len(record_ids)
        results = []

        for start in range(0, total, self.batch_size):
            batch_ids = record_ids[start:start + self.batch_size]

            try:
                batch_results = self.process_batch(
                    model_name, batch_ids, process_func, **kwargs
                )
            except Exception as e:
                # Handle batch errors
                results.extend(self.handle_batch_error(
                    model_name, batch_ids, e, **kwargs
                ))
            else:
                results.extend(batch_results)

                # Reported outside the try block so a failing callback is
                # not recorded as a second, failed result for this batch.
                if progress_callback:
                    progress_callback(min(start + self.batch_size, total), total)

        return results

    def process_batch(self, model_name, batch_ids, process_func, **kwargs):
        """Apply ``process_func`` to each record of one batch.

        An exception from ``process_func`` is recorded as an ``"error"``
        result for that record and does not stop the batch.
        """
        model = get_model(model_name)
        records = model.browse(batch_ids)

        batch_results = []
        for record in records:
            try:
                result = process_func(record, **kwargs)
                batch_results.append({
                    "id": record.id,
                    "status": "success",
                    "result": result
                })
            except Exception as e:
                batch_results.append({
                    "id": record.id,
                    "status": "error",
                    "error": str(e)
                })

        return batch_results

    def handle_batch_error(self, model_name, batch_ids, error, **kwargs):
        """Print the batch error and return an ``"error"`` result per record ID."""
        # Log error
        print(f"Batch error processing {model_name}: {error}")

        # Return error results for all records in batch
        return [{
            "id": record_id,
            "status": "error",
            "error": str(error)
        } for record_id in batch_ids]

class InvoiceBatchProcessor:
    """Batch operations on ``account.invoice`` using a ``BatchProcessor``."""

    def __init__(self):
        self.processor = BatchProcessor(batch_size=500)

    def validate_invoices(self, invoice_ids, progress_callback=None):
        """Validate draft invoices in batches.

        Args:
            invoice_ids: IDs of the invoices to validate.
            progress_callback: Optional callable passed through to the
                processor as the ``progress_callback`` keyword.

        Returns:
            The processor's per-record result list. A successful result
            carries ``{"number": ..., "total": ...}``.
        """
        def validate_invoice(invoice, **_ignored):
            """Validate single invoice.

            Extra keywords are accepted and ignored, because the processor
            may forward its own keywords (such as ``progress_callback``) to
            the per-record function.
            """
            if invoice.state != "draft":
                raise ValueError("Invoice not in draft state")

            invoice.validate()
            return {"number": invoice.number, "total": invoice.amount_total}

        return self.processor.process_records(
            "account.invoice",
            invoice_ids,
            validate_invoice,
            progress_callback=progress_callback
        )

    def send_invoice_emails(self, invoice_ids):
        """Send the ``invoice_notification`` email for each invoice in batches.

        Returns:
            The processor's per-record result list. A successful result
            carries ``{"email": ...}``; invoices whose contact has no email
            address fail with ``ValueError``.
        """
        def send_email(invoice, **_ignored):
            """Send email for single invoice; extra keywords are ignored."""
            if not invoice.contact_id.email:
                raise ValueError("Customer has no email address")

            get_model("email.template").send("invoice_notification", {
                "invoice": invoice,
                "customer": invoice.contact_id
            })

            return {"email": invoice.contact_id.email}

        return self.processor.process_records(
            "account.invoice",
            invoice_ids,
            send_email
        )

# Usage: validate every draft invoice in batches and summarise the outcome.
processor = InvoiceBatchProcessor()

def progress_callback(current, total):
    """Print how many invoices have been processed, with a percentage.

    ``total`` is used as a divisor, so it must be non-zero.
    """
    print(f"Processed {current}/{total} invoices ({current/total*100:.1f}%)")

# Validate large batch of invoices
invoice_ids = get_model("account.invoice").search([["state", "=", "draft"]])
results = processor.validate_invoices(invoice_ids, progress_callback=progress_callback)

# Check results: each entry carries a "status" of "success" or "error".
success_count = len([r for r in results if r["status"] == "success"])
error_count = len([r for r in results if r["status"] == "error"])
print(f"Validation completed: {success_count} success, {error_count} errors")

Data Patterns

1. Data Transfer Objects

Use DTOs for clean data handling:

from dataclasses import dataclass
from typing import Optional, List

@dataclass
class CustomerDTO:
    """Data transfer object for customer data."""
    id: Optional[int] = None
    name: str = ""
    email: Optional[str] = None
    phone: Optional[str] = None
    credit_limit: float = 0.0
    is_active: bool = True

    @classmethod
    def from_model(cls, customer_record):
        """Build a DTO from a record's ``id``, ``name``, ``email``, ``phone``,
        ``credit_limit`` and ``active`` attributes."""
        return cls(
            id=customer_record.id,
            name=customer_record.name,
            email=customer_record.email,
            phone=customer_record.phone,
            credit_limit=customer_record.credit_limit,
            is_active=customer_record.active
        )

    def to_dict(self):
        """Return the DTO's fields as a plain dictionary."""
        return {
            "id": self.id,
            "name": self.name,
            "email": self.email,
            "phone": self.phone,
            "credit_limit": self.credit_limit,
            "is_active": self.is_active
        }

    def validate(self):
        """Return a list of validation error messages (empty when valid).

        Checks, in order: a non-blank name, a well-formed email when one is
        set, and a non-negative credit limit. A ``None`` name counts as
        missing and a ``None`` credit limit is not checked, so DTOs built
        from records with empty fields can be validated without crashing.
        """
        errors = []

        if not (self.name or "").strip():
            errors.append("Name is required")

        if self.email and not self._is_valid_email(self.email):
            errors.append("Invalid email format")

        if self.credit_limit is not None and self.credit_limit < 0:
            errors.append("Credit limit cannot be negative")

        return errors

    def _is_valid_email(self, email):
        """Return True when ``email`` matches the address pattern in full.

        ``fullmatch`` is used because ``$`` with ``match`` also accepts a
        trailing newline.
        """
        import re
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

@dataclass
class OrderLineDTO:
    """One order line: a product, how many, and at what unit price."""
    product_id: int
    quantity: float
    unit_price: float
    description: str = ""

    def calculate_total(self):
        """Return ``quantity`` multiplied by ``unit_price``."""
        line_total = self.quantity * self.unit_price
        return line_total

@dataclass
class OrderDTO:
    """An order header together with its lines."""
    id: Optional[int] = None
    customer_id: int = 0
    date: str = ""
    reference: str = ""
    lines: List[OrderLineDTO] = None

    def __post_init__(self):
        # None marks "no lines supplied"; give each instance its own list.
        self.lines = [] if self.lines is None else self.lines

    def calculate_total(self):
        """Return the sum of every line's ``calculate_total()``."""
        running_total = 0
        for line in self.lines:
            running_total = running_total + line.calculate_total()
        return running_total

    def validate(self):
        """Return a list of validation error messages (empty when valid).

        A customer and at least one line are required. Lines that expose a
        ``validate`` method contribute their own messages, prefixed with
        the 1-based line number.
        """
        problems = []

        if not self.customer_id:
            problems.append("Customer is required")

        if not self.lines:
            problems.append("Order must have at least one line")

        for position, line in enumerate(self.lines, start=1):
            line_problems = line.validate() if hasattr(line, 'validate') else []
            for problem in line_problems:
                problems.append(f"Line {position}: {problem}")

        return problems

# Service using DTOs
class OrderService:
    """Creates and reads ``sale.order`` records through DTOs."""

    def create_order(self, order_dto: OrderDTO):
        """Persist ``order_dto`` as a ``sale.order`` and return the same DTO.

        The DTO's ``id`` is set to the created order's ID.

        Raises:
            ValueError: If ``order_dto.validate()`` reports any errors.
        """
        problems = order_dto.validate()
        if problems:
            raise ValueError(f"Validation errors: {', '.join(problems)}")

        # Translate DTO field names to the model's field names.
        payload = {
            "contact_id": order_dto.customer_id,
            "date": order_dto.date,
            "reference": order_dto.reference,
            "lines": [
                {
                    "product_id": entry.product_id,
                    "qty": entry.quantity,
                    "unit_price": entry.unit_price,
                    "description": entry.description,
                }
                for entry in order_dto.lines
            ],
        }

        order_dto.id = get_model("sale.order").create(payload)
        return order_dto

    def get_order(self, order_id) -> OrderDTO:
        """Load the ``sale.order`` ``order_id`` and return it as an ``OrderDTO``.

        Falsy ``description`` and ``reference`` values become empty strings.
        """
        record = get_model("sale.order").browse(order_id)

        converted_lines = [
            OrderLineDTO(
                product_id=source.product_id.id,
                quantity=source.qty,
                unit_price=source.unit_price,
                description=source.description or "",
            )
            for source in record.lines
        ]

        return OrderDTO(
            id=record.id,
            customer_id=record.contact_id.id,
            date=record.date,
            reference=record.reference or "",
            lines=converted_lines,
        )

2. Specification Pattern

Implement business rules as specifications:

class Specification:
    """Base class for composable business-rule predicates.

    Subclasses implement ``is_satisfied_by``; the combinators below wrap
    specifications in And/Or/Not nodes.
    """

    def is_satisfied_by(self, obj):
        """Return whether ``obj`` satisfies this rule; subclasses must override."""
        raise NotImplementedError

    def and_(self, other):
        """Return a specification satisfied when both this and ``other`` are."""
        combined = AndSpecification(self, other)
        return combined

    def or_(self, other):
        """Return a specification satisfied when this or ``other`` is."""
        combined = OrSpecification(self, other)
        return combined

    def not_(self):
        """Return a specification satisfied when this one is not."""
        negated = NotSpecification(self)
        return negated

class AndSpecification(Specification):
    """Conjunction of two specifications, evaluated left to right."""

    def __init__(self, spec1, spec2):
        self.spec1, self.spec2 = spec1, spec2

    def is_satisfied_by(self, obj):
        """Return the first falsy result, otherwise the second spec's result."""
        first = self.spec1.is_satisfied_by(obj)
        if not first:
            # Short-circuit: the second spec is not evaluated.
            return first
        return self.spec2.is_satisfied_by(obj)

class OrSpecification(Specification):
    """Disjunction of two specifications, evaluated left to right."""

    def __init__(self, spec1, spec2):
        self.spec1, self.spec2 = spec1, spec2

    def is_satisfied_by(self, obj):
        """Return the first truthy result, otherwise the second spec's result."""
        first = self.spec1.is_satisfied_by(obj)
        if first:
            # Short-circuit: the second spec is not evaluated.
            return first
        return self.spec2.is_satisfied_by(obj)

class NotSpecification(Specification):
    """Negation of a single specification."""

    def __init__(self, spec):
        self.spec = spec

    def is_satisfied_by(self, obj):
        """Return True exactly when the wrapped spec's result is falsy."""
        inner_result = self.spec.is_satisfied_by(obj)
        return not inner_result

# Business rule specifications
class CustomerSpecifications:
    """Namespace of specifications evaluated against a customer record."""

    class IsActive(Specification):
        """Satisfied when the customer's ``active`` attribute is truthy."""

        def is_satisfied_by(self, customer):
            is_active = customer.active
            return is_active

    class HasGoodCredit(Specification):
        """Satisfied when ``credit_rating`` is at least 3."""

        def is_satisfied_by(self, customer):
            rating = customer.credit_rating
            return rating >= 3

    class HasCreditLimit(Specification):
        """Satisfied when ``credit_limit`` is at least ``minimum_limit``."""

        def __init__(self, minimum_limit=0):
            self.minimum_limit = minimum_limit

        def is_satisfied_by(self, customer):
            limit = customer.credit_limit
            return limit >= self.minimum_limit

    class IsNotOverdue(Specification):
        """Satisfied when the customer has no open invoice due before today."""

        def is_satisfied_by(self, customer):
            overdue_domain = [
                ["contact_id", "=", customer.id],
                ["state", "=", "open"],
                ["due_date", "<", time.strftime("%Y-%m-%d")],
            ]
            overdue_count = get_model("account.invoice").search_count(overdue_domain)
            return overdue_count == 0

class OrderSpecifications:
    """Namespace of specifications evaluated against an order record."""

    class HasMinimumAmount(Specification):
        """Satisfied when ``amount_total`` is at least ``minimum_amount``."""

        def __init__(self, minimum_amount):
            self.minimum_amount = minimum_amount

        def is_satisfied_by(self, order):
            order_amount = order.amount_total
            return order_amount >= self.minimum_amount

    class RequiresApproval(Specification):
        """Satisfied when ``amount_total`` is above ``approval_threshold``."""

        def __init__(self, approval_threshold=10000):
            self.approval_threshold = approval_threshold

        def is_satisfied_by(self, order):
            order_amount = order.amount_total
            return order_amount > self.approval_threshold

# Usage in business logic
class OrderApprovalService:
    """Decide whether a sales order is auto-approved or routed to manual approval.

    Customer rules and order rules are held in separate specifications. Each
    specification reads attributes of the object it receives, so a customer
    rule must only ever see the customer and an order rule only the order.
    Combining both kinds into one composite and applying it to both objects
    would make customer rules read order attributes and vice versa.
    """

    def __init__(self):
        # Rules evaluated against the order's customer (order.contact_id).
        self.customer_spec = (
            CustomerSpecifications.IsActive().and_(
                CustomerSpecifications.HasGoodCredit()
            ).and_(
                CustomerSpecifications.IsNotOverdue()
            )
        )

        # Rules evaluated against the order itself.
        self.order_spec = OrderSpecifications.HasMinimumAmount(100)

        # Auto-approval additionally requires that the order does NOT exceed
        # the manual-approval threshold.
        self.auto_approval_order_spec = self.order_spec.and_(
            OrderSpecifications.RequiresApproval(5000).not_()
        )

    def can_approve_order(self, order):
        """Return whether the order and its customer meet the approval rules."""
        customer = order.contact_id
        return (self.order_spec.is_satisfied_by(order) and
                self.customer_spec.is_satisfied_by(customer))

    def can_auto_approve(self, order):
        """Return whether the order qualifies for approval without manual review."""
        customer = order.contact_id
        return (self.auto_approval_order_spec.is_satisfied_by(order) and
                self.customer_spec.is_satisfied_by(customer))

    def process_order(self, order_id):
        """Confirm the order automatically or submit it for manual approval.

        Returns:
            ``{"status": "auto_approved"}`` or ``{"status": "pending_approval"}``.

        Raises:
            Exception: If the order does not meet the approval criteria.
        """
        order = get_model("sale.order").browse(order_id)

        if not self.can_approve_order(order):
            raise Exception("Order does not meet approval criteria")

        if self.can_auto_approve(order):
            order.confirm()
            return {"status": "auto_approved"}

        # Above the auto-approval threshold: a person has to sign off.
        self.submit_for_approval(order_id)
        return {"status": "pending_approval"}

    def submit_for_approval(self, order_id):
        """Create a pending ``approval`` record that references the order."""
        get_model("approval").create({
            "related_id": f"sale.order,{order_id}",
            "user_id": self.get_approver(order_id),
            "state": "pending"
        })

    def get_approver(self, order_id):
        """Return the id of the user who must approve ``order_id``.

        Hook for subclasses: the approver lookup is application-specific, so
        this base implementation fails with an explicit error.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            "OrderApprovalService.get_approver must be overridden to select an approver"
        )

Integration Patterns

1. Adapter Pattern

Use adapters to integrate external systems behind a common interface:

class PaymentGatewayAdapter:
    """Base adapter defining the operations a payment gateway may support.

    Every method raises ``NotImplementedError`` here; concrete adapters
    override the operations their gateway provides.
    """

    def process_payment(self, payment_data):
        """Process a payment described by the ``payment_data`` dict."""
        raise NotImplementedError

    def refund_payment(self, payment_id, amount=None):
        """Refund a payment.

        Args:
            payment_id: Gateway identifier of the payment to refund.
            amount: Amount to refund. Defaults to ``None`` so that the base
                signature agrees with adapters that accept an optional amount.
        """
        raise NotImplementedError

    def get_payment_status(self, payment_id):
        """Return the gateway's status for ``payment_id``."""
        raise NotImplementedError

class StripeAdapter(PaymentGatewayAdapter):
    """Stripe payment gateway adapter.

    Callers pass and receive amounts in major currency units. The adapter
    multiplies by 100 to obtain the integer amount it sends to Stripe, and
    divides by 100 on the way back.
    """

    def __init__(self, api_key):
        """Import the ``stripe`` SDK lazily and configure it with ``api_key``.

        The key is assigned to the module-level ``stripe.api_key`` attribute.
        """
        import stripe
        stripe.api_key = api_key
        self.stripe = stripe

    @staticmethod
    def _to_minor_units(amount):
        """Convert a major-unit amount to an integer number of minor units."""
        # Round instead of truncating: 19.99 * 100 evaluates to
        # 1998.9999999999998 in binary floating point, which a bare int()
        # would turn into 1998.
        return int(round(amount * 100))

    def process_payment(self, payment_data):
        """Charge the token in ``payment_data`` through Stripe.

        Args:
            payment_data: Dict with required ``amount`` and ``token`` keys and
                optional ``currency`` (default ``"usd"``) and ``description``.

        Returns:
            On success, a dict with ``success=True``, ``transaction_id``,
            ``status`` and ``amount``. On any error, ``success=False`` and
            ``error`` holding the exception text.
        """
        try:
            charge = self.stripe.Charge.create(
                amount=self._to_minor_units(payment_data["amount"]),
                currency=payment_data.get("currency", "usd"),
                source=payment_data["token"],
                description=payment_data.get("description", "")
            )

            return {
                "success": True,
                "transaction_id": charge.id,
                "status": charge.status,
                "amount": charge.amount / 100
            }

        except Exception as e:
            # Best-effort contract: failures are reported in the result dict
            # rather than raised to the caller.
            return {
                "success": False,
                "error": str(e)
            }

    def refund_payment(self, transaction_id, amount=None):
        """Refund a Stripe charge.

        Args:
            transaction_id: Id of the charge to refund.
            amount: Amount to refund in major units. When ``None`` no amount
                is sent and the amount is left to Stripe.

        Returns:
            On success, a dict with ``success=True``, ``refund_id``,
            ``amount`` and ``status``. On any error, ``success=False`` and
            ``error`` holding the exception text.
        """
        try:
            refund_data = {"charge": transaction_id}
            # Compare against None explicitly so an amount of 0 is forwarded
            # as given instead of being treated as "no amount supplied".
            if amount is not None:
                refund_data["amount"] = self._to_minor_units(amount)

            refund = self.stripe.Refund.create(**refund_data)

            return {
                "success": True,
                "refund_id": refund.id,
                "amount": refund.amount / 100,
                "status": refund.status
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

class PayPalAdapter(PaymentGatewayAdapter):
    """PayPal payment gateway adapter built on ``paypalrestsdk``."""

    def __init__(self, client_id, client_secret, sandbox=True):
        """Import ``paypalrestsdk`` lazily and configure it.

        Args:
            client_id: PayPal REST client id.
            client_secret: PayPal REST client secret.
            sandbox: Use the ``"sandbox"`` mode when true, ``"live"`` otherwise.
        """
        import paypalrestsdk

        self.paypal = paypalrestsdk
        self.paypal.configure({
            "mode": "sandbox" if sandbox else "live",
            "client_id": client_id,
            "client_secret": client_secret
        })

    def process_payment(self, payment_data):
        """Create a PayPal payment and return the URL where the payer approves it.

        Args:
            payment_data: Dict with a required ``amount`` key and optional
                ``currency`` (default ``"USD"``), ``description``,
                ``return_url`` and ``cancel_url``.

        Returns:
            On success, a dict with ``success=True``, ``payment_id`` and
            ``approval_url``. Otherwise ``success=False`` and ``error``.
        """
        try:
            payment = self.paypal.Payment({
                "intent": "sale",
                "payer": {"payment_method": "paypal"},
                "transactions": [{
                    "amount": {
                        "total": str(payment_data["amount"]),
                        "currency": payment_data.get("currency", "USD")
                    },
                    "description": payment_data.get("description", "")
                }],
                "redirect_urls": {
                    "return_url": payment_data.get("return_url", ""),
                    "cancel_url": payment_data.get("cancel_url", "")
                }
            })

            if not payment.create():
                return {
                    "success": False,
                    "error": payment.error
                }

            # Supply a default so a response without an approval link gives a
            # descriptive error instead of a StopIteration whose message is "".
            approval_url = next(
                (link.href for link in payment.links if link.rel == "approval_url"),
                None,
            )
            if approval_url is None:
                return {
                    "success": False,
                    "error": "PayPal response did not include an approval_url link"
                }

            return {
                "success": True,
                "payment_id": payment.id,
                "approval_url": approval_url
            }

        except Exception as e:
            # Best-effort contract: failures are reported in the result dict.
            return {
                "success": False,
                "error": str(e)
            }

# Payment service using adapters
class PaymentService:
    """Payment service that dispatches to gateway adapters by name."""

    def __init__(self, gateways=None):
        """Set up the available gateways.

        Args:
            gateways: Optional mapping of gateway name to adapter instance.
                When omitted, the default Stripe and PayPal adapters are
                created with placeholder credentials.
        """
        if gateways is None:
            gateways = {
                "stripe": StripeAdapter("sk_test_..."),
                "paypal": PayPalAdapter("client_id", "client_secret", sandbox=True)
            }
        self.gateways = gateways

    def process_payment(self, invoice_id, gateway_name, payment_data):
        """Pay an invoice through the named gateway.

        Args:
            invoice_id: Id of the ``account.invoice`` record being paid.
            gateway_name: Key into ``self.gateways``.
            payment_data: Gateway-specific fields. The dict is not modified;
                ``amount`` and ``description`` are filled in from the invoice
                on a copy.

        Returns:
            The result dict produced by the gateway adapter.

        Raises:
            ValueError: If ``gateway_name`` is not a configured gateway.
        """
        if gateway_name not in self.gateways:
            raise ValueError(f"Unknown payment gateway: {gateway_name}")

        gateway = self.gateways[gateway_name]

        # Build the request on a copy so the caller's dict is left untouched.
        invoice = get_model("account.invoice").browse(invoice_id)
        request = {
            **payment_data,
            "amount": invoice.amount_total,
            "description": f"Payment for invoice {invoice.number}"
        }

        result = gateway.process_payment(request)

        # A result that carries an approval_url still has to be approved by
        # the payer, and it has no "amount" to record, so it is not posted.
        if result["success"] and "approval_url" not in result:
            self.record_payment(invoice_id, result, gateway_name)

        return result

    def record_payment(self, invoice_id, payment_result, gateway_name):
        """Create a posted ``account.payment`` and mark the invoice paid if settled.

        Args:
            invoice_id: Id of the invoice the payment applies to.
            payment_result: Successful gateway result; must contain ``amount``.
            gateway_name: Stored as the payment ``method``.
        """
        get_model("account.payment").create({
            "invoice_id": invoice_id,
            "amount": payment_result["amount"],
            "date": time.strftime("%Y-%m-%d"),
            "method": gateway_name,
            "transaction_id": payment_result.get("transaction_id"),
            "state": "posted"
        })

        # Browse the invoice after creating the payment so amount_due is read
        # once the payment record exists.
        invoice = get_model("account.invoice").browse(invoice_id)
        if invoice.amount_due <= 0:
            invoice.write({"state": "paid"})

Error Handling Patterns

1. Custom Exception Hierarchy

Create structured exception handling:

class NetforceException(Exception):
    """Root of the Netforce error hierarchy.

    Carries an optional machine-readable ``error_code`` and a ``details``
    mapping alongside the human-readable message.
    """

    def __init__(self, message, error_code=None, details=None):
        """Initialise the exception; a falsy ``details`` becomes an empty dict."""
        super().__init__(message)
        self.details = details if details else {}
        self.error_code = error_code

    def to_dict(self):
        """Return the error as a dict with class name, message, code and details."""
        payload = dict(error=type(self).__name__, message=str(self))
        payload["error_code"] = self.error_code
        payload["details"] = self.details
        return payload

class ValidationException(NetforceException):
    """Raised when input data fails validation; records per-field messages."""

    def __init__(self, field_errors=None, **kwargs):
        """Build the message from the names of the failing fields.

        Args:
            field_errors: Mapping of field name to error text (optional).
            **kwargs: Forwarded to ``NetforceException`` (e.g. ``error_code``).
        """
        self.field_errors = field_errors or {}
        suffix = f": {', '.join(field_errors.keys())}" if field_errors else ""
        super().__init__("Validation failed" + suffix, **kwargs)

    def to_dict(self):
        """Return the base dictionary extended with a ``field_errors`` entry."""
        return {**super().to_dict(), "field_errors": self.field_errors}

class BusinessRuleException(NetforceException):
    """Raised when an operation would violate a business rule."""

class DataIntegrityException(NetforceException):
    """Raised when an operation would violate data integrity."""

class PermissionException(NetforceException):
    """Raised when the caller lacks permission for an operation."""

class ExternalServiceException(NetforceException):
    """Raised when a call to an external service fails."""

    def __init__(self, service_name, status_code=None, **kwargs):
        """Record which service failed and, if known, its status code.

        Args:
            service_name: Name of the failing external service.
            status_code: Status code reported by the service (optional).
            **kwargs: Forwarded to ``NetforceException`` (e.g. ``error_code``).
        """
        self.service_name = service_name
        self.status_code = status_code
        super().__init__(f"External service error: {service_name}", **kwargs)

    def to_dict(self):
        """Return the base dictionary plus ``service_name`` and ``status_code``.

        Without this override the two attributes would be stored on the
        exception but missing from its serialised form.
        """
        result = super().to_dict()
        result["service_name"] = self.service_name
        result["status_code"] = self.status_code
        return result

# Usage in models and services
class InvoiceService:
    """Invoice service demonstrating structured error handling."""

    def create_invoice(self, invoice_data):
        """Validate ``invoice_data`` and create an ``account.invoice`` record.

        Returns:
            Whatever ``get_model("account.invoice").create`` returns.

        Raises:
            ValidationException: If required fields are missing.
            BusinessRuleException: If the customer fails a business rule.
            NetforceException: For any other failure; the original exception
                is attached as ``__cause__``.
        """
        try:
            self.validate_invoice_data(invoice_data)
            self.check_business_rules(invoice_data)
            return get_model("account.invoice").create(invoice_data)

        except (ValidationException, BusinessRuleException):
            # Already structured errors: let callers see them unchanged.
            raise
        except Exception as e:
            # Wrap anything unexpected, chaining the cause so the original
            # traceback stays available for debugging.
            raise NetforceException(
                "Failed to create invoice",
                error_code="INVOICE_CREATE_FAILED",
                details={"original_error": str(e)}
            ) from e

    def validate_invoice_data(self, invoice_data):
        """Check that the required invoice fields are present.

        Raises:
            ValidationException: Listing every missing field, with error code
                ``INVALID_INVOICE_DATA``.
        """
        field_errors = {}

        if not invoice_data.get("contact_id"):
            field_errors["contact_id"] = "Customer is required"

        if not invoice_data.get("lines"):
            field_errors["lines"] = "Invoice must have at least one line"

        if not invoice_data.get("date_invoice"):
            field_errors["date_invoice"] = "Invoice date is required"

        if field_errors:
            raise ValidationException(
                field_errors=field_errors,
                error_code="INVALID_INVOICE_DATA"
            )

    def check_business_rules(self, invoice_data):
        """Reject invoices for customers over their credit limit or inactive.

        Does nothing when ``invoice_data`` has no ``contact_id``.

        Raises:
            BusinessRuleException: With code ``CREDIT_LIMIT_EXCEEDED`` or
                ``INACTIVE_CUSTOMER``.
        """
        customer_id = invoice_data.get("contact_id")
        if not customer_id:
            return

        customer = get_model("contact").browse(customer_id)

        # The credit limit is checked before the active flag, so an inactive
        # customer who is also over the limit reports the credit error.
        if customer.credit_limit_exceeded:
            raise BusinessRuleException(
                "Customer credit limit exceeded",
                error_code="CREDIT_LIMIT_EXCEEDED",
                details={
                    "customer_id": customer_id,
                    "credit_limit": customer.credit_limit,
                    "current_balance": customer.balance
                }
            )

        if not customer.active:
            raise BusinessRuleException(
                "Cannot create invoice for inactive customer",
                error_code="INACTIVE_CUSTOMER",
                details={"customer_id": customer_id}
            )

# Error handler decorator
def handle_errors(func):
    """Decorator for consistent error handling.

    ``NetforceException`` instances pass through unchanged. Any other
    exception is logged with its traceback and re-raised as a
    ``NetforceException`` with code ``UNEXPECTED_ERROR``, chained to the
    original error.
    """
    # Imported locally so the decorator brings its own dependencies.
    import functools
    import logging

    logger = logging.getLogger(func.__module__)

    # functools.wraps keeps the wrapped function's __name__ and __doc__, so
    # introspection and log messages still refer to the real function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except NetforceException:
            raise  # Already structured; let callers handle it as-is.
        except Exception as e:
            # logger.exception records the message together with the traceback.
            logger.exception("Unexpected error in %s: %s", func.__name__, e)

            raise NetforceException(
                f"Unexpected error in {func.__name__}",
                error_code="UNEXPECTED_ERROR",
                details={"original_error": str(e)}
            ) from e
    return wrapper

# Usage with decorator
class CustomerService:
    """Customer operations wrapped by the ``handle_errors`` decorator."""

    @handle_errors
    def update_credit_limit(self, customer_id, new_limit):
        """Write a new credit limit to the contact and return the browsed record.

        Raises:
            ValidationException: If ``new_limit`` is negative.
        """
        if new_limit < 0:
            errors = {"credit_limit": "Credit limit cannot be negative"}
            raise ValidationException(field_errors=errors)

        contact_model = get_model("contact")
        customer = contact_model.browse(customer_id)
        changes = {"credit_limit": new_limit}
        customer.write(changes)
        return customer

Next Steps

  • See Testing to learn how to test these advanced patterns
  • Explore Security for securing complex applications
  • Check Performance for optimizing advanced patterns
  • Review Multi-Company for multi-tenant pattern implementations