Data Import & Export

Netforce provides built-in functionality for importing and exporting data in CSV format. This system allows for bulk data operations, data migration, and integration with external systems.

Overview

The import/export system consists of:

  • CSV Import - import_csv() method on models
  • CSV Export - export_data() method on models
  • Import UI - Generic import wizard (import.data model)
  • Export UI - Built-in export functionality

Data Export

Basic Export

All models automatically support data export:

# Export specific records
invoice_ids = [1, 2, 3]
field_paths = ["number", "date", "customer_id.name", "amount_total"]
csv_data = get_model("account.invoice").export_data(invoice_ids, field_paths)

# CSV data is returned as a string
with open("invoices.csv", "w") as f:
    f.write(csv_data)

Export Field Paths

Use dot notation for related fields:

field_paths = [
    "number",                    # Simple field
    "date",                      # Date field
    "customer_id.name",          # Related field (Many2One)
    "customer_id.email",         # Related field chain
    "lines.product_id.name",     # One2Many -> Many2One
    "lines.qty",                 # One2Many field
    "lines.price",               # One2Many field
    "amount_total",              # Computed field
]

csv_data = get_model("sale.order").export_data(order_ids, field_paths)

Export from Frontend

The frontend automatically provides export functionality:

// Export current list view data
rpc.execute("account.invoice", "export_data", [
    [1, 2, 3, 4, 5],  // record IDs
    ["number", "date", "customer_id.name", "amount_total"]  // fields
], {}, (err, csv_data) => {
    if (err) {
        alert("Export failed: " + err);
        return;
    }
    // Download CSV file
    const blob = new Blob([csv_data], {type: "text/csv"});
    const url = URL.createObjectURL(blob);
    const a = document.createElement("a");
    a.href = url;
    a.download = "export.csv";
    a.click();
    URL.revokeObjectURL(url);
});

Data Import

CSV Import Method

Models can implement import_csv() for bulk data import:

class Product(Model):
    _name = "product.product"

    def import_csv(self, file_path, context={}):
        """Import products from CSV file"""
        import csv

        # Open and parse CSV
        with open(file_path, 'r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)

            created_count = 0
            updated_count = 0
            errors = []

            for row_num, row in enumerate(reader, start=2):
                try:
                    # Clean and validate data
                    data = self._prepare_import_data(row)

                    # Check if record exists
                    existing_id = self._find_existing_record(data)

                    if existing_id:
                        # Update existing
                        self.write([existing_id], data, context)
                        updated_count += 1
                    else:
                        # Create new
                        self.create(data, context)
                        created_count += 1

                except Exception as e:
                    errors.append(f"Row {row_num}: {str(e)}")

        # Return import results
        return {
            "created": created_count,
            "updated": updated_count, 
            "errors": errors
        }

    def _prepare_import_data(self, row):
        """Clean and validate CSV row data"""
        data = {}

        # Map CSV columns to model fields
        field_mapping = {
            "Product Code": "code",
            "Product Name": "name",
            "Category": "categ_id",
            "Price": "price",
            "Active": "active"
        }

        for csv_field, model_field in field_mapping.items():
            if csv_field in row and row[csv_field]:
                value = row[csv_field].strip()

                # Handle different field types
                if model_field == "price":
                    data[model_field] = float(value) if value else 0.0
                elif model_field == "active":
                    data[model_field] = value.lower() in ["true", "1", "yes"]
                elif model_field == "categ_id":
                    # Look up related record
                    categ_id = self._find_category(value)
                    if categ_id:
                        data[model_field] = categ_id
                else:
                    data[model_field] = value

        return data

    def _find_existing_record(self, data):
        """Find existing record by unique identifier"""
        if "code" in data:
            existing = self.search([["code", "=", data["code"]]])
            return existing[0] if existing else None
        return None

    def _find_category(self, category_name):
        """Find category by name, create if not exists"""
        categ_ids = get_model("product.categ").search([["name", "=", category_name]])

        if categ_ids:
            return categ_ids[0]
        else:
            # Create new category
            return get_model("product.categ").create({"name": category_name})
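
A typical server-side invocation (the file path here is illustrative):

result = get_model("product.product").import_csv("/tmp/products.csv")
print(f"Created: {result['created']}, updated: {result['updated']}")
for error in result["errors"]:
    print(error)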

Import with Validation

Add comprehensive validation to your import process:

def import_csv_with_validation(self, file_path, context={}):
    """Import with detailed validation and error handling"""
    import csv

    validation_errors = []
    import_data = []

    # First pass: Validate all data
    with open(file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)

        for row_num, row in enumerate(reader, start=2):
            try:
                data = self._prepare_import_data(row)

                # Validate required fields
                errors = self._validate_required_fields(data, row_num)
                if errors:
                    validation_errors.extend(errors)
                    continue

                # Validate field formats
                errors = self._validate_field_formats(data, row_num)
                if errors:
                    validation_errors.extend(errors)
                    continue

                # Validate business rules
                errors = self._validate_business_rules(data, row_num)
                if errors:
                    validation_errors.extend(errors)
                    continue

                import_data.append((row_num, data))

            except Exception as e:
                validation_errors.append(f"Row {row_num}: Unexpected error - {str(e)}")

    # Stop if validation errors
    if validation_errors:
        raise Exception("Validation failed:\n" + "\n".join(validation_errors))

    # Second pass: Import validated data
    return self._perform_import(import_data, context)

def _validate_required_fields(self, data, row_num):
    """Validate required fields are present"""
    errors = []
    required_fields = ["code", "name"]

    for field in required_fields:
        if not data.get(field):
            errors.append(f"Row {row_num}: Missing required field '{field}'")

    return errors

def _validate_field_formats(self, data, row_num):
    """Validate field data formats"""
    errors = []

    # Validate numeric fields
    if "price" in data:
        try:
            float(data["price"])
        except ValueError:
            errors.append(f"Row {row_num}: Invalid price format")

    # Validate email format
    if "email" in data and data["email"]:
        import re
        if not re.match(r'^[^@]+@[^@]+\.[^@]+$', data["email"]):
            errors.append(f"Row {row_num}: Invalid email format")

    return errors

def _validate_business_rules(self, data, row_num):
    """Validate business-specific rules"""
    errors = []

    # Check for duplicate codes
    if "code" in data:
        existing = self.search([["code", "=", data["code"]]])
        if existing:
            errors.append(f"Row {row_num}: Duplicate product code '{data['code']}'")

    # Validate price ranges
    if "price" in data and float(data["price"]) < 0:
        errors.append(f"Row {row_num}: Price cannot be negative")

    return errors
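
The second pass delegates to _perform_import(), which is not shown above. A minimal sketch, assuming the same create-or-update logic as import_csv():

def _perform_import(self, import_data, context={}):
    """Import pre-validated (row_num, data) pairs"""
    created_count = 0
    updated_count = 0
    errors = []

    for row_num, data in import_data:
        try:
            # Update if a matching record exists, otherwise create
            existing_id = self._find_existing_record(data)
            if existing_id:
                self.write([existing_id], data, context)
                updated_count += 1
            else:
                self.create(data, context)
                created_count += 1
        except Exception as e:
            errors.append(f"Row {row_num}: {str(e)}")

    return {"created": created_count, "updated": updated_count, "errors": errors}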

Import UI Integration

Generic Import Wizard

Use the built-in import wizard for any model:

# models/my_model.py
class MyModel(Model):
    _name = "my.model"

    # Implement import_csv method
    def import_csv(self, file_path, context={}):
        # Your import logic here
        pass

<!-- actions/my_actions.xml -->
<action>
    <field name="view">form</field>
    <field name="string">Import My Data</field>
    <field name="model">import.data</field>
    <field name="target">popup</field>
    <field name="context">{"import_model": "my.model"}</field>
</action>

Custom Import Forms

Create custom import wizards for complex scenarios:

class ProductImportWizard(Model):
    _name = "product.import.wizard"
    _transient = True

    _fields = {
        "file": fields.File("CSV File", required=True),
        "update_existing": fields.Boolean("Update Existing Products"),
        "create_categories": fields.Boolean("Auto-create Categories"),
        "default_category_id": fields.Many2One("product.categ", "Default Category"),
        "dry_run": fields.Boolean("Dry Run (Validate Only)"),
    }

    def import_products(self, ids, context={}):
        """Import products with wizard options"""
        from netforce.utils import get_file_path
        obj = self.browse(ids[0])

        import_context = {
            "update_existing": obj.update_existing,
            "create_categories": obj.create_categories,
            "default_category_id": obj.default_category_id.id,
            "dry_run": obj.dry_run,
        }

        try:
            # File fields store a filename; resolve it to a path on disk
            file_path = get_file_path(obj.file)
            result = get_model("product.product").import_csv(
                file_path, context=import_context
            )

            if obj.dry_run:
                # Assumes the model's import_csv() honors dry_run and reports valid_rows
                return {
                    "flash": f"Validation successful: {result['valid_rows']} rows ready to import"
                }
            else:
                return {
                    "flash": f"Import complete: {result['created']} created, {result['updated']} updated"
                }

        except Exception as e:
            return {
                "flash": f"Import failed: {str(e)}",
                "flash_type": "danger"
            }

Advanced Import Patterns

Batch Processing

Handle large imports efficiently:

def import_large_csv(self, file_path, batch_size=1000, context={}):
    """Import large CSV files in batches"""
    import csv

    batch = []
    batch_num = 1
    batches_done = 0
    total_created = 0
    total_errors = []

    with open(file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            batch.append(row)

            if len(batch) >= batch_size:
                # Process full batch
                try:
                    result = self._import_batch(batch, batch_num, context)
                    total_created += result["created"]
                    total_errors.extend(result["errors"])
                except Exception as e:
                    total_errors.append(f"Batch {batch_num} failed: {str(e)}")

                batches_done += 1
                batch = []
                batch_num += 1

                # Optional: progress callback passed via context
                if context.get("progress_callback"):
                    context["progress_callback"](batches_done, total_created)

        # Process remaining partial batch
        if batch:
            try:
                result = self._import_batch(batch, batch_num, context)
                total_created += result["created"]
                total_errors.extend(result["errors"])
            except Exception as e:
                total_errors.append(f"Final batch failed: {str(e)}")
            batches_done += 1

    return {
        "created": total_created,
        "errors": total_errors,
        "batches_processed": batches_done
    }
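
_import_batch() is a helper you provide rather than a framework method. One possible sketch, reusing _prepare_import_data() from earlier:

def _import_batch(self, batch, batch_num, context={}):
    """Process one batch of CSV rows (illustrative helper)"""
    created = 0
    errors = []

    for row in batch:
        try:
            data = self._prepare_import_data(row)
            self.create(data, context)
            created += 1
        except Exception as e:
            errors.append(f"Batch {batch_num}: {str(e)}")

    return {"created": created, "errors": errors}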

Relationship Handling

Import data with complex relationships:

def import_orders_with_lines(self, file_path, context={}):
    """Import orders with order lines from CSV"""
    import csv

    orders = {}  # Group lines by order

    with open(file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            order_ref = row["Order Number"]

            if order_ref not in orders:
                orders[order_ref] = {
                    "header": {
                        "number": order_ref,
                        "customer_id": self._find_customer(row["Customer"]),
                        "date": row["Order Date"],
                    },
                    "lines": []
                }

            # Add line to order
            orders[order_ref]["lines"].append({
                "product_id": self._find_product(row["Product Code"]),
                "qty": float(row["Quantity"]),
                "price": float(row["Unit Price"]),
            })

    # Create orders with lines
    created_orders = []
    for order_ref, order_data in orders.items():
        try:
            # Prepare One2Many data (Netforce expects ("create", vals) tuples)
            order_data["header"]["lines"] = [
                ("create", line_data) for line_data in order_data["lines"]
            ]

            order_id = self.create(order_data["header"], context)
            created_orders.append(order_id)

        except Exception as e:
            print(f"Failed to create order {order_ref}: {str(e)}")

    return {"created_orders": len(created_orders)}

Export Templates

Creating Export Templates

Generate CSV templates for imports:

def generate_import_template(self, context={}):
    """Generate CSV template for imports"""
    import csv
    import io

    # Define template fields
    template_fields = [
        {"name": "code", "label": "Product Code", "required": True},
        {"name": "name", "label": "Product Name", "required": True},
        {"name": "categ_id", "label": "Category", "required": False},
        {"name": "price", "label": "Price", "required": False},
        {"name": "active", "label": "Active (true/false)", "required": False},
    ]

    # Create CSV with headers and sample data
    output = io.StringIO()
    writer = csv.writer(output)

    # Write headers
    headers = [field["label"] for field in template_fields]
    writer.writerow(headers)

    # Write sample data
    writer.writerow([
        "PROD001",
        "Sample Product",
        "Electronics",
        "99.99",
        "true"
    ])

    csv_content = output.getvalue()
    output.close()

    return csv_content

# Usage
template_csv = get_model("product.product").generate_import_template()
with open("product_import_template.csv", "w") as f:
    f.write(template_csv)

Error Handling & Logging

Comprehensive Error Tracking

def import_with_detailed_logging(self, file_path, context={}):
    """Import with comprehensive error logging"""
    import csv
    import time

    import_log = {
        "start_time": time.strftime("%Y-%m-%d %H:%M:%S"),
        "file_path": file_path,
        "total_rows": 0,
        "processed_rows": 0,
        "created_records": 0,
        "updated_records": 0,
        "skipped_rows": 0,
        "errors": [],
        "warnings": []
    }

    try:
        with open(file_path, 'r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            rows = list(reader)
            import_log["total_rows"] = len(rows)

            for row_num, row in enumerate(rows, start=2):
                try:
                    data = self._prepare_import_data(row)

                    # Check for warnings
                    warnings = self._check_data_warnings(data, row_num)
                    import_log["warnings"].extend(warnings)

                    # Import record
                    existing_id = self._find_existing_record(data)
                    if existing_id:
                        self.write([existing_id], data, context)
                        import_log["updated_records"] += 1
                    else:
                        self.create(data, context)
                        import_log["created_records"] += 1

                    import_log["processed_rows"] += 1

                except Exception as e:
                    error_msg = f"Row {row_num}: {str(e)}"
                    import_log["errors"].append(error_msg)
                    import_log["skipped_rows"] += 1

        import_log["end_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
        import_log["status"] = "completed"

    except Exception as e:
        import_log["end_time"] = time.strftime("%Y-%m-%d %H:%M:%S")
        import_log["status"] = "failed"
        import_log["fatal_error"] = str(e)

    # Save import log
    self._save_import_log(import_log)

    return import_log

def _save_import_log(self, import_log):
    """Save import log to database"""
    import json

    get_model("import.log").create({
        "model": self._name,
        "start_time": import_log["start_time"],
        "end_time": import_log.get("end_time"),
        "status": import_log["status"],
        "total_rows": import_log["total_rows"],
        "processed_rows": import_log["processed_rows"],
        "created_records": import_log["created_records"],
        "updated_records": import_log["updated_records"],
        "error_count": len(import_log["errors"]),
        "warning_count": len(import_log["warnings"]),
        "details": json.dumps(import_log, indent=2)
    })
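
In the listing above, _check_data_warnings() is a model-specific hook (flagging suspicious but importable values) that returns a list of warning strings, and import.log is not a standard Netforce model. If you use this pattern, define the log model yourself; a minimal sketch:

class ImportLog(Model):
    _name = "import.log"
    _string = "Import Log"

    _fields = {
        "model": fields.Char("Model", required=True),
        "start_time": fields.DateTime("Start Time"),
        "end_time": fields.DateTime("End Time"),
        "status": fields.Selection([["completed", "Completed"], ["failed", "Failed"]], "Status"),
        "total_rows": fields.Integer("Total Rows"),
        "processed_rows": fields.Integer("Processed Rows"),
        "created_records": fields.Integer("Created"),
        "updated_records": fields.Integer("Updated"),
        "error_count": fields.Integer("Errors"),
        "warning_count": fields.Integer("Warnings"),
        "details": fields.Text("Details"),
    }

ImportLog.register()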

Performance Optimization

Bulk Operations

Use bulk database operations for better performance:

def bulk_import_csv(self, file_path, context={}):
    """High-performance bulk import"""
    import csv
    import time
    from netforce import database
    from netforce.access import get_active_user

    # Prepare bulk insert data
    bulk_data = []

    with open(file_path, 'r', encoding='utf-8') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            data = self._prepare_import_data(row)
            # Add default fields
            data.update({
                "create_time": time.strftime("%Y-%m-%d %H:%M:%S"),
                "create_uid": get_active_user(),
            })
            bulk_data.append(data)

    # Bulk insert using raw SQL (bypasses ORM defaults and computed fields;
    # field names come from the trusted import mapping, not user input)
    if bulk_data:
        fields = list(bulk_data[0].keys())
        placeholders = ", ".join(["%s"] * len(fields))
        field_list = ", ".join(fields)

        sql = f"INSERT INTO {self._table} ({field_list}) VALUES ({placeholders})"

        values_list = []
        for data in bulk_data:
            values_list.append([data[field] for field in fields])

        database.get().executemany(sql, values_list)
        database.get().commit()

    return {"created": len(bulk_data)}

Best Practices

1. Field Mapping

Always provide clear field mapping documentation:

# Document expected CSV format
CSV_FORMAT = {
    "Product Code": "Unique identifier (required)",
    "Product Name": "Display name (required)", 
    "Category": "Category name (will be created if not exists)",
    "Price": "Numeric price in base currency",
    "Active": "true/false or 1/0"
}

2. Data Validation

Implement comprehensive validation:

def validate_import_data(self, data, context={}):
    """Validate data before import"""
    errors = []

    # Required field validation
    if not data.get("code"):
        errors.append("Product code is required")

    # Format validation
    if "price" in data:
        try:
            price = float(data["price"])
            if price < 0:
                errors.append("Price cannot be negative")
        except ValueError:
            errors.append("Invalid price format")

    # Business rule validation
    if data.get("code"):
        existing = self.search([["code", "=", data["code"]]])
        if existing and not context.get("update_existing"):
            errors.append(f"Product code '{data['code']}' already exists")

    return errors

3. Progress Feedback

Provide progress feedback for large imports:

def import_with_progress(self, file_path, progress_callback=None):
    """Import with progress reporting"""
    import csv

    # Count data rows first so progress can be reported as a fraction
    with open(file_path, 'r') as f:
        total_rows = sum(1 for _ in f) - 1  # Exclude header

    processed = 0

    with open(file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            # Process row
            self._import_row(row)

            processed += 1

            # Report progress
            if progress_callback and processed % 100 == 0:
                progress_callback(processed, total_rows)

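Here _import_row() stands in for your per-row logic, such as _prepare_import_data() followed by create(). A simple callback might print a running percentage:

def print_progress(processed, total):
    """Report progress as a percentage"""
    pct = processed * 100 // total if total else 100
    print(f"Imported {processed}/{total} rows ({pct}%)")

get_model("product.product").import_with_progress("/tmp/products.csv", print_progress)
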
Next Steps