Skip to content

Reports & Data Analytics

Netforce provides a comprehensive reporting system for building custom reports, dashboards, and data analytics views. It supports multiple output formats, parameterized reports, and real-time data visualization.

Overview

The reporting system consists of several key components:

  • report - Report definitions with SQL queries and parameters
  • report.template - Report templates for different output formats
  • report.data - Generated report data and caching
  • Dashboard widgets - Real-time data visualization components

Core Models

1. Report Model (report)

The report model defines the data source and configuration:

class Report(Model):
    """Report definition: a named SQL query plus its presentation settings."""

    _name = "report"
    # NOTE(review): other examples in this file pass keys such as "chart_config",
    # "widget_type", "refresh_interval", "drill_enabled", "realtime" and
    # "trigger_models" to this model; none of them are declared below -- confirm
    # whether they are defined elsewhere.
    _fields = {
        # Human-readable report title (mandatory).
        "name": fields.Char("Report Name", required=True),
        # Kind of output: list, summary, chart or dashboard widget.
        "type": fields.Selection([
            ["list", "List Report"],
            ["summary", "Summary Report"],
            ["chart", "Chart"],
            ["dashboard", "Dashboard Widget"]
        ], "Report Type"),
        # Link to a "model" record labelled "Data Model".
        "model_id": fields.Many2One("model", "Data Model"),
        # Query text; the examples in this file use %(name)s placeholders.
        "sql": fields.Text("SQL Query"),
        # Child "report.param" rows, linked back through their "report_id".
        "params": fields.One2Many("report.param", "report_id", "Parameters"),
        # Optional "report.template" used to render the output.
        "template_id": fields.Many2One("report.template", "Template"),
        # Owning company.
        "company_id": fields.Many2One("company", "Company"),
        # User groups attached to the report.
        "access_groups": fields.Many2Many("user.group", "Access Groups"),
    }

2. Report Parameters (report.param)

Define user input parameters for reports:

class ReportParam(Model):
    """One user-supplied input parameter belonging to a report."""

    _name = "report.param"
    _fields = {
        # Parent report this parameter belongs to.
        "report_id": fields.Many2One("report", "Report"),
        # Parameter identifier (mandatory).
        # presumably matches the %(name)s placeholder in the report SQL -- TODO confirm
        "name": fields.Char("Parameter Name", required=True),
        # Label shown to the user.
        "string": fields.Char("Display Label"),
        # Input type of the parameter.
        "type": fields.Selection([
            ["char", "Text"],
            ["date", "Date"],
            ["datetime", "Date & Time"],
            ["selection", "Selection"],
            ["many2one", "Reference"]
        ], "Type"),
        # Whether a value must be supplied.
        "required": fields.Boolean("Required"),
        # Default value, stored as text.
        "default_value": fields.Char("Default Value"),
        # Ordering number for the parameter.
        "sequence": fields.Integer("Sequence"),
    }

Creating Reports

1. Basic List Report

Create a simple list report showing invoice data:

def create_invoice_report():
    """Register the invoice list report and its two date parameters.

    Returns the value produced by ``create`` on the "report" model, which is
    also used as ``report_id`` for the parameter rows.
    """
    report_vals = {
        "name": "Invoice List Report",
        "type": "list",
        "sql": """
            SELECT 
                number,
                contact_id,
                date_invoice,
                amount_total,
                state
            FROM account_invoice 
            WHERE date_invoice >= %(date_from)s 
            AND date_invoice <= %(date_to)s
            ORDER BY date_invoice DESC
        """,
    }
    report_id = get_model("report").create(report_vals)

    # One entry per placeholder used by the query: (name, label, sequence).
    date_inputs = [
        ("date_from", "From Date", 1),
        ("date_to", "To Date", 2),
    ]
    get_model("report.param").create([
        {
            "report_id": report_id,
            "name": param_name,
            "string": label,
            "type": "date",
            "required": True,
            "sequence": position,
        }
        for param_name, label, position in date_inputs
    ])

    return report_id

