Skip to content

Security & Access Control

Netforce provides a comprehensive security framework that includes user authentication, role-based access control, permission management, and data protection. The system supports multi-level security with fine-grained permissions.

Overview

The security system consists of several key components:

  • User authentication - Login, sessions, and password management
  • Access control - Role-based permissions and field-level security
  • Data protection - Encryption, audit trails, and secure communication
  • Company isolation - Multi-tenant data segregation

Core Security Models

1. Users and Authentication (base.user)

User management and authentication:

class User(Model):
    """User account: credentials, lockout state, group and company links.

    The authentication helpers in this file read and write ``password``,
    ``failed_login_attempts``, ``locked_until``, ``password_expires`` and
    ``totp_secret``; the permission helpers walk ``groups`` and
    ``permissions``.
    """
    _name = "base.user"
    _fields = {
        "name": fields.Char("Full Name", required=True),
        "login": fields.Char("Login", required=True),
        # Stored as a bcrypt hash (see hash_password / verify_password).
        "password": fields.Char("Password"),
        "email": fields.Char("Email"),
        "active": fields.Boolean("Active"),
        "groups": fields.Many2Many("user.group", "User Groups"),
        # Direct (non-group) grants; inverse of permission.user_id. Iterated
        # by get_user_permissions(), so it must be declared here.
        "permissions": fields.One2Many("permission", "user_id", "Permissions"),
        "companies": fields.Many2Many("company", "Companies"),
        "default_company_id": fields.Many2One("company", "Default Company"),
        "last_login": fields.DateTime("Last Login"),
        "failed_login_attempts": fields.Integer("Failed Login Attempts"),
        "locked_until": fields.DateTime("Locked Until"),
        "password_expires": fields.Date("Password Expires"),
        "require_2fa": fields.Boolean("Require Two-Factor Auth"),
        # Base32 TOTP secret written by setup_2fa() and read by
        # verify_2fa_token(); without this declaration those calls fail.
        "totp_secret": fields.Char("TOTP Secret"),
        "api_keys": fields.One2Many("api.key", "user_id", "API Keys"),
        "access_log": fields.One2Many("access.log", "user_id", "Access Log"),
    }

    _defaults = {
        "active": True,
        "failed_login_attempts": 0,
    }

2. User Groups (user.group)

Role-based access control through groups:

class UserGroup(Model):
    """Role used for access control; groups may inherit from a parent group."""
    _name = "user.group"
    _fields = {
        "name": fields.Char("Group Name", required=True),
        "code": fields.Char("Group Code"),
        "description": fields.Text("Description"),
        "users": fields.Many2Many("base.user", "Users"),
        "permissions": fields.One2Many("permission", "group_id", "Permissions"),
        "parent_id": fields.Many2One("user.group", "Parent Group"),
        "child_ids": fields.One2Many("user.group", "parent_id", "Child Groups"),
        "company_id": fields.Many2One("company", "Company"),
    }

    def get_all_permissions(self, ids, context=None):
        """Return the permissions of the given groups plus all their ancestors.

        Args:
            ids: ids of the groups to inspect.
            context: unused; accepted for signature compatibility.

        Returns:
            A set of ``(model, access_type)`` tuples.
        """
        permissions = set()
        # Groups already visited. Guards against a cyclic parent chain (which
        # previously recursed without bound) and avoids re-reading an
        # ancestor shared by several of the requested groups.
        seen = set()

        for group in self.browse(ids):
            current = group
            while current and current.id not in seen:
                seen.add(current.id)
                for perm in current.permissions:
                    permissions.add((perm.model, perm.access_type))
                # Walk up to the parent; an unset parent_id ends the loop.
                current = current.parent_id

        return permissions

3. Permissions (permission)

Fine-grained permission management:

class Permission(Model):
    """Grant of one access type on one model, linked to a group and/or a user."""
    _name = "permission"
    _fields = {
        "name": fields.Char("Permission Name"),
        # Name of the model this grant applies to; compared against the
        # requested model name by the access helpers in this file.
        "model": fields.Char("Model Name", required=True),
        "group_id": fields.Many2One("user.group", "User Group"),
        "user_id": fields.Many2One("base.user", "User"),
        # check_access() accepts an "admin" grant on a model in place of any
        # specific access type on that model.
        "access_type": fields.Selection([
            ["read", "Read"],
            ["write", "Write"],
            ["create", "Create"],
            ["delete", "Delete"],
            ["admin", "Admin"]
        ], "Access Type"),
        # Per-field restrictions and per-record filters attached to this grant.
        "field_access": fields.One2Many("field.access", "permission_id", "Field Access"),
        "record_rules": fields.One2Many("record.rule", "permission_id", "Record Rules"),
    }

