Reports & Data Analytics¶
Netforce provides a comprehensive reporting system that allows you to create custom reports, dashboards, and data analytics. The system supports various output formats, parameterized reports, and real-time data visualization.
Overview¶
The reporting system consists of several key components:
report- Report definitions with SQL queries and parametersreport.template- Report templates for different output formatsreport.data- Generated report data and caching- Dashboard widgets - Real-time data visualization components
Core Models¶
1. Report Model (report)¶
The report model defines the data source and configuration:
class Report(Model):
_name = "report"
_fields = {
"name": fields.Char("Report Name", required=True),
"type": fields.Selection([
["list", "List Report"],
["summary", "Summary Report"],
["chart", "Chart"],
["dashboard", "Dashboard Widget"]
], "Report Type"),
"model_id": fields.Many2One("model", "Data Model"),
"sql": fields.Text("SQL Query"),
"params": fields.One2Many("report.param", "report_id", "Parameters"),
"template_id": fields.Many2One("report.template", "Template"),
"company_id": fields.Many2One("company", "Company"),
"access_groups": fields.Many2Many("user.group", "Access Groups"),
}
2. Report Parameters (report.param)¶
Define user input parameters for reports:
class ReportParam(Model):
_name = "report.param"
_fields = {
"report_id": fields.Many2One("report", "Report"),
"name": fields.Char("Parameter Name", required=True),
"string": fields.Char("Display Label"),
"type": fields.Selection([
["char", "Text"],
["date", "Date"],
["datetime", "Date & Time"],
["selection", "Selection"],
["many2one", "Reference"]
], "Type"),
"required": fields.Boolean("Required"),
"default_value": fields.Char("Default Value"),
"sequence": fields.Integer("Sequence"),
}
Creating Reports¶
1. Basic List Report¶
Create a simple list report showing invoice data:
def create_invoice_report():
"""Create invoice list report"""
report_id = get_model("report").create({
"name": "Invoice List Report",
"type": "list",
"sql": """
SELECT
number,
contact_id,
date_invoice,
amount_total,
state
FROM account_invoice
WHERE date_invoice >= %(date_from)s
AND date_invoice <= %(date_to)s
ORDER BY date_invoice DESC
""",
})
# Add parameters
get_model("report.param").create([
{
"report_id": report_id,
"name": "date_from",
"string": "From Date",
"type": "date",
"required": True,
"sequence": 1
},
{
"report_id": report_id,
"name": "date_to",
"string": "To Date",
"type": "date",
"required": True,
"sequence": 2
}
])
return report_id
2. Summary Report with Grouping¶
Create aggregated summary reports:
def create_sales_summary():
"""Create sales summary report"""
return get_model("report").create({
"name": "Sales Summary by Month",
"type": "summary",
"sql": """
SELECT
DATE_TRUNC('month', date_invoice) as month,
COUNT(*) as invoice_count,
SUM(amount_total) as total_sales,
AVG(amount_total) as avg_sales
FROM account_invoice
WHERE type = 'out'
AND date_invoice >= %(date_from)s
AND date_invoice <= %(date_to)s
AND company_id = %(company_id)s
GROUP BY DATE_TRUNC('month', date_invoice)
ORDER BY month
"""
})
3. Chart Reports¶
Create visual chart reports:
def create_sales_chart():
"""Create sales trend chart"""
return get_model("report").create({
"name": "Sales Trend Chart",
"type": "chart",
"sql": """
SELECT
DATE_TRUNC('week', date_invoice) as week,
SUM(amount_total) as sales
FROM account_invoice
WHERE type = 'out'
AND date_invoice >= %(date_from)s
AND date_invoice <= %(date_to)s
GROUP BY DATE_TRUNC('week', date_invoice)
ORDER BY week
""",
"chart_config": {
"type": "line",
"x_field": "week",
"y_field": "sales",
"title": "Weekly Sales Trend"
}
})
Report Generation¶
1. Basic Report Generation¶
Generate reports programmatically:
def generate_report(report_id, params={}, context={}):
"""Generate report with parameters"""
report = get_model("report").browse(report_id)
# Validate parameters
_validate_report_params(report, params)
# Add company context if needed
if report.company_id:
params["company_id"] = report.company_id.id
# Execute SQL query
sql = report.sql
db = get_connection()
try:
res = db.query(sql, params)
data = res.fetchall()
columns = [desc[0] for desc in res.description]
# Store generated data
report_data_id = get_model("report.data").create({
"report_id": report_id,
"params": json.dumps(params),
"data": json.dumps({
"columns": columns,
"rows": data
}),
"generate_date": time.strftime("%Y-%m-%d %H:%M:%S")
})
return report_data_id
except Exception as e:
raise Exception(f"Report generation failed: {str(e)}")
def _validate_report_params(report, params):
"""Validate report parameters"""
for param in report.params:
if param.required and param.name not in params:
raise Exception(f"Required parameter missing: {param.string}")
if param.type == "date" and param.name in params:
try:
datetime.strptime(params[param.name], "%Y-%m-%d")
except ValueError:
raise Exception(f"Invalid date format: {param.name}")
2. Cached Report Generation¶
Implement caching for performance:
def get_cached_report(report_id, params={}, cache_minutes=30):
"""Get report with caching"""
cache_key = _get_cache_key(report_id, params)
# Check for cached data
cached_data = get_model("report.data").search_browse([
["report_id", "=", report_id],
["cache_key", "=", cache_key],
["generate_date", ">=",
(datetime.now() - timedelta(minutes=cache_minutes)).strftime("%Y-%m-%d %H:%M:%S")]
])
if cached_data:
return cached_data[0]
# Generate new report
return generate_report(report_id, params)
def _get_cache_key(report_id, params):
"""Generate cache key for report"""
param_str = json.dumps(params, sort_keys=True)
return hashlib.md5(f"{report_id}_{param_str}".encode()).hexdigest()
Report Templates¶
1. HTML Templates¶
Create HTML templates for web display:
def create_html_template(report_id):
"""Create HTML template"""
return get_model("report.template").create({
"report_id": report_id,
"name": "HTML Template",
"type": "html",
"template": """
<html>
<head>
<title>{{report.name}}</title>
<style>
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; }
th { background-color: #f2f2f2; }
</style>
</head>
<body>
<h1>{{report.name}}</h1>
<p>Generated: {{generate_date}}</p>
<table>
<thead>
<tr>
{% for col in columns %}
<th>{{col}}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in rows %}
<tr>
{% for cell in row %}
<td>{{cell}}</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>
"""
})
2. PDF Templates¶
Create PDF templates using reportlab:
def create_pdf_template(report_id):
"""Create PDF template"""
return get_model("report.template").create({
"report_id": report_id,
"name": "PDF Template",
"type": "pdf",
"template": """
import reportlab
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle
from reportlab.lib import colors
def generate_pdf(data, filename):
doc = SimpleDocTemplate(filename, pagesize=A4)
elements = []
# Create table
table_data = [data['columns']] + data['rows']
table = Table(table_data)
# Style table
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 14),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
elements.append(table)
doc.build(elements)
return filename
"""
})
3. Excel Templates¶
Create Excel export templates:
def create_excel_template(report_id):
"""Create Excel template"""
return get_model("report.template").create({
"report_id": report_id,
"name": "Excel Template",
"type": "excel",
"template": """
import openpyxl
from openpyxl.styles import Font, PatternFill
def generate_excel(data, filename):
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Report Data"
# Add headers
headers = data['columns']
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col)
cell.value = header
cell.font = Font(bold=True)
cell.fill = PatternFill("solid", fgColor="CCCCCC")
# Add data rows
for row_idx, row_data in enumerate(data['rows'], 2):
for col_idx, cell_value in enumerate(row_data, 1):
ws.cell(row=row_idx, column=col_idx, value=cell_value)
wb.save(filename)
return filename
"""
})
Dashboard Widgets¶
1. KPI Widgets¶
Create key performance indicator widgets:
def create_kpi_widget(name, sql_query):
"""Create KPI dashboard widget"""
return get_model("report").create({
"name": name,
"type": "dashboard",
"widget_type": "kpi",
"sql": sql_query,
"refresh_interval": 300, # 5 minutes
"chart_config": {
"format": "currency", # or "number", "percentage"
"trend": True, # Show trend arrow
"color": "primary"
}
})
# Example KPI widgets
def setup_sales_kpis():
"""Setup sales KPI widgets"""
kpis = [
{
"name": "Today's Sales",
"sql": """
SELECT COALESCE(SUM(amount_total), 0) as value
FROM account_invoice
WHERE type='out' AND date_invoice = CURRENT_DATE
"""
},
{
"name": "Monthly Sales",
"sql": """
SELECT COALESCE(SUM(amount_total), 0) as value
FROM account_invoice
WHERE type='out'
AND date_invoice >= DATE_TRUNC('month', CURRENT_DATE)
"""
},
{
"name": "Outstanding Invoices",
"sql": """
SELECT COUNT(*) as value
FROM account_invoice
WHERE type='out' AND state='open'
"""
}
]
for kpi in kpis:
create_kpi_widget(kpi["name"], kpi["sql"])
2. Chart Widgets¶
Create chart widgets for dashboards:
def create_chart_widget(name, chart_type, sql_query):
"""Create chart dashboard widget"""
return get_model("report").create({
"name": name,
"type": "dashboard",
"widget_type": "chart",
"sql": sql_query,
"chart_config": {
"type": chart_type, # bar, line, pie, doughnut
"responsive": True,
"legend": {"display": True}
}
})
# Example chart widgets
def setup_dashboard_charts():
"""Setup dashboard chart widgets"""
# Sales by month (bar chart)
get_model("report").create({
"name": "Sales by Month",
"type": "dashboard",
"widget_type": "chart",
"sql": """
SELECT
TO_CHAR(date_invoice, 'YYYY-MM') as month,
SUM(amount_total) as sales
FROM account_invoice
WHERE type='out'
AND date_invoice >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY TO_CHAR(date_invoice, 'YYYY-MM')
ORDER BY month
""",
"chart_config": {
"type": "bar",
"x_field": "month",
"y_field": "sales"
}
})
# Top customers (pie chart)
get_model("report").create({
"name": "Top Customers",
"type": "dashboard",
"widget_type": "chart",
"sql": """
SELECT
c.name as customer,
SUM(i.amount_total) as total_sales
FROM account_invoice i
JOIN contact c ON i.contact_id = c.id
WHERE i.type='out'
AND i.date_invoice >= CURRENT_DATE - INTERVAL '12 months'
GROUP BY c.id, c.name
ORDER BY total_sales DESC
LIMIT 10
""",
"chart_config": {
"type": "pie",
"label_field": "customer",
"value_field": "total_sales"
}
})
Advanced Features¶
1. Drill-down Reports¶
Create interactive drill-down functionality:
def create_drilldown_report():
"""Create report with drill-down capability"""
# Master report - Sales by product category
master_report = get_model("report").create({
"name": "Sales by Category",
"type": "summary",
"sql": """
SELECT
pc.name as category,
COUNT(sol.id) as line_count,
SUM(sol.amount) as total_sales,
'product_sales' as drill_report,
pc.id as drill_param
FROM sale_order_line sol
JOIN product p ON sol.product_id = p.id
JOIN product_categ pc ON p.categ_id = pc.id
WHERE sol.date >= %(date_from)s
GROUP BY pc.id, pc.name
ORDER BY total_sales DESC
""",
"drill_enabled": True
})
# Detail report - Products in category
detail_report = get_model("report").create({
"name": "Product Sales Detail",
"type": "list",
"sql": """
SELECT
p.name as product,
SUM(sol.qty) as qty_sold,
SUM(sol.amount) as total_sales,
AVG(sol.unit_price) as avg_price
FROM sale_order_line sol
JOIN product p ON sol.product_id = p.id
WHERE p.categ_id = %(category_id)s
AND sol.date >= %(date_from)s
GROUP BY p.id, p.name
ORDER BY total_sales DESC
"""
})
return master_report, detail_report
2. Real-time Reports¶
Create reports with real-time data updates:
def create_realtime_report(report_id):
"""Enable real-time updates for report"""
report = get_model("report").browse(report_id)
# Configure WebSocket updates
report.write({
"realtime": True,
"refresh_interval": 30, # seconds
"trigger_models": ["sale.order", "account.invoice"], # Models to watch
})
# Set up trigger function
def trigger_report_refresh(model_name, record_id, action):
"""Trigger report refresh on model changes"""
if model_name in report.trigger_models:
# Send WebSocket message to update report
get_model("websocket").send_message({
"type": "report_refresh",
"report_id": report.id,
"timestamp": time.time()
})
# Register trigger
get_model("trigger").create({
"model_id": report.model_id.id,
"event": "after_write",
"function": trigger_report_refresh
})
3. Scheduled Reports¶
Create scheduled report generation:
def schedule_report(report_id, schedule_config):
"""Schedule automatic report generation"""
get_model("bg.task").create({
"name": f"Scheduled Report: {report.name}",
"model": "report",
"method": "generate_scheduled",
"args": json.dumps([report_id]),
"date_run": schedule_config["start_date"],
"repeat": schedule_config.get("repeat", "daily"),
"repeat_interval": schedule_config.get("interval", 1),
"user_id": get_active_user(),
"state": "active"
})
def generate_scheduled(self, ids, context={}):
"""Generate scheduled report and email"""
for report_id in ids:
report = self.browse(report_id)
# Generate report
params = report.get_default_params()
report_data_id = generate_report(report_id, params)
# Email to subscribers
if report.email_subscribers:
self.email_report(report_data_id, report.email_subscribers)
def email_report(self, report_data_id, recipients):
"""Email report to recipients"""
report_data = get_model("report.data").browse(report_data_id)
report = report_data.report_id
# Generate PDF attachment
pdf_file = self.generate_pdf_output(report_data_id)
# Send email
get_model("email.message").create({
"to": ";".join(recipients),
"subject": f"Scheduled Report: {report.name}",
"body": f"""
<p>Please find attached the scheduled report: <strong>{report.name}</strong></p>
<p>Generated: {report_data.generate_date}</p>
<p>Parameters: {report_data.params}</p>
""",
"attachments": [pdf_file],
"type": "out"
})
Performance Optimization¶
1. Query Optimization¶
Optimize report queries for performance:
def optimize_report_query(sql):
"""Optimize SQL query for better performance"""
optimizations = []
# Add proper indexes
if "ORDER BY" in sql and "date_invoice" in sql:
optimizations.append("CREATE INDEX IF NOT EXISTS idx_invoice_date ON account_invoice(date_invoice)")
# Use LIMIT for large datasets
if "GROUP BY" not in sql and "LIMIT" not in sql:
sql += " LIMIT 10000"
# Add company filter for multi-tenant
if "company_id" not in sql and "account_invoice" in sql:
sql = sql.replace("WHERE", "WHERE company_id = %(company_id)s AND")
return sql, optimizations
def execute_optimized_query(sql, params):
"""Execute query with optimizations"""
sql, indexes = optimize_report_query(sql)
db = get_connection()
# Create indexes if needed
for index_sql in indexes:
try:
db.execute(index_sql)
except:
pass # Index might already exist
# Execute query with timeout
return db.execute(sql, params, timeout=30)
2. Result Caching¶
Implement intelligent caching:
def get_cached_report_data(report_id, params, cache_strategy="time"):
"""Get report data with intelligent caching"""
cache_key = _generate_cache_key(report_id, params)
if cache_strategy == "time":
# Time-based cache (default 1 hour)
cache_duration = 3600
cached_data = get_cached_data(cache_key, cache_duration)
elif cache_strategy == "data":
# Data-change based cache
last_change = get_model("audit.log").get_last_change_time(
models=["account.invoice", "sale.order"]
)
cached_data = get_cached_data(cache_key, since=last_change)
elif cache_strategy == "manual":
# Manual cache invalidation
cached_data = get_cached_data(cache_key)
if cached_data:
return cached_data
# Generate new data
return generate_report_data(report_id, params)
def invalidate_report_cache(model_names=None):
"""Invalidate report cache for specific models"""
if model_names:
# Invalidate cache for reports using these models
reports = get_model("report").search_browse([
["model_id.name", "in", model_names]
])
for report in reports:
cache_prefix = f"report_{report.id}_"
clear_cache(prefix=cache_prefix)
else:
# Clear all report cache
clear_cache(prefix="report_")
Best Practices¶
1. SQL Security¶
Ensure SQL queries are secure:
def validate_report_sql(sql):
"""Validate SQL query for security"""
dangerous_keywords = [
"DROP", "DELETE", "UPDATE", "INSERT",
"CREATE", "ALTER", "EXEC", "EXECUTE"
]
sql_upper = sql.upper()
for keyword in dangerous_keywords:
if keyword in sql_upper:
raise Exception(f"Dangerous SQL keyword not allowed: {keyword}")
# Only allow SELECT statements
if not sql_upper.strip().startswith("SELECT"):
raise Exception("Only SELECT statements are allowed")
# Validate parameter placeholders
import re
params = re.findall(r"%\((\w+)\)s", sql)
return params
def sanitize_report_params(params):
"""Sanitize report parameters"""
sanitized = {}
for key, value in params.items():
if isinstance(value, str):
# Escape single quotes
sanitized[key] = value.replace("'", "''")
else:
sanitized[key] = value
return sanitized
2. Error Handling¶
Implement robust error handling:
def generate_report_safely(report_id, params={}):
"""Generate report with comprehensive error handling"""
try:
# Validate report exists
report = get_model("report").browse(report_id)
if not report:
raise Exception(f"Report not found: {report_id}")
# Check permissions
if not _check_report_access(report, get_active_user()):
raise Exception("Access denied to report")
# Validate SQL
validate_report_sql(report.sql)
# Sanitize parameters
safe_params = sanitize_report_params(params)
# Generate report
return generate_report(report_id, safe_params)
except Exception as e:
# Log error
get_model("log").create({
"level": "error",
"message": f"Report generation failed: {str(e)}",
"model": "report",
"record_id": report_id,
"user_id": get_active_user()
})
# Return error report
return {
"error": True,
"message": str(e),
"report_id": report_id
}
def _check_report_access(report, user_id):
"""Check if user has access to report"""
if not report.access_groups:
return True # No restrictions
user_groups = get_model("base.user").browse(user_id).groups
user_group_ids = [g.id for g in user_groups]
report_group_ids = [g.id for g in report.access_groups]
return bool(set(user_group_ids) & set(report_group_ids))
3. Performance Monitoring¶
Monitor report performance:
def monitor_report_performance(report_id, execution_time, row_count):
"""Monitor and log report performance"""
get_model("report.stats").create({
"report_id": report_id,
"execution_time": execution_time,
"row_count": row_count,
"date": time.strftime("%Y-%m-%d %H:%M:%S"),
"user_id": get_active_user()
})
# Alert if performance is poor
if execution_time > 30: # 30 seconds
get_model("alert").create({
"type": "performance",
"message": f"Slow report execution: {report.name} ({execution_time}s)",
"severity": "warning"
})
def get_report_performance_stats(report_id, days=30):
"""Get report performance statistics"""
stats = database.get("""
SELECT
AVG(execution_time) as avg_time,
MAX(execution_time) as max_time,
COUNT(*) as run_count,
AVG(row_count) as avg_rows
FROM report_stats
WHERE report_id = %s
AND date >= %s
""", [report_id, (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")])
return stats[0] if stats else {}
Next Steps¶
- Learn about Background Jobs for scheduled report generation
- Explore Security for report access control
- Check Multi-Company for company-specific reports
- Review Advanced Patterns for complex reporting scenarios