2. Summary Report with Grouping

Create aggregated summary reports:

def create_sales_summary():
    """Register the monthly sales summary report and return ``create``'s result.

    The query aggregates outgoing invoices per month and expects the
    ``date_from``, ``date_to`` and ``company_id`` placeholders to be supplied.
    """
    summary_vals = {
        "name": "Sales Summary by Month",
        "type": "summary",
        "sql": """
            SELECT 
                DATE_TRUNC('month', date_invoice) as month,
                COUNT(*) as invoice_count,
                SUM(amount_total) as total_sales,
                AVG(amount_total) as avg_sales
            FROM account_invoice 
            WHERE type = 'out'
            AND date_invoice >= %(date_from)s
            AND date_invoice <= %(date_to)s
            AND company_id = %(company_id)s
            GROUP BY DATE_TRUNC('month', date_invoice)
            ORDER BY month
        """,
    }
    return get_model("report").create(summary_vals)

3. Chart Reports

Create visual chart reports:

def create_sales_chart():
    """Register the weekly sales trend line chart and return ``create``'s result."""
    chart_vals = {
        "name": "Sales Trend Chart",
        "type": "chart",
        "sql": """
            SELECT 
                DATE_TRUNC('week', date_invoice) as week,
                SUM(amount_total) as sales
            FROM account_invoice 
            WHERE type = 'out'
            AND date_invoice >= %(date_from)s
            AND date_invoice <= %(date_to)s
            GROUP BY DATE_TRUNC('week', date_invoice)
            ORDER BY week
        """,
    }
    # Axis mapping: the query's "week" column on x, "sales" on y.
    chart_vals["chart_config"] = dict(
        type="line",
        x_field="week",
        y_field="sales",
        title="Weekly Sales Trend",
    )
    return get_model("report").create(chart_vals)

Report Generation

1. Basic Report Generation

Generate reports programmatically:

def generate_report(report_id, params=None, context=None):
    """Run a report's SQL and store the result as a "report.data" record.

    Args:
        report_id: id of the "report" record to execute.
        params: mapping of placeholder name to value for the report query.
            The mapping is copied, so the caller's dict is never modified.
        context: accepted for interface compatibility; not used here.

    Returns:
        The value returned by ``create`` on "report.data" for the new row.

    Raises:
        Exception: if parameter validation fails, or (wrapped, with the
            original error chained) if the query or serialization fails.
    """
    import datetime as _dt
    from decimal import Decimal

    def _json_default(value):
        """Encode column types that ``json`` cannot serialize natively."""
        if isinstance(value, (_dt.date, _dt.time)):  # date also covers datetime
            return value.isoformat()
        if isinstance(value, Decimal):
            return str(value)  # keep full precision
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    # Private copy: the company id is injected below and must not leak back
    # into the caller's dict (or into a shared default).
    params = dict(params) if params else {}

    report = get_model("report").browse(report_id)

    # Validate parameters
    _validate_report_params(report, params)

    try:
        # Key is computed from the caller-supplied parameters so that
        # get_cached_report(), which hashes the same input, can find this row.
        cache_key = _get_cache_key(report_id, params)

        # Add company context if needed
        if report.company_id:
            params["company_id"] = report.company_id.id

        # Execute SQL query
        db = get_connection()
        res = db.query(report.sql, params)
        data = res.fetchall()
        columns = [desc[0] for desc in res.description]

        # Store generated data
        return get_model("report.data").create({
            "report_id": report_id,
            "cache_key": cache_key,
            "params": json.dumps(params),
            "data": json.dumps(
                {"columns": columns, "rows": data},
                default=_json_default,
            ),
            "generate_date": time.strftime("%Y-%m-%d %H:%M:%S")
        })

    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise Exception(f"Report generation failed: {str(e)}") from e