class FieldAccess(Model):
    """Per-field restriction attached to a permission."""
    _name = "field.access"
    _fields = {
        "permission_id": fields.Many2One("permission", "Permission"),
        "field_name": fields.Char("Field Name", required=True),
        # As interpreted by check_field_access(): "hidden" denies every
        # access to the field, "read" denies write access only.
        "access_type": fields.Selection([
            ["read", "Read Only"],
            ["write", "Read/Write"],
            ["hidden", "Hidden"]
        ], "Access Type"),
    }

class RecordRule(Model):
    """Record-level filter attached to a permission."""
    _name = "record.rule"
    _fields = {
        "permission_id": fields.Many2One("permission", "Permission"),
        "name": fields.Char("Rule Name"),
        # Parsed with json.loads() by evaluate_domain() and appended to a
        # search domain, so it must hold a JSON-encoded list of conditions.
        "domain": fields.Text("Domain Filter"),
        # check_record_access() picks rules by looking up the flag named
        # "apply_<access_type>" on the rule.
        "apply_create": fields.Boolean("Apply on Create"),
        "apply_read": fields.Boolean("Apply on Read"),
        "apply_write": fields.Boolean("Apply on Write"),
        "apply_delete": fields.Boolean("Apply on Delete"),
    }

User Authentication

1. Login and Session Management

Implement secure authentication:

def authenticate_user(login, password, context=None):
    """Authenticate a login/password pair and open a session.

    Args:
        login: login name to look up among active users.
        password: plain-text password supplied by the caller.
        context: optional dict forwarded to create_user_session().

    Returns:
        Dict with ``user_id``, ``session_id``, ``companies`` (list of ids)
        and ``default_company`` (id or None).

    Raises:
        Exception: "Invalid credentials", "Account temporarily locked" or
            "Password expired".
    """
    if context is None:
        context = {}

    # Read the clock once so every timestamp in this attempt is consistent.
    now = datetime.now()
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    users = get_model("base.user")
    user_ids = users.search([
        ["login", "=", login],
        ["active", "=", True]
    ])

    if not user_ids:
        log_security_event("auth_failed", {"login": login, "reason": "user_not_found"})
        raise Exception("Invalid credentials")

    user = users.browse(user_ids[0])

    # Timestamps are compared as "%Y-%m-%d %H:%M:%S" strings, which sort
    # chronologically.
    if user.locked_until and user.locked_until > now_str:
        raise Exception("Account temporarily locked")

    if not verify_password(password, user.password):
        # The counter may be unset on records created without defaults.
        failed_attempts = (user.failed_login_attempts or 0) + 1
        vals = {"failed_login_attempts": failed_attempts}

        # Lock account after 5 failed attempts; written together with the
        # counter so the record is updated in a single write.
        if failed_attempts >= 5:
            vals["locked_until"] = (now + timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S")
        user.write(vals)

        log_security_event("auth_failed", {"user_id": user.id, "attempts": failed_attempts})
        raise Exception("Invalid credentials")

    if user.password_expires and user.password_expires < now.strftime("%Y-%m-%d"):
        raise Exception("Password expired")

    # NOTE(review): user.require_2fa is not checked here; callers presumably
    # run verify_2fa_token() themselves before trusting the session - confirm.

    # Reset lockout state on successful login.
    user.write({
        "failed_login_attempts": 0,
        "locked_until": None,
        "last_login": now_str
    })

    session_id = create_user_session(user.id, context)

    log_security_event("auth_success", {"user_id": user.id})

    return {
        "user_id": user.id,
        "session_id": session_id,
        "companies": [c.id for c in user.companies],
        "default_company": user.default_company_id.id if user.default_company_id else None
    }

def verify_password(plain_password, hashed_password):
    """Return True if ``plain_password`` matches the bcrypt ``hashed_password``.

    A missing password or hash (the user ``password`` field is optional) and
    a malformed hash are treated as a mismatch rather than raising, so
    callers can rely on a plain boolean.
    """
    if plain_password is None or not hashed_password:
        return False

    # Imported after the guard so the cheap rejections above need no bcrypt.
    import bcrypt
    try:
        return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8'))
    except ValueError:
        # bcrypt raises ValueError when the stored value is not a valid hash.
        return False

def hash_password(password):
    """Return a salted bcrypt hash of ``password`` as a UTF-8 string."""
    import bcrypt

    fresh_salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password.encode('utf-8'), fresh_salt)
    return digest.decode('utf-8')

