Data Import & Export¶
Netforce provides built-in functionality for importing and exporting data in CSV format. This system allows for bulk data operations, data migration, and integration with external systems.
Overview¶
The import/export system consists of:
- CSV Import - import_csv() method on models
- CSV Export - export_data() method on models
- Import UI - Generic import wizard (import.data model)
- Export UI - Built-in export functionality
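For orientation, here is a minimal round trip using both methods. This is a sketch: the record IDs and file path are placeholders, and it assumes product.product implements import_csv() as shown later on this page:

# Export three products to CSV, then re-import the same file.
csv_data = get_model("product.product").export_data([1, 2, 3], ["code", "name", "price"])
with open("/tmp/products.csv", "w") as f:
    f.write(csv_data)
result = get_model("product.product").import_csv("/tmp/products.csv")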
Data Export¶
Basic Export¶
All models automatically support data export:
# Export specific records
invoice_ids = [1, 2, 3]
field_paths = ["number", "date", "customer_id.name", "amount_total"]
csv_data = get_model("account.invoice").export_data(invoice_ids, field_paths)

# CSV data is returned as a string
with open("invoices.csv", "w") as f:
    f.write(csv_data)
Export Field Paths¶
Use dot notation for related fields:
field_paths = [
    "number",                 # Simple field
    "date",                   # Date field
    "customer_id.name",       # Related field (Many2One)
    "customer_id.email",      # Related field chain
    "lines.product_id.name",  # One2Many -> Many2One
    "lines.qty",              # One2Many field
    "lines.price",            # One2Many field
    "amount_total",           # Computed field
]

csv_data = get_model("sale.order").export_data(order_ids, field_paths)
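The record IDs usually come from a search. A typical pattern (the domain fields here are illustrative assumptions; substitute whatever your model defines):

order_ids = get_model("sale.order").search([
    ["state", "=", "confirmed"],
    ["date", ">=", "2024-01-01"],
])
csv_data = get_model("sale.order").export_data(order_ids, field_paths)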
Export from Frontend¶
List views include built-in export functionality, and the same method can be called directly over RPC to build a download:
// Export current list view data
rpc.execute("account.invoice", "export_data", [
    [1, 2, 3, 4, 5],                                        // record IDs
    ["number", "date", "customer_id.name", "amount_total"]  // fields
], {}, (err, csv_data) => {
    if (err) return alert("Export failed: " + err);
    // Download CSV file
    const blob = new Blob([csv_data], {type: "text/csv"});
    const url = URL.createObjectURL(blob);
    const a = document.createElement("a");
    a.href = url;
    a.download = "export.csv";
    a.click();
    URL.revokeObjectURL(url);
});
Data Import¶
CSV Import Method¶
Models can implement import_csv() for bulk data import:
class Product(Model):
    _name = "product.product"

    def import_csv(self, file_path, context={}):
        """Import products from a CSV file."""
        import csv
        # Open and parse the CSV
        with open(file_path, "r", encoding="utf-8") as csvfile:
            reader = csv.DictReader(csvfile)
            created_count = 0
            updated_count = 0
            errors = []
            for row_num, row in enumerate(reader, start=2):  # row 1 is the header
                try:
                    # Clean and validate data
                    data = self._prepare_import_data(row)
                    # Check if the record already exists
                    existing_id = self._find_existing_record(data)
                    if existing_id:
                        # Update existing record
                        self.write([existing_id], data, context)
                        updated_count += 1
                    else:
                        # Create new record
                        self.create(data, context)
                        created_count += 1
                except Exception as e:
                    errors.append(f"Row {row_num}: {str(e)}")
        # Return import results
        return {
            "created": created_count,
            "updated": updated_count,
            "errors": errors,
        }

    def _prepare_import_data(self, row):
        """Clean and validate CSV row data."""
        data = {}
        # Map CSV columns to model fields
        field_mapping = {
            "Product Code": "code",
            "Product Name": "name",
            "Category": "categ_id",
            "Price": "price",
            "Active": "active",
        }
        for csv_field, model_field in field_mapping.items():
            if csv_field in row and row[csv_field]:
                value = row[csv_field].strip()
                # Handle different field types
                if model_field == "price":
                    data[model_field] = float(value) if value else 0.0
                elif model_field == "active":
                    data[model_field] = value.lower() in ["true", "1", "yes"]
                elif model_field == "categ_id":
                    # Look up the related record
                    categ_id = self._find_category(value)
                    if categ_id:
                        data[model_field] = categ_id
                else:
                    data[model_field] = value
        return data

    def _find_existing_record(self, data):
        """Find an existing record by unique identifier."""
        if "code" in data:
            existing = self.search([["code", "=", data["code"]]])
            return existing[0] if existing else None
        return None

    def _find_category(self, category_name):
        """Find a category by name, creating it if it does not exist."""
        categ_ids = get_model("product.categ").search([["name", "=", category_name]])
        if categ_ids:
            return categ_ids[0]
        else:
            # Create a new category
            return get_model("product.categ").create({"name": category_name})
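With this in place, an import can be run directly from server-side code (the file path is a placeholder for illustration):

result = get_model("product.product").import_csv("/tmp/products.csv")
print("Created: %(created)d, updated: %(updated)d" % result)
for err in result["errors"]:
    print(err)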
Import with Validation¶
Add comprehensive validation to your import process:
def import_csv_with_validation(self, file_path, context={}):
    """Import with detailed validation and error handling."""
    import csv
    validation_errors = []
    import_data = []
    # First pass: validate all data
    with open(file_path, "r", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row_num, row in enumerate(reader, start=2):
            try:
                data = self._prepare_import_data(row)
                # Validate required fields
                errors = self._validate_required_fields(data, row_num)
                if errors:
                    validation_errors.extend(errors)
                    continue
                # Validate field formats
                errors = self._validate_field_formats(data, row_num)
                if errors:
                    validation_errors.extend(errors)
                    continue
                # Validate business rules
                errors = self._validate_business_rules(data, row_num)
                if errors:
                    validation_errors.extend(errors)
                    continue
                import_data.append((row_num, data))
            except Exception as e:
                validation_errors.append(f"Row {row_num}: Unexpected error - {str(e)}")
    # Stop if there were validation errors
    if validation_errors:
        raise Exception("Validation failed:\n" + "\n".join(validation_errors))
    # Second pass: import the validated data
    return self._perform_import(import_data, context)

def _validate_required_fields(self, data, row_num):
    """Check that required fields are present."""
    errors = []
    required_fields = ["code", "name"]
    for field in required_fields:
        if not data.get(field):
            errors.append(f"Row {row_num}: Missing required field '{field}'")
    return errors

def _validate_field_formats(self, data, row_num):
    """Validate field data formats."""
    errors = []
    # Validate numeric fields
    if "price" in data:
        try:
            float(data["price"])
        except ValueError:
            errors.append(f"Row {row_num}: Invalid price format")
    # Validate email format
    if "email" in data and data["email"]:
        import re
        if not re.match(r"^[^@]+@[^@]+\.[^@]+$", data["email"]):
            errors.append(f"Row {row_num}: Invalid email format")
    return errors

def _validate_business_rules(self, data, row_num):
    """Validate business-specific rules."""
    errors = []
    # Check for duplicate codes
    if "code" in data:
        existing = self.search([["code", "=", data["code"]]])
        if existing:
            errors.append(f"Row {row_num}: Duplicate product code '{data['code']}'")
    # Validate price ranges
    if "price" in data and float(data["price"]) < 0:
        errors.append(f"Row {row_num}: Price cannot be negative")
    return errors
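The second-pass helper _perform_import() is referenced above but not shown. A minimal sketch, mirroring the create/update logic of the basic importer (the valid_rows key is included because the wizard example below reads it):

def _perform_import(self, import_data, context={}):
    """Write pre-validated rows; a minimal sketch."""
    created_count = 0
    updated_count = 0
    for row_num, data in import_data:
        existing_id = self._find_existing_record(data)
        if existing_id:
            self.write([existing_id], data, context)
            updated_count += 1
        else:
            self.create(data, context)
            created_count += 1
    return {
        "created": created_count,
        "updated": updated_count,
        "valid_rows": len(import_data),
        "errors": [],
    }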
Import UI Integration¶
Generic Import Wizard¶
Use the built-in import wizard for any model:
# models/my_model.py
class MyModel(Model):
    _name = "my.model"

    # Implement the import_csv method
    def import_csv(self, file_path, context={}):
        # Your import logic here
        pass

<!-- actions/my_actions.xml -->
<action>
    <field name="view">form</field>
    <field name="string">Import My Data</field>
    <field name="model">import.data</field>
    <field name="target">popup</field>
    <field name="context">{"import_model": "my.model"}</field>
</action>
Custom Import Forms¶
Create custom import wizards for complex scenarios:
class ProductImportWizard(Model):
    _name = "product.import.wizard"
    _transient = True
    _fields = {
        "file": fields.File("CSV File", required=True),
        "update_existing": fields.Boolean("Update Existing Products"),
        "create_categories": fields.Boolean("Auto-create Categories"),
        "default_category_id": fields.Many2One("product.categ", "Default Category"),
        "dry_run": fields.Boolean("Dry Run (Validate Only)"),
    }

    def import_products(self, ids, context={}):
        """Import products with the wizard options."""
        obj = self.browse(ids[0])
        import_context = {
            "update_existing": obj.update_existing,
            "create_categories": obj.create_categories,
            # Guard against an empty Many2One before reading .id
            "default_category_id": obj.default_category_id.id if obj.default_category_id else None,
            "dry_run": obj.dry_run,
        }
        try:
            result = get_model("product.product").import_csv(
                obj.file, context=import_context
            )
            if obj.dry_run:
                return {
                    "flash": f"Validation successful: {result['valid_rows']} rows ready to import"
                }
            else:
                return {
                    "flash": f"Import complete: {result['created']} created, {result['updated']} updated"
                }
        except Exception as e:
            return {
                "flash": f"Import failed: {str(e)}",
                "flash_type": "danger",
            }
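A matching form layout for the wizard might look like the following. This is a sketch: the exact layout tags and button attributes depend on your Netforce version, so adapt it to the conventions of your existing layout files:

<!-- layouts/product_import_wizard.xml -->
<form model="product.import.wizard">
    <field name="file"/>
    <field name="update_existing"/>
    <field name="create_categories"/>
    <field name="default_category_id"/>
    <field name="dry_run"/>
    <foot>
        <button string="Import" method="import_products" type="primary"/>
    </foot>
</form>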
Advanced Import Patterns¶
Batch Processing¶
Handle large imports efficiently:
def import_large_csv(self, file_path, batch_size=1000, context={}):
    """Import large CSV files in batches."""
    import csv
    batch = []
    batch_num = 1
    total_created = 0
    total_errors = []
    with open(file_path, "r", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                # Process the full batch
                try:
                    result = self._import_batch(batch, batch_num, context)
                    total_created += result["created"]
                    total_errors.extend(result["errors"])
                except Exception as e:
                    total_errors.append(f"Batch {batch_num} failed: {str(e)}")
                batch = []
                batch_num += 1
                # Optional progress callback (context is a dict, so use a key lookup)
                if context.get("progress_callback"):
                    context["progress_callback"](batch_num, total_created)
    # Process the remaining partial batch
    if batch:
        try:
            result = self._import_batch(batch, batch_num, context)
            total_created += result["created"]
            total_errors.extend(result["errors"])
        except Exception as e:
            total_errors.append(f"Final batch failed: {str(e)}")
    return {
        "created": total_created,
        "errors": total_errors,
        "batches_processed": batch_num,
    }
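The _import_batch() helper is referenced above but not defined. A minimal sketch that reuses _prepare_import_data() from earlier:

def _import_batch(self, rows, batch_num, context={}):
    """Create records for one batch of CSV rows; a minimal sketch."""
    created = 0
    errors = []
    for row in rows:
        try:
            data = self._prepare_import_data(row)
            self.create(data, context)
            created += 1
        except Exception as e:
            errors.append(f"Batch {batch_num}: {str(e)}")
    return {"created": created, "errors": errors}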
Relationship Handling¶
Import data with complex relationships:
def import_orders_with_lines(self, file_path, context={}):
    """Import orders with order lines from CSV."""
    import csv
    orders = {}  # Group lines by order
    with open(file_path, "r", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            order_ref = row["Order Number"]
            if order_ref not in orders:
                orders[order_ref] = {
                    "header": {
                        "number": order_ref,
                        "customer_id": self._find_customer(row["Customer"]),
                        "date": row["Order Date"],
                    },
                    "lines": [],
                }
            # Add the line to its order
            orders[order_ref]["lines"].append({
                "product_id": self._find_product(row["Product Code"]),
                "qty": float(row["Quantity"]),
                "price": float(row["Unit Price"]),
            })
    # Create orders with their lines
    created_orders = []
    for order_ref, order_data in orders.items():
        try:
            # Prepare One2Many data
            order_data["header"]["lines"] = [
                [0, 0, line_data] for line_data in order_data["lines"]
            ]
            order_id = self.create(order_data["header"], context)
            created_orders.append(order_id)
        except Exception as e:
            print(f"Failed to create order {order_ref}: {str(e)}")
    return {"created_orders": len(created_orders)}
Export Templates¶
Creating Export Templates¶
Generate CSV templates for imports:
def generate_import_template(self, context={}):
    """Generate a CSV template for imports."""
    import csv
    import io
    # Define the template fields
    template_fields = [
        {"name": "code", "label": "Product Code", "required": True},
        {"name": "name", "label": "Product Name", "required": True},
        {"name": "categ_id", "label": "Category", "required": False},
        {"name": "price", "label": "Price", "required": False},
        {"name": "active", "label": "Active (true/false)", "required": False},
    ]
    # Create the CSV with headers and sample data
    output = io.StringIO()
    writer = csv.writer(output)
    # Write headers
    headers = [field["label"] for field in template_fields]
    writer.writerow(headers)
    # Write one row of sample data
    writer.writerow([
        "PROD001",
        "Sample Product",
        "Electronics",
        "99.99",
        "true",
    ])
    csv_content = output.getvalue()
    output.close()
    return csv_content

# Usage
template_csv = get_model("product.product").generate_import_template()
with open("product_import_template.csv", "w") as f:
    f.write(template_csv)
Error Handling & Logging¶
Comprehensive Error Tracking¶
def import_with_detailed_logging(self, file_path, context={}):
    """Import with comprehensive error logging."""
    import csv
    import time
    import_log = {
        "start_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        "file_path": file_path,
        "total_rows": 0,
        "processed_rows": 0,
        "created_records": 0,
        "updated_records": 0,
        "skipped_rows": 0,
        "errors": [],
        "warnings": [],
    }
    try:
        with open(file_path, "r", encoding="utf-8") as csvfile:
            reader = csv.DictReader(csvfile)
            rows = list(reader)
            import_log["total_rows"] = len(rows)
            for row_num, row in enumerate(rows, start=2):
                try:
                    data = self._prepare_import_data(row)
                    # Collect non-fatal warnings
                    warnings = self._check_data_warnings(data, row_num)
                    import_log["warnings"].extend(warnings)
                    # Import the record
                    existing_id = self._find_existing_record(data)
                    if existing_id:
                        self.write([existing_id], data, context)
                        import_log["updated_records"] += 1
                    else:
                        self.create(data, context)
                        import_log["created_records"] += 1
                    import_log["processed_rows"] += 1
                except Exception as e:
                    error_msg = f"Row {row_num}: {str(e)}"
                    import_log["errors"].append(error_msg)
                    import_log["skipped_rows"] += 1
        import_log["end_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
        import_log["status"] = "completed"
    except Exception as e:
        import_log["end_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
        import_log["status"] = "failed"
        import_log["fatal_error"] = str(e)
    # Save the import log
    self._save_import_log(import_log)
    return import_log

def _save_import_log(self, import_log):
    """Save the import log to the database."""
    import json
    get_model("import.log").create({
        "model": self._name,
        "start_time": import_log["start_time"],
        "end_time": import_log.get("end_time"),
        "status": import_log["status"],
        "total_rows": import_log["total_rows"],
        "processed_rows": import_log["processed_rows"],
        "created_records": import_log["created_records"],
        "updated_records": import_log["updated_records"],
        "error_count": len(import_log["errors"]),
        "warning_count": len(import_log["warnings"]),
        "details": json.dumps(import_log, indent=2),
    })
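The _check_data_warnings() helper is referenced above but not defined. A minimal sketch of non-fatal checks (the specific rules are illustrative assumptions):

def _check_data_warnings(self, data, row_num):
    """Return non-fatal warnings for a row; a minimal sketch."""
    warnings = []
    if data.get("price") == 0.0:
        warnings.append(f"Row {row_num}: Price is zero")
    if not data.get("categ_id"):
        warnings.append(f"Row {row_num}: No category assigned")
    return warnings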
Performance Optimization¶
Bulk Operations¶
Use bulk database operations for better performance. Note that raw SQL inserts bypass ORM-level defaults, validation, and computed fields, so reserve this approach for trusted, pre-validated data:
def bulk_import_csv(self, file_path, context={}):
    """High-performance bulk import."""
    import csv
    import time
    from netforce import database
    from netforce.access import get_active_user
    # Prepare the bulk insert data
    bulk_data = []
    with open(file_path, "r", encoding="utf-8") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            data = self._prepare_import_data(row)
            # Add audit fields
            data.update({
                "create_time": time.strftime("%Y-%m-%d %H:%M:%S"),
                "create_uid": get_active_user(),
            })
            bulk_data.append(data)
    # Bulk insert using raw SQL
    # (assumes every row produced the same set of keys)
    if bulk_data:
        fields = list(bulk_data[0].keys())
        placeholders = ", ".join(["%s"] * len(fields))
        field_list = ", ".join(fields)
        sql = f"INSERT INTO {self._table} ({field_list}) VALUES ({placeholders})"
        values_list = []
        for data in bulk_data:
            values_list.append([data[field] for field in fields])
        database.get().executemany(sql, values_list)
        database.get().commit()
    return {"created": len(bulk_data)}
Best Practices¶
1. Field Mapping¶
Always provide clear field mapping documentation:
# Document the expected CSV format
CSV_FORMAT = {
    "Product Code": "Unique identifier (required)",
    "Product Name": "Display name (required)",
    "Category": "Category name (created if it does not exist)",
    "Price": "Numeric price in base currency",
    "Active": "true/false or 1/0",
}
2. Data Validation¶
Implement comprehensive validation:
def validate_import_data(self, data, context={}):
    """Validate data before import."""
    errors = []
    # Required field validation
    if not data.get("code"):
        errors.append("Product code is required")
    # Format validation
    if "price" in data:
        try:
            price = float(data["price"])
            if price < 0:
                errors.append("Price cannot be negative")
        except ValueError:
            errors.append("Invalid price format")
    # Business rule validation
    if data.get("code"):
        existing = self.search([["code", "=", data["code"]]])
        if existing and not context.get("update_existing"):
            errors.append(f"Product code '{data['code']}' already exists")
    return errors
3. Progress Feedback¶
Provide progress feedback for large imports:
def import_with_progress(self, file_path, progress_callback=None):
    """Import with progress reporting."""
    import csv
    # Count data rows first (excluding the header) so progress can be reported
    with open(file_path, "r") as f:
        total_rows = sum(1 for _ in f) - 1
    processed = 0
    with open(file_path, "r") as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # Process the row
            self._import_row(row)
            processed += 1
            # Report progress every 100 rows
            if progress_callback and processed % 100 == 0:
                progress_callback(processed, total_rows)
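The _import_row() helper is referenced above but not shown. A minimal sketch reusing the earlier helpers:

def _import_row(self, row, context={}):
    """Create or update a single record from one CSV row; a minimal sketch."""
    data = self._prepare_import_data(row)
    existing_id = self._find_existing_record(data)
    if existing_id:
        self.write([existing_id], data, context)
    else:
        self.create(data, context)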
Next Steps¶
- Learn about Background Jobs for async imports
- Explore Security for import/export permissions
- Check Models for data validation patterns
- Review API Reference for import/export endpoints