def _validate_report_params(report, params):
    """Validate report parameters"""
    for param in report.params:
        if param.required and param.name not in params:
            raise Exception(f"Required parameter missing: {param.string}")

        if param.type == "date" and param.name in params:
            try:
                datetime.strptime(params[param.name], "%Y-%m-%d")
            except ValueError:
                raise Exception(f"Invalid date format: {param.name}")

2. Cached Report Generation

Implement caching for performance:

def get_cached_report(report_id, params=None, cache_minutes=30):
    """Return a recent "report.data" id for the report, generating one if needed.

    Args:
        report_id: id of the report.
        params: mapping of report parameters (defaults to no parameters).
        cache_minutes: maximum age, in minutes, of a reusable result.

    Returns:
        The id of a matching cached "report.data" record when one exists,
        otherwise whatever ``generate_report`` returns. Both paths yield an id,
        so callers can treat the result uniformly.
    """
    if params is None:
        params = {}
    cache_key = _get_cache_key(report_id, params)

    # Oldest generation timestamp still considered fresh.
    oldest_allowed = (
        datetime.now() - timedelta(minutes=cache_minutes)
    ).strftime("%Y-%m-%d %H:%M:%S")

    # Check for cached data
    # NOTE(review): relies on "report.data" rows carrying a "cache_key" value.
    cached_data = get_model("report.data").search_browse([
        ["report_id", "=", report_id],
        ["cache_key", "=", cache_key],
        ["generate_date", ">=", oldest_allowed],
    ])

    if cached_data:
        # Return the id, matching generate_report, not the browse record.
        return cached_data[0].id

    # Generate new report
    return generate_report(report_id, params)

def _get_cache_key(report_id, params):
    """Generate cache key for report"""
    param_str = json.dumps(params, sort_keys=True)
    return hashlib.md5(f"{report_id}_{param_str}".encode()).hexdigest()

Report Templates

1. HTML Templates

Create HTML templates for web display:

def create_html_template(report_id):
    """Attach an HTML table template to a report and return ``create``'s result.

    The template text uses ``{{ }}`` / ``{% %}`` markers and refers to
    ``report``, ``generate_date``, ``columns`` and ``rows``.
    """
    template_vals = {
        "report_id": report_id,
        "name": "HTML Template",
        "type": "html",
        "template": """
        <html>
        <head>
            <title>{{report.name}}</title>
            <style>
                table { border-collapse: collapse; width: 100%; }
                th, td { border: 1px solid #ddd; padding: 8px; }
                th { background-color: #f2f2f2; }
            </style>
        </head>
        <body>
            <h1>{{report.name}}</h1>
            <p>Generated: {{generate_date}}</p>

            <table>
                <thead>
                    <tr>
                        {% for col in columns %}
                        <th>{{col}}</th>
                        {% endfor %}
                    </tr>
                </thead>
                <tbody>
                    {% for row in rows %}
                    <tr>
                        {% for cell in row %}
                        <td>{{cell}}</td>
                        {% endfor %}
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </body>
        </html>
        """,
    }
    return get_model("report.template").create(template_vals)

2. PDF Templates

Create PDF templates using reportlab:

def create_pdf_template(report_id):
    """Attach a PDF template to a report and return ``create``'s result.

    The stored template is Python source text defining ``generate_pdf``; it is
    only saved here, not executed.
    """
    template_vals = {
        "report_id": report_id,
        "name": "PDF Template",
        "type": "pdf",
        "template": """
        import reportlab
        from reportlab.lib.pagesizes import letter, A4
        from reportlab.platypus import SimpleDocTemplate, Table, TableStyle
        from reportlab.lib import colors

        def generate_pdf(data, filename):
            doc = SimpleDocTemplate(filename, pagesize=A4)
            elements = []

            # Create table
            table_data = [data['columns']] + data['rows']
            table = Table(table_data)

            # Style table
            table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, 0), 14),
                ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
                ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
                ('GRID', (0, 0), (-1, -1), 1, colors.black)
            ]))

            elements.append(table)
            doc.build(elements)
            return filename
        """,
    }
    return get_model("report.template").create(template_vals)