2. Two-Factor Authentication

Implement 2FA for enhanced security:

def setup_2fa(user_id):
    """Enable TOTP two-factor authentication for a user.

    Generates a new secret, stores it on the user together with
    ``require_2fa=True``, and creates backup codes.

    Args:
        user_id: id of the ``base.user`` record.

    Returns:
        Dict with ``secret_key`` (base32 string), ``qr_code`` (PNG bytes of
        the provisioning URI) and ``backup_codes`` (plain-text codes).
    """
    import pyotp
    import qrcode
    import io

    user = get_model("base.user").browse(user_id)

    secret_key = pyotp.random_base32()
    totp = pyotp.TOTP(secret_key)

    # email is optional on the user model while login is required, so fall
    # back to the login to keep the account label in the authenticator app
    # meaningful.
    account_name = user.email or user.login
    provisioning_uri = totp.provisioning_uri(
        account_name,
        issuer_name="Netforce ERP"
    )

    qr = qrcode.QRCode(version=1, box_size=10, border=5)
    qr.add_data(provisioning_uri)
    qr.make(fit=True)

    # Render the QR code to PNG bytes in memory.
    img = qr.make_image(fill_color="black", back_color="white")
    buffer = io.BytesIO()
    img.save(buffer, format='PNG')
    qr_code_data = buffer.getvalue()

    user.write({
        "totp_secret": secret_key,
        "require_2fa": True
    })

    return {
        "secret_key": secret_key,
        "qr_code": qr_code_data,
        "backup_codes": generate_backup_codes(user_id)
    }

def verify_2fa_token(user_id, token):
    """Check a TOTP token against the secret stored on the user.

    Returns False when the user has no stored secret; otherwise returns the
    result of the TOTP verification.
    """
    import pyotp

    account = get_model("base.user").browse(user_id)
    if account.totp_secret:
        verifier = pyotp.TOTP(account.totp_secret)
        # valid_window=1 also accepts the time step adjacent to the current
        # one, which tolerates small clock drift.
        return verifier.verify(token, valid_window=1)

    return False

def generate_backup_codes(user_id, count=10):
    """Generate one-time 2FA recovery codes and store their hashes.

    Args:
        user_id: id of the user the codes belong to.
        count: number of codes to generate.

    Returns:
        List of plain-text codes (8 characters, uppercase letters and
        digits). Only the hashes are stored, so this is the only chance to
        show them.
    """
    import secrets
    import string

    alphabet = string.ascii_uppercase + string.digits
    codes = [
        ''.join(secrets.choice(alphabet) for _pos in range(8))
        for _code in range(count)
    ]

    # Create one record per code: every other create() call in this file
    # passes a single values dict, not a list of them.
    backup_codes = get_model("user.backup.code")
    for code in codes:
        backup_codes.create({
            "user_id": user_id,
            "code_hash": hash_password(code),
            "used": False
        })

    return codes

Access Control

1. Permission Checking

Implement comprehensive permission checking:

def check_access(user_id, model_name, access_type, record_ids=None, context={}):
    """Decide whether a user may perform ``access_type`` on ``model_name``.

    The user whose login is 'admin' is always allowed. Otherwise the user
    needs either the specific grant or an 'admin' grant on the model; when
    ``record_ids`` is given, the record-level rules must pass as well.
    """
    account = get_model("base.user").browse(user_id)
    if account.login == 'admin':
        return True

    granted = get_user_permissions(user_id)
    has_model_grant = (
        (model_name, access_type) in granted
        or (model_name, 'admin') in granted
    )
    if not has_model_grant:
        return False

    # Without specific records the model-level grant is sufficient.
    if not record_ids:
        return True

    return check_record_access(user_id, model_name, access_type, record_ids)

def get_user_permissions(user_id):
    """Collect every permission a user holds, directly or through groups.

    Args:
        user_id: id of the ``base.user`` record.

    Returns:
        A set of ``(model, access_type)`` tuples.
    """
    user = get_model("base.user").browse(user_id)

    # Direct user permissions.
    permissions = {(perm.model, perm.access_type) for perm in user.permissions}

    # Group permissions, including those inherited from parent groups. The
    # call goes through the model with explicit ids: record-level calls in
    # this file (e.g. user.write(vals)) do not pass ids, so calling
    # group.get_all_permissions([group.id]) on a record would hand the id
    # list to the ``context`` parameter.
    group_ids = [group.id for group in user.groups]
    if group_ids:
        permissions.update(get_model("user.group").get_all_permissions(group_ids))

    return permissions