3. Excel Templates

Create Excel export templates:

def create_excel_template(report_id):
    """Attach an Excel export template to a report and return ``create``'s result.

    The stored template is Python source text defining ``generate_excel``; it
    is only saved here, not executed.
    """
    template_vals = {
        "report_id": report_id,
        "name": "Excel Template",
        "type": "excel",
        "template": """
        import openpyxl
        from openpyxl.styles import Font, PatternFill

        def generate_excel(data, filename):
            wb = openpyxl.Workbook()
            ws = wb.active
            ws.title = "Report Data"

            # Add headers
            headers = data['columns']
            for col, header in enumerate(headers, 1):
                cell = ws.cell(row=1, column=col)
                cell.value = header
                cell.font = Font(bold=True)
                cell.fill = PatternFill("solid", fgColor="CCCCCC")

            # Add data rows
            for row_idx, row_data in enumerate(data['rows'], 2):
                for col_idx, cell_value in enumerate(row_data, 1):
                    ws.cell(row=row_idx, column=col_idx, value=cell_value)

            wb.save(filename)
            return filename
        """,
    }
    return get_model("report.template").create(template_vals)

Dashboard Widgets

1. KPI Widgets

Create key performance indicator widgets:

def create_kpi_widget(name, sql_query):
    """Register a KPI dashboard widget backed by ``sql_query``.

    Returns the value produced by ``create`` on the "report" model.
    """
    display_options = {
        "format": "currency",  # or "number", "percentage"
        "trend": True,  # Show trend arrow
        "color": "primary",
    }
    widget_vals = {
        "name": name,
        "type": "dashboard",
        "widget_type": "kpi",
        "sql": sql_query,
        "refresh_interval": 300,  # 5 minutes
        "chart_config": display_options,
    }
    return get_model("report").create(widget_vals)

# Example KPI widgets
def setup_sales_kpis():
    """Create the three example sales KPI widgets via ``create_kpi_widget``."""
    # (widget name, query); each query yields a single "value" column.
    kpi_definitions = [
        (
            "Today's Sales",
            """
                SELECT COALESCE(SUM(amount_total), 0) as value
                FROM account_invoice 
                WHERE type='out' AND date_invoice = CURRENT_DATE
            """,
        ),
        (
            "Monthly Sales",
            """
                SELECT COALESCE(SUM(amount_total), 0) as value
                FROM account_invoice 
                WHERE type='out' 
                AND date_invoice >= DATE_TRUNC('month', CURRENT_DATE)
            """,
        ),
        (
            "Outstanding Invoices",
            """
                SELECT COUNT(*) as value
                FROM account_invoice 
                WHERE type='out' AND state='open'
            """,
        ),
    ]

    for widget_name, widget_sql in kpi_definitions:
        create_kpi_widget(widget_name, widget_sql)

2. Chart Widgets

Create chart widgets for dashboards:

def create_chart_widget(name, chart_type, sql_query):
    """Register a chart dashboard widget of the given ``chart_type``.

    Returns the value produced by ``create`` on the "report" model.
    """
    chart_options = {
        "type": chart_type,  # bar, line, pie, doughnut
        "responsive": True,
        "legend": {"display": True},
    }
    widget_vals = {
        "name": name,
        "type": "dashboard",
        "widget_type": "chart",
        "sql": sql_query,
        "chart_config": chart_options,
    }
    return get_model("report").create(widget_vals)

# Example chart widgets
def setup_dashboard_charts():
    """Create the two example dashboard charts: monthly sales and top customers."""
    # Sales by month (bar chart)
    monthly_sales_vals = {
        "name": "Sales by Month",
        "type": "dashboard",
        "widget_type": "chart",
        "sql": """
            SELECT 
                TO_CHAR(date_invoice, 'YYYY-MM') as month,
                SUM(amount_total) as sales
            FROM account_invoice 
            WHERE type='out' 
            AND date_invoice >= CURRENT_DATE - INTERVAL '12 months'
            GROUP BY TO_CHAR(date_invoice, 'YYYY-MM')
            ORDER BY month
        """,
        "chart_config": {
            "type": "bar",
            "x_field": "month",
            "y_field": "sales",
        },
    }
    get_model("report").create(monthly_sales_vals)

    # Top customers (pie chart)
    top_customers_vals = {
        "name": "Top Customers",
        "type": "dashboard",
        "widget_type": "chart",
        "sql": """
            SELECT 
                c.name as customer,
                SUM(i.amount_total) as total_sales
            FROM account_invoice i
            JOIN contact c ON i.contact_id = c.id
            WHERE i.type='out' 
            AND i.date_invoice >= CURRENT_DATE - INTERVAL '12 months'
            GROUP BY c.id, c.name
            ORDER BY total_sales DESC
            LIMIT 10
        """,
        "chart_config": {
            "type": "pie",
            "label_field": "customer",
            "value_field": "total_sales",
        },
    }
    get_model("report").create(top_customers_vals)

Advanced Features

1. Drill-down Reports

Create interactive drill-down functionality:

def create_drilldown_report():
    """Create a category summary report and its per-product detail report.

    Returns:
        A ``(master_report, detail_report)`` tuple holding the two values
        returned by ``create`` on the "report" model, in that order.
    """
    report_model = get_model("report")

    # Master report - Sales by product category
    master_vals = {
        "name": "Sales by Category",
        "type": "summary",
        "sql": """
            SELECT 
                pc.name as category,
                COUNT(sol.id) as line_count,
                SUM(sol.amount) as total_sales,
                'product_sales' as drill_report,
                pc.id as drill_param
            FROM sale_order_line sol
            JOIN product p ON sol.product_id = p.id  
            JOIN product_categ pc ON p.categ_id = pc.id
            WHERE sol.date >= %(date_from)s
            GROUP BY pc.id, pc.name
            ORDER BY total_sales DESC
        """,
        "drill_enabled": True,
    }
    master_report = report_model.create(master_vals)

    # Detail report - Products in category
    detail_vals = {
        "name": "Product Sales Detail",
        "type": "list",
        "sql": """
            SELECT 
                p.name as product,
                SUM(sol.qty) as qty_sold,
                SUM(sol.amount) as total_sales,
                AVG(sol.unit_price) as avg_price
            FROM sale_order_line sol
            JOIN product p ON sol.product_id = p.id
            WHERE p.categ_id = %(category_id)s
            AND sol.date >= %(date_from)s
            GROUP BY p.id, p.name
            ORDER BY total_sales DESC
        """,
    }
    detail_report = report_model.create(detail_vals)

    return master_report, detail_report

2. Real-time Reports

Create reports with real-time data updates:

def create_realtime_report(report_id):
    """Flag a report as real-time and register an "after_write" trigger for it.

    The trigger record is created against the report's ``model_id`` and is
    given the local ``trigger_report_refresh`` callable as its "function".
    """
    report = get_model("report").browse(report_id)

    # Configure WebSocket updates
    realtime_settings = {
        "realtime": True,
        "refresh_interval": 30,  # seconds
        "trigger_models": ["sale.order", "account.invoice"],  # Models to watch
    }
    report.write(realtime_settings)

    def trigger_report_refresh(model_name, record_id, action):
        """Send a "report_refresh" message when a watched model changes."""
        if model_name not in report.trigger_models:
            return
        # Send WebSocket message to update report
        socket_model = get_model("websocket")
        socket_model.send_message({
            "type": "report_refresh",
            "report_id": report.id,
            "timestamp": time.time(),
        })

    # Register trigger
    trigger_vals = {
        "model_id": report.model_id.id,
        "event": "after_write",
        "function": trigger_report_refresh,
    }
    get_model("trigger").create(trigger_vals)