def check_record_access(user_id, model_name, access_type, record_ids):
    """Apply record rules from the user's groups to the given records.

    Rules are taken from group permissions on ``model_name`` whose
    ``apply_<access_type>`` flag is set. With no applicable rule, access is
    granted; otherwise every record must satisfy at least one rule.
    """
    target_model = get_model(model_name)
    account = get_model("base.user").browse(user_id)

    applicable = [
        rule
        for group in account.groups
        for perm in group.permissions
        if perm.model == model_name
        for rule in perm.record_rules
        if getattr(rule, f"apply_{access_type}", False)
    ]

    # No rules = full access
    if not applicable:
        return True

    for rec_id in record_ids:
        rec = target_model.browse(rec_id)
        # any() stops at the first matching rule.
        if not any(evaluate_domain(rule.domain, rec, account) for rule in applicable):
            return False

    return True

def evaluate_domain(domain_str, record, user):
    """Return True if ``record`` matches the JSON-encoded domain filter.

    Args:
        domain_str: JSON text of a list of domain conditions.
        record: browse record to test.
        user: browse record of the acting user, exposed to the search
            through the context.

    Returns:
        True when a search restricted to the record's id plus the domain
        finds it; False otherwise, including when the domain cannot be
        parsed or evaluated (fail closed).
    """
    try:
        domain = json.loads(domain_str)
        context = {
            "user": user,
            "company_id": get_active_company(),
            "date": datetime.now().strftime("%Y-%m-%d")
        }
        # The original referenced an undefined name ``model``; the NameError
        # was swallowed by a bare except, so every rule evaluated to False.
        # NOTE(review): assumes browse records expose their model name as
        # ``_model`` - confirm against the framework.
        model = get_model(record._model)
        return bool(model.search([["id", "=", record.id]] + domain, context=context))
    except Exception:
        return False  # Invalid domain = no access

2. Field-level Security

Implement field-level access control:

def filter_fields_by_access(user_id, model_name, field_names, access_type="read"):
    """Return the subset of ``field_names`` the user may access.

    Args:
        user_id: id of the acting user.
        model_name: model the fields belong to.
        field_names: iterable of field names; None or empty yields [].
        access_type: "read" or "write".

    Returns:
        List of allowed field names, in the order given.
    """
    if not field_names:
        return []

    # The earlier get_user_permissions() lookup was never used and only cost
    # extra database reads, so it has been dropped.
    return [
        field_name
        for field_name in field_names
        if check_field_access(user_id, model_name, field_name, access_type)
    ]

def check_field_access(user_id, model_name, field_name, access_type):
    """Return False if any group rule denies the user access to the field.

    A "hidden" rule denies every access type; a "read" rule denies write
    access. Fields without a matching rule are allowed.
    """
    account = get_model("base.user").browse(user_id)

    for group in account.groups:
        for perm in group.permissions:
            if perm.model != model_name:
                continue
            for restriction in perm.field_access:
                if restriction.field_name != field_name:
                    continue
                if restriction.access_type == "hidden":
                    return False
                if access_type == "write" and restriction.access_type == "read":
                    return False

    return True

def secure_read(self, ids, fields=None, context=None):
    """Read records, returning only the fields and records the user may see.

    Args:
        ids: ids of the records to read.
        fields: field names to read; None or empty means all declared fields.
        context: optional dict forwarded to the underlying read.

    Returns:
        Whatever the parent ``read`` returns for the accessible records and
        allowed fields.

    Raises:
        Exception: "Access denied" when the user lacks read access or may
            not read any of the requested fields.
    """
    if context is None:
        context = {}
    user_id = get_active_user()

    if not check_access(user_id, self._name, "read", ids):
        raise Exception("Access denied")

    # Always filter an explicit field list. Previously a missing field list
    # skipped filtering entirely, so hidden fields could be returned.
    # NOTE(review): assumes an empty/None field list means "all fields" to
    # the parent read() - confirm against the framework.
    requested = list(fields) if fields else list(self._fields)
    allowed_fields = filter_fields_by_access(user_id, self._name, requested, "read")
    if not allowed_fields:
        # Passing an empty list on could be read as "all fields"; fail closed.
        raise Exception("Access denied")

    # Apply record rules per record. check_access() above already rejects the
    # whole call if any record fails, so this is kept as a second line of
    # defence.
    accessible_ids = [
        record_id
        for record_id in ids
        if check_record_access(user_id, self._name, "read", [record_id])
    ]

    return super().read(accessible_ids, allowed_fields, context)