3. Scheduled Reports

Create scheduled report generation:

def schedule_report(report_id, schedule_config):
    """Create a background task that periodically generates a report.

    Args:
        report_id: id of the report to schedule.
        schedule_config: mapping with a mandatory "start_date" and optional
            "repeat" (default "daily") and "interval" (default 1).

    Raises:
        KeyError: if ``schedule_config`` has no "start_date".
    """
    # The task name embeds the report's name, so the record must be loaded
    # first; the name was previously read from an undefined variable.
    report = get_model("report").browse(report_id)

    get_model("bg.task").create({
        "name": f"Scheduled Report: {report.name}",
        "model": "report",
        "method": "generate_scheduled",
        "args": json.dumps([report_id]),
        "date_run": schedule_config["start_date"],
        "repeat": schedule_config.get("repeat", "daily"),
        "repeat_interval": schedule_config.get("interval", 1),
        "user_id": get_active_user(),
        "state": "active"
    })

def generate_scheduled(self, ids, context={}):
    """Generate each report in ``ids`` and email it to its subscribers.

    Reports whose ``email_subscribers`` value is empty are generated but not
    emailed. ``context`` is accepted but not used.
    """
    for report_id in ids:
        report = self.browse(report_id)

        # Generate report with the report's own default parameters.
        default_params = report.get_default_params()
        data_id = generate_report(report_id, default_params)

        # Email to subscribers
        subscribers = report.email_subscribers
        if not subscribers:
            continue
        self.email_report(data_id, subscribers)

def email_report(self, report_data_id, recipients):
    """Create an outgoing email carrying the generated report as an attachment.

    ``recipients`` is an iterable of address strings; they are joined with
    ";" to form the "to" field.
    """
    generated = get_model("report.data").browse(report_data_id)
    report = generated.report_id

    # Generate PDF attachment
    attachment = self.generate_pdf_output(report_data_id)

    # Send email
    outbox = get_model("email.message")
    outbox.create({
        "to": ";".join(recipients),
        "subject": f"Scheduled Report: {report.name}",
        "body": f"""
        <p>Please find attached the scheduled report: <strong>{report.name}</strong></p>
        <p>Generated: {generated.generate_date}</p>
        <p>Parameters: {generated.params}</p>
        """,
        "attachments": [attachment],
        "type": "out",
    })

Performance Optimization

1. Query Optimization

Optimize report queries for performance:

def optimize_report_query(sql):
    """Apply simple textual optimizations to a report query.

    Args:
        sql: the report's SQL text.

    Returns:
        A ``(sql, optimizations)`` tuple: the possibly rewritten query and a
        list of ``CREATE INDEX`` statements the caller may run beforehand.

    Keyword detection is a case-sensitive substring test, so only upper-case
    ``ORDER BY`` / ``GROUP BY`` / ``LIMIT`` / ``WHERE`` are recognised.
    """
    optimizations = []

    # Add proper indexes
    if "ORDER BY" in sql and "date_invoice" in sql:
        optimizations.append("CREATE INDEX IF NOT EXISTS idx_invoice_date ON account_invoice(date_invoice)")

    # Use LIMIT for large datasets
    if "GROUP BY" not in sql and "LIMIT" not in sql:
        # Drop a trailing statement terminator first; appending after ";"
        # would produce invalid SQL.
        sql = sql.rstrip().rstrip(";").rstrip() + " LIMIT 10000"

    # Add company filter for multi-tenant
    if "company_id" not in sql and "account_invoice" in sql:
        # Only the first (outermost) WHERE is rewritten; replacing every
        # occurrence would also inject the filter into subqueries.
        sql = sql.replace("WHERE", "WHERE company_id = %(company_id)s AND", 1)

    return sql, optimizations

def execute_optimized_query(sql, params):
    """Run a report query after applying ``optimize_report_query``.

    Suggested indexes are created on a best-effort basis: a failure is logged
    and does not prevent the query from running.

    Args:
        sql: the report's SQL text.
        params: parameter mapping passed to the database call.

    Returns:
        Whatever ``db.execute`` returns for the optimized query.
    """
    import logging

    sql, indexes = optimize_report_query(sql)

    db = get_connection()

    # Create indexes if needed
    for index_sql in indexes:
        try:
            db.execute(index_sql)
        except Exception:
            # Still best-effort, but no longer silent, and it no longer
            # swallows KeyboardInterrupt/SystemExit as a bare except did.
            logging.getLogger(__name__).warning(
                "Could not create report index: %s", index_sql, exc_info=True
            )

    # Execute query with timeout
    return db.execute(sql, params, timeout=30)

2. Result Caching

Implement intelligent caching:

def get_cached_report_data(report_id, params, cache_strategy="time"):
    """Return cached report data, generating it when nothing usable is cached.

    Args:
        report_id: id of the report.
        params: report parameter mapping.
        cache_strategy: one of
            "time"   - reuse entries within a fixed 3600-second window,
            "data"   - reuse entries newer than the last change reported by
                       the audit log for invoices and sale orders,
            "manual" - reuse any entry until it is explicitly cleared.

    Returns:
        The cached data when found, otherwise the result of
        ``generate_report_data``.

    Raises:
        ValueError: if ``cache_strategy`` is not one of the values above.
    """
    # Reject unknown strategies up front; they previously fell through every
    # branch and failed later on an unbound local variable.
    if cache_strategy not in ("time", "data", "manual"):
        raise ValueError(f"Unknown cache strategy: {cache_strategy!r}")

    cache_key = _generate_cache_key(report_id, params)

    if cache_strategy == "time":
        # Time-based cache (default 1 hour)
        cache_duration = 3600
        cached_data = get_cached_data(cache_key, cache_duration)

    elif cache_strategy == "data":
        # Data-change based cache
        last_change = get_model("audit.log").get_last_change_time(
            models=["account.invoice", "sale.order"]
        )
        cached_data = get_cached_data(cache_key, since=last_change)

    else:
        # Manual cache invalidation
        cached_data = get_cached_data(cache_key)

    if cached_data:
        return cached_data

    # Generate new data
    return generate_report_data(report_id, params)

def invalidate_report_cache(model_names=None):
    """Clear cached report data.

    Args:
        model_names: names of the models whose dependent reports should be
            invalidated. ``None`` (the default) clears every report cache
            entry; an empty collection invalidates nothing.
    """
    if model_names is None:
        # Clear all report cache
        clear_cache(prefix="report_")
        return

    if not model_names:
        # An explicit empty list means "no models changed"; it must not be
        # mistaken for the clear-everything default.
        return

    # Invalidate cache for reports using these models
    reports = get_model("report").search_browse([
        ["model_id.name", "in", model_names]
    ])
    for report in reports:
        clear_cache(prefix=f"report_{report.id}_")

Best Practices

1. SQL Security

Ensure SQL queries are secure:

def validate_report_sql(sql):
    """Reject report SQL that is not a plain SELECT and list its parameters.

    Args:
        sql: the query text to validate.

    Returns:
        The names of all ``%(name)s`` placeholders, in order of appearance
        (repeated placeholders are listed once per occurrence).

    Raises:
        Exception: if the query contains a blacklisted keyword as a whole
            word, or does not start with SELECT.
    """
    import re

    dangerous_keywords = [
        "DROP", "DELETE", "UPDATE", "INSERT",
        "CREATE", "ALTER", "EXEC", "EXECUTE"
    ]

    sql_upper = sql.upper()
    for keyword in dangerous_keywords:
        # Match whole words only: a plain substring test rejects harmless
        # identifiers such as "execution_time", "created_at" or "updated_at".
        if re.search(rf"\b{keyword}\b", sql_upper):
            raise Exception(f"Dangerous SQL keyword not allowed: {keyword}")

    # Only allow SELECT statements
    if not sql_upper.strip().startswith("SELECT"):
        raise Exception("Only SELECT statements are allowed")

    # Validate parameter placeholders
    return re.findall(r"%\((\w+)\)s", sql)