3. Menu and UI Security

Control access to UI elements:

def get_user_menu(user_id, context=None):
    """Build the menu tree containing only the items the user may access.

    Args:
        user_id: id of the acting user.
        context: unused; accepted for signature compatibility.

    Returns:
        The result of build_menu_tree() for the accessible menu entries.
    """
    # The previous version also browsed the user record here but never used
    # it; check_menu_access() does its own lookup.
    active_menus = get_model("ui.menu").search_browse([
        ["active", "=", True]
    ], order="sequence")

    accessible_menus = [
        {
            "id": menu.id,
            "name": menu.name,
            "action": menu.action,
            "icon": menu.icon,
            "parent_id": menu.parent_id.id if menu.parent_id else None
        }
        for menu in active_menus
        if check_menu_access(user_id, menu)
    ]

    return build_menu_tree(accessible_menus)

def check_menu_access(user_id, menu):
    """Return True if the user may see the given menu item.

    A menu restricted to groups requires the user to belong to at least one
    of them. A menu linked to a model additionally requires read access to
    that model.
    """
    if menu.groups:
        member = get_model("base.user").browse(user_id)
        held_ids = {grp.id for grp in member.groups}
        required_ids = {grp.id for grp in menu.groups}
        if held_ids.isdisjoint(required_ids):
            return False

    # Menus not tied to a model need no further check.
    if not menu.model:
        return True

    return check_access(user_id, menu.model, "read")

def secure_form_layout(layout, user_id, model_name):
    """Strip fields and buttons the user may not access from a form layout.

    Args:
        layout: layout XML as a string; empty or None is returned unchanged.
        user_id: id of the acting user.
        model_name: model the layout belongs to.

    Returns:
        The filtered layout serialized back to a string.
    """
    if not layout:
        return layout

    import xml.etree.ElementTree as ET
    root = ET.fromstring(layout)

    # Stdlib ElementTree elements have no getparent() (that is lxml API), so
    # build a child -> parent map up front.
    parent_of = {child: parent for parent in root.iter() for child in parent}

    # Collect first and remove afterwards: removing elements while iterating
    # the tree can skip siblings.
    denied = []

    for field_elem in root.iter("field"):
        field_name = field_elem.get("name")
        if field_name and not check_field_access(user_id, model_name, field_name, "read"):
            denied.append(field_elem)

    for button_elem in root.iter("button"):
        method = button_elem.get("method")
        if method and not check_method_access(user_id, model_name, method):
            denied.append(button_elem)

    for elem in denied:
        parent = parent_of.get(elem)
        # The root element has no parent and cannot be removed.
        if parent is not None:
            parent.remove(elem)

    return ET.tostring(root, encoding='unicode')

Data Protection

1. Audit Trail

Implement comprehensive audit logging:

class AuditLog(Model):
    """One audit trail entry; log_audit_trail() creates these records."""
    _name = "audit.log"
    _fields = {
        "date": fields.DateTime("Date", required=True),
        "user_id": fields.Many2One("base.user", "User"),
        # Name and id of the record the action was performed on.
        "model": fields.Char("Model", required=True),
        "record_id": fields.Integer("Record ID"),
        "action": fields.Selection([
            ["create", "Create"],
            ["read", "Read"],
            ["write", "Update"],
            ["delete", "Delete"],
            ["login", "Login"],
            ["logout", "Logout"]
        ], "Action"),
        # log_audit_trail() stores these as JSON text (json.dumps), or None
        # when no values are supplied.
        "old_values": fields.Text("Old Values"),
        "new_values": fields.Text("New Values"),
        # Request metadata captured when the entry is written.
        "ip_address": fields.Char("IP Address"),
        "user_agent": fields.Text("User Agent"),
        "session_id": fields.Char("Session ID"),
        "company_id": fields.Many2One("company", "Company"),
    }

def log_audit_trail(action, model_name, record_id, old_values=None, new_values=None):
    """Write one ``audit.log`` entry describing an action on a record.

    ``old_values`` and ``new_values`` are stored as JSON text; falsy values
    are stored as None. Acting user, request and session details are taken
    from the active request.
    """
    audit_log = get_model("audit.log")
    entry = {
        "date": time.strftime("%Y-%m-%d %H:%M:%S"),
        "user_id": get_active_user(),
        "model": model_name,
        "record_id": record_id,
        "action": action,
    }
    entry["old_values"] = json.dumps(old_values) if old_values else None
    entry["new_values"] = json.dumps(new_values) if new_values else None
    entry["ip_address"] = get_request_ip()
    entry["user_agent"] = get_request_user_agent()
    entry["session_id"] = get_session_id()
    entry["company_id"] = get_active_company()
    audit_log.create(entry)

def secure_write(self, ids, vals, context=None):
    """Write to records with permission checks and an audit trail.

    Args:
        ids: ids of the records to update.
        vals: dict of field values; fields the user may not write are
            dropped silently.
        context: optional dict forwarded to the underlying write.

    Returns:
        The result of the last underlying write, or None when there was
        nothing to write (no ids, or no writable field in ``vals``).

    Raises:
        Exception: "Access denied" when the user lacks write access.
    """
    if context is None:
        context = {}
    user_id = get_active_user()

    if not check_access(user_id, self._name, "write", ids):
        raise Exception("Access denied")

    writable_fields = set(filter_fields_by_access(user_id, self._name, list(vals), "write"))
    filtered_vals = {k: v for k, v in vals.items() if k in writable_fields}

    # Initialised up front: the previous version raised UnboundLocalError on
    # ``return result`` when ``ids`` was empty.
    result = None
    if not filtered_vals:
        return result

    changed_fields = list(filtered_vals)
    for record_id in ids:
        # read() takes a list of ids (as in the other read calls in this
        # file); the previous version passed a bare id.
        old_values = self.read([record_id], changed_fields)[0]

        result = super().write([record_id], filtered_vals, context)

        log_audit_trail("write", self._name, record_id, old_values, filtered_vals)

    return result

2. Data Encryption

Implement data encryption for sensitive fields:

def encrypt_sensitive_data(value, field_name):
    """Encrypt ``value`` with the application key and return it as text.

    The value is converted with ``str()``, encrypted with Fernet, and the
    resulting token is base64-encoded once more. ``field_name`` is accepted
    but not used.
    """
    from cryptography.fernet import Fernet
    import base64

    fernet = Fernet(get_encryption_key())
    plaintext = str(value).encode()
    token = fernet.encrypt(plaintext)
    return base64.b64encode(token).decode()

def decrypt_sensitive_data(encrypted_value, field_name):
    """Decrypt a value produced by encrypt_sensitive_data().

    Args:
        encrypted_value: base64 text wrapping a Fernet token.
        field_name: accepted but not used.

    Returns:
        The decrypted text, or the placeholder "[ENCRYPTED]" when the value
        cannot be decoded or decrypted.
    """
    from cryptography.fernet import Fernet
    import base64

    try:
        key = get_encryption_key()
        cipher_suite = Fernet(key)

        encrypted_bytes = base64.b64decode(encrypted_value)
        decrypted_value = cipher_suite.decrypt(encrypted_bytes)
        return decrypted_value.decode()
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except:`` - that
        # also swallowed KeyboardInterrupt and SystemExit.
        return "[ENCRYPTED]"  # Return placeholder if decryption fails