def sanitize_report_params(params):
    """Return a copy of ``params`` with single quotes doubled in string values.

    Non-string values are copied unchanged; the input mapping is not modified.
    """
    # NOTE(review): if these values are later bound through driver
    # placeholders, the doubled quotes would be kept literally -- confirm
    # this escaping is still wanted.
    return {
        key: value.replace("'", "''") if isinstance(value, str) else value
        for key, value in params.items()
    }

2. Error Handling

Implement robust error handling:

def generate_report_safely(report_id, params={}):
    """Generate a report, turning any failure into a logged error result.

    Returns:
        The result of ``generate_report`` on success; otherwise a dict with
        "error", "message" and "report_id" keys describing the failure.
    """
    try:
        # Validate report exists
        report = get_model("report").browse(report_id)
        if not report:
            raise Exception(f"Report not found: {report_id}")

        # Check permissions
        has_access = _check_report_access(report, get_active_user())
        if not has_access:
            raise Exception("Access denied to report")

        # Validate SQL
        validate_report_sql(report.sql)

        # Sanitize parameters, then generate report
        return generate_report(report_id, sanitize_report_params(params))

    except Exception as e:
        failure = str(e)

        # Log error
        log_entry = {
            "level": "error",
            "message": f"Report generation failed: {failure}",
            "model": "report",
            "record_id": report_id,
            "user_id": get_active_user(),
        }
        get_model("log").create(log_entry)

        # Return error report
        return {"error": True, "message": failure, "report_id": report_id}

def _check_report_access(report, user_id):
    """Check if user has access to report"""
    if not report.access_groups:
        return True  # No restrictions

    user_groups = get_model("base.user").browse(user_id).groups
    user_group_ids = [g.id for g in user_groups]

    report_group_ids = [g.id for g in report.access_groups]

    return bool(set(user_group_ids) & set(report_group_ids))

3. Performance Monitoring

Monitor report performance:

def monitor_report_performance(report_id, execution_time, row_count):
    """Record one report run and raise an alert when it was slow.

    Args:
        report_id: id of the report that ran.
        execution_time: duration of the run, in seconds.
        row_count: number of rows the run produced.
    """
    get_model("report.stats").create({
        "report_id": report_id,
        "execution_time": execution_time,
        "row_count": row_count,
        "date": time.strftime("%Y-%m-%d %H:%M:%S"),
        "user_id": get_active_user()
    })

    # Alert if performance is poor
    if execution_time > 30:  # 30 seconds
        # Load the record for its name; the alert message previously read it
        # from an undefined variable.
        report = get_model("report").browse(report_id)
        get_model("alert").create({
            "type": "performance",
            "message": f"Slow report execution: {report.name} ({execution_time}s)",
            "severity": "warning"
        })

def get_report_performance_stats(report_id, days=30):
    """Return aggregate run statistics for a report over the last ``days`` days.

    Returns:
        The first row of the aggregate query, or an empty dict when the query
        result is empty.
    """
    # Earliest date (inclusive) to include in the aggregation.
    since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
    stats_sql = """
        SELECT 
            AVG(execution_time) as avg_time,
            MAX(execution_time) as max_time,
            COUNT(*) as run_count,
            AVG(row_count) as avg_rows
        FROM report_stats 
        WHERE report_id = %s 
        AND date >= %s
    """
    stats = database.get(stats_sql, [report_id, since])

    if not stats:
        return {}
    return stats[0]

Next Steps