def get_encryption_key(key_file="/etc/netforce/encryption.key"):
    """Return the Fernet key stored in ``key_file``, creating it if missing.

    Args:
        key_file: path of the key file. Defaults to the system-wide
            location, so existing callers are unaffected.

    Returns:
        The key as bytes.
    """
    import os

    # Try to read first instead of checking existence and then opening: the
    # file could appear or vanish between the two steps.
    try:
        with open(key_file, 'rb') as f:
            return f.read()
    except FileNotFoundError:
        pass

    from cryptography.fernet import Fernet
    key = Fernet.generate_key()

    key_dir = os.path.dirname(key_file)
    if key_dir:
        os.makedirs(key_dir, exist_ok=True)

    # Create the file exclusively and owner-only (0o600). A plain
    # open(..., 'wb') would use the process umask, typically leaving the key
    # world-readable.
    try:
        fd = os.open(key_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    except FileExistsError:
        # Another process created the key in the meantime; use theirs so
        # both processes agree on one key.
        with open(key_file, 'rb') as f:
            return f.read()

    with os.fdopen(fd, 'wb') as f:
        f.write(key)

    return key

# Example usage in model
class Contact(Model):
    """Example model that encrypts selected fields on create and decrypts on read."""
    _name = "contact"
    # Fields stored encrypted at rest.
    _encrypted_fields = ["tax_id", "bank_account", "ssn"]

    def create(self, vals, context=None):
        """Create a contact, encrypting the sensitive fields present in ``vals``."""
        if context is None:
            context = {}

        # Work on a copy so the caller's dict is not rewritten with ciphertext.
        vals = dict(vals)
        for field_name in self._encrypted_fields:
            # Skip empty values: encrypting None would store the text "None".
            # read() below also only decrypts non-empty values.
            if vals.get(field_name):
                vals[field_name] = encrypt_sensitive_data(vals[field_name], field_name)

        return super().create(vals, context)

    def read(self, ids, fields=None, context=None):
        """Read contacts, decrypting sensitive fields for permitted users.

        Users without the "decrypt_sensitive" grant on this model receive
        the stored (encrypted) values unchanged.
        """
        if context is None:
            context = {}
        result = super().read(ids, fields, context)

        if check_access(get_active_user(), self._name, "decrypt_sensitive"):
            for record in result:
                for field_name in self._encrypted_fields:
                    if field_name in record and record[field_name]:
                        record[field_name] = decrypt_sensitive_data(record[field_name], field_name)

        return result

3. Secure Communication

Implement secure API communication:

def generate_api_key(user_id, name=""):
    """Create an API key record for the user and return the plain key.

    Only a hash of the key is stored, so the returned value cannot be
    recovered later.
    """
    import secrets

    plain_key = secrets.token_urlsafe(32)

    record_vals = {
        "user_id": user_id,
        "name": name,
        "key": hash_password(plain_key),  # Store hashed key
        "created_date": time.strftime("%Y-%m-%d %H:%M:%S"),
        "active": True,
        "permissions": "read,write"  # Default permissions
    }
    get_model("api.key").create(record_vals)

    return plain_key

def authenticate_api_key(api_key):
    """Resolve an API key to the id of the user it belongs to.

    Args:
        api_key: plain API key from the request.

    Returns:
        The owning user's id, or None when the key is empty or matches no
        active key record.
    """
    if not api_key:
        return None

    # Keys are stored hashed, so each active record has to be verified in
    # turn. NOTE(review): cost grows with the number of active keys.
    api_keys = get_model("api.key").search_browse([
        ["active", "=", True]
    ])

    for key_record in api_keys:
        # A record without a stored hash can never match.
        if not key_record.key:
            continue
        if verify_password(api_key, key_record.key):
            key_record.write({
                "last_used": time.strftime("%Y-%m-%d %H:%M:%S"),
                # usage_count may be unset on a key that was never used.
                "usage_count": (key_record.usage_count or 0) + 1
            })

            return key_record.user_id.id

    return None

def require_https(func):
    """Decorator that rejects calls made over a non-secure request.

    The wrapped function raises Exception("HTTPS required for this
    operation") unless the current request is secure or ``settings.DEBUG``
    is set.
    """
    import functools

    # wraps() keeps the decorated function's name and docstring, which would
    # otherwise be replaced by those of ``wrapper``.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        request = get_request()
        if not request.is_secure() and not settings.DEBUG:
            raise Exception("HTTPS required for this operation")
        return func(*args, **kwargs)
    return wrapper

@require_https
def change_password(user_id, old_password, new_password):
    """Replace a user's password after verifying the current one.

    Raises Exception when the current password is wrong or the new one
    fails the strength policy. On success the new hash is stored, the
    expiry is set 90 days ahead, the failed-login counter is reset, and
    True is returned.
    """
    account = get_model("base.user").browse(user_id)

    current_ok = verify_password(old_password, account.password)
    if not current_ok:
        log_security_event("password_change_failed", {"user_id": user_id})
        raise Exception("Invalid current password")

    strong_enough = validate_password_strength(new_password)
    if not strong_enough:
        raise Exception("Password does not meet security requirements")

    new_hash = hash_password(new_password)
    expires_on = (datetime.now() + timedelta(days=90)).strftime("%Y-%m-%d")
    account.write({
        "password": new_hash,
        "password_expires": expires_on,
        "failed_login_attempts": 0
    })

    log_security_event("password_changed", {"user_id": user_id})

    return True

Best Practices

1. Security Configuration

Configure security settings properly:

def setup_security_settings():
    """Store the recommended security settings through the settings model."""
    recommended = {
        # Password policy
        "password_min_length": 8,
        "password_require_uppercase": True,
        "password_require_lowercase": True,
        "password_require_numbers": True,
        "password_require_symbols": True,
        "password_max_age_days": 90,

        # Session settings
        "session_timeout_minutes": 480,  # 8 hours
        "session_require_ip_match": True,
        "max_concurrent_sessions": 3,

        # Login security
        "max_failed_attempts": 5,
        "lockout_duration_minutes": 30,
        "require_2fa_for_admin": True,

        # Audit settings
        "log_all_access": True,
        "audit_retention_days": 2555,  # 7 years
        "log_failed_access": True,

        # API security
        "api_rate_limit_per_hour": 1000,
        "require_api_key": True,
        "api_key_rotation_days": 30,
    }

    # Each value is written individually, in declaration order.
    for setting_name in recommended:
        get_model("settings").set_value(setting_name, recommended[setting_name])

def validate_password_strength(password):
    """Return True if ``password`` satisfies the configured password policy.

    Reads the minimum length and the uppercase / lowercase / number /
    symbol requirements from the settings model; each requirement defaults
    to enabled and the minimum length defaults to 8.
    """
    import re
    import string

    settings = get_model("settings")

    if len(password) < settings.get_value("password_min_length", 8):
        return False

    if settings.get_value("password_require_uppercase", True):
        if not re.search(r'[A-Z]', password):
            return False

    if settings.get_value("password_require_lowercase", True):
        if not re.search(r'[a-z]', password):
            return False

    if settings.get_value("password_require_numbers", True):
        if not re.search(r'\d', password):
            return False

    if settings.get_value("password_require_symbols", True):
        # Use the full ASCII punctuation set. The previous hand-written
        # character class omitted '/', '~' and '`', so passwords whose only
        # symbol was one of those were rejected.
        if not any(ch in string.punctuation for ch in password):
            return False

    return True

2. Security Monitoring

Implement security monitoring and alerting:

def monitor_security_events():
    """Run each security check once, in a fixed order."""
    checks = (
        check_brute_force_attempts,   # suspicious login patterns
        check_privilege_changes,      # privilege escalation
        check_data_export_patterns,   # data export anomalies
        check_unusual_access_times,   # after-hours access
    )
    for run_check in checks:
        run_check()

def check_brute_force_attempts():
    """Alert on and ban IPs with 10 or more failed logins in the last hour."""
    window_start = datetime.now() - timedelta(hours=1)
    since = window_start.strftime("%Y-%m-%d %H:%M:%S")

    offenders = database.get("""
        SELECT ip_address, COUNT(*) as attempt_count
        FROM security_log 
        WHERE event_type = 'auth_failed'
        AND date >= %s
        GROUP BY ip_address
        HAVING COUNT(*) >= 10
    """, [since])

    for source_ip, attempts in offenders:
        alert_text = f"Brute force detected from {source_ip}: {attempts} failed attempts"
        send_security_alert(alert_text)

        # Block the offending address for a day.
        create_ip_ban(source_ip, duration_hours=24)

def send_security_alert(message):
    """Queue a high-priority alert email to every security administrator.

    Args:
        message: plain-text alert message.

    Recipients are the users in the group with code "security_admin" that
    have an email address.
    """
    from html import escape

    # Alerts can carry values taken from logs (e.g. client addresses), so
    # escape the message before embedding it in the HTML body.
    safe_message = escape(str(message))

    admins = get_model("base.user").search_browse([
        ["groups.code", "=", "security_admin"]
    ])

    for admin in admins:
        if admin.email:
            get_model("email.message").create({
                "to": admin.email,
                "subject": "Security Alert - Netforce",
                "body": f"<p><strong>Security Alert:</strong></p><p>{safe_message}</p>",
                "priority": "high",
                "type": "out"
            })

def create_security_report():
    """Assemble a dict of security metrics for today's report.

    Login counts cover the last 24 hours; password changes, admin actions
    and top accessed models are requested with an argument of 7.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    active_user_count = get_model("base.user").search_count([["active", "=", True]])

    return {
        "date": today,
        "total_users": active_user_count,
        "failed_logins_24h": get_failed_login_count(24),
        "successful_logins_24h": get_successful_login_count(24),
        "locked_accounts": get_locked_account_count(),
        "active_sessions": get_active_session_count(),
        "recent_password_changes": get_password_change_count(7),
        "admin_actions": get_admin_action_count(7),
        "top_accessed_models": get_top_accessed_models(7),
    }

Next Steps