Security & Access Control¶
Netforce provides a comprehensive security framework that includes user authentication, role-based access control, permission management, and data protection. The system supports multi-level security with fine-grained permissions.
Overview¶
The security system consists of several key components:
- User authentication - Login, sessions, and password management
- Access control - Role-based permissions and field-level security
- Data protection - Encryption, audit trails, and secure communication
- Company isolation - Multi-tenant data segregation
Core Security Models¶
1. Users and Authentication (base.user)¶
User management and authentication:
class User(Model):
_name = "base.user"
_fields = {
"name": fields.Char("Full Name", required=True),
"login": fields.Char("Login", required=True),
"password": fields.Char("Password"),
"email": fields.Char("Email"),
"active": fields.Boolean("Active"),
"groups": fields.Many2Many("user.group", "User Groups"),
"companies": fields.Many2Many("company", "Companies"),
"default_company_id": fields.Many2One("company", "Default Company"),
"last_login": fields.DateTime("Last Login"),
"failed_login_attempts": fields.Integer("Failed Login Attempts"),
"locked_until": fields.DateTime("Locked Until"),
"password_expires": fields.Date("Password Expires"),
"require_2fa": fields.Boolean("Require Two-Factor Auth"),
"api_keys": fields.One2Many("api.key", "user_id", "API Keys"),
"access_log": fields.One2Many("access.log", "user_id", "Access Log"),
}
_defaults = {
"active": True,
"failed_login_attempts": 0,
}
2. User Groups (user.group)¶
Role-based access control through groups:
class UserGroup(Model):
_name = "user.group"
_fields = {
"name": fields.Char("Group Name", required=True),
"code": fields.Char("Group Code"),
"description": fields.Text("Description"),
"users": fields.Many2Many("base.user", "Users"),
"permissions": fields.One2Many("permission", "group_id", "Permissions"),
"parent_id": fields.Many2One("user.group", "Parent Group"),
"child_ids": fields.One2Many("user.group", "parent_id", "Child Groups"),
"company_id": fields.Many2One("company", "Company"),
}
def get_all_permissions(self, ids, context={}):
"""Get all permissions including inherited from parent groups"""
permissions = set()
for group in self.browse(ids):
# Add direct permissions
for perm in group.permissions:
permissions.add((perm.model, perm.access_type))
# Add inherited permissions
if group.parent_id:
parent_perms = self.get_all_permissions([group.parent_id.id])
permissions.update(parent_perms)
return permissions
3. Permissions (permission)¶
Fine-grained permission management:
class Permission(Model):
_name = "permission"
_fields = {
"name": fields.Char("Permission Name"),
"model": fields.Char("Model Name", required=True),
"group_id": fields.Many2One("user.group", "User Group"),
"user_id": fields.Many2One("base.user", "User"),
"access_type": fields.Selection([
["read", "Read"],
["write", "Write"],
["create", "Create"],
["delete", "Delete"],
["admin", "Admin"]
], "Access Type"),
"field_access": fields.One2Many("field.access", "permission_id", "Field Access"),
"record_rules": fields.One2Many("record.rule", "permission_id", "Record Rules"),
}
class FieldAccess(Model):
_name = "field.access"
_fields = {
"permission_id": fields.Many2One("permission", "Permission"),
"field_name": fields.Char("Field Name", required=True),
"access_type": fields.Selection([
["read", "Read Only"],
["write", "Read/Write"],
["hidden", "Hidden"]
], "Access Type"),
}
class RecordRule(Model):
_name = "record.rule"
_fields = {
"permission_id": fields.Many2One("permission", "Permission"),
"name": fields.Char("Rule Name"),
"domain": fields.Text("Domain Filter"),
"apply_create": fields.Boolean("Apply on Create"),
"apply_read": fields.Boolean("Apply on Read"),
"apply_write": fields.Boolean("Apply on Write"),
"apply_delete": fields.Boolean("Apply on Delete"),
}
User Authentication¶
1. Login and Session Management¶
Implement secure authentication:
def authenticate_user(login, password, context={}):
"""Authenticate user login"""
import hashlib
import secrets
# Find user
user_ids = get_model("base.user").search([
["login", "=", login],
["active", "=", True]
])
if not user_ids:
# Log failed attempt
log_security_event("auth_failed", {"login": login, "reason": "user_not_found"})
raise Exception("Invalid credentials")
user = get_model("base.user").browse(user_ids[0])
# Check if account is locked
if user.locked_until and user.locked_until > datetime.now().strftime("%Y-%m-%d %H:%M:%S"):
raise Exception("Account temporarily locked")
# Verify password
if not verify_password(password, user.password):
# Increment failed attempts
failed_attempts = user.failed_login_attempts + 1
user.write({"failed_login_attempts": failed_attempts})
# Lock account after 5 failed attempts
if failed_attempts >= 5:
lock_until = (datetime.now() + timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S")
user.write({"locked_until": lock_until})
log_security_event("auth_failed", {"user_id": user.id, "attempts": failed_attempts})
raise Exception("Invalid credentials")
# Check password expiry
if user.password_expires and user.password_expires < datetime.now().strftime("%Y-%m-%d"):
raise Exception("Password expired")
# Reset failed attempts on successful login
user.write({
"failed_login_attempts": 0,
"locked_until": None,
"last_login": time.strftime("%Y-%m-%d %H:%M:%S")
})
# Create session
session_id = create_user_session(user.id, context)
# Log successful login
log_security_event("auth_success", {"user_id": user.id})
return {
"user_id": user.id,
"session_id": session_id,
"companies": [c.id for c in user.companies],
"default_company": user.default_company_id.id if user.default_company_id else None
}
def verify_password(plain_password, hashed_password):
"""Verify password against hash"""
import bcrypt
return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8'))
def hash_password(password):
"""Hash password securely"""
import bcrypt
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
2. Two-Factor Authentication¶
Implement 2FA for enhanced security:
def setup_2fa(user_id):
"""Setup two-factor authentication for user"""
import pyotp
import qrcode
import io
user = get_model("base.user").browse(user_id)
# Generate secret key
secret_key = pyotp.random_base32()
# Create TOTP object
totp = pyotp.TOTP(secret_key)
# Generate QR code
provisioning_uri = totp.provisioning_uri(
user.email,
issuer_name="Netforce ERP"
)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
# Save QR code image
img = qr.make_image(fill_color="black", back_color="white")
buffer = io.BytesIO()
img.save(buffer, format='PNG')
qr_code_data = buffer.getvalue()
# Store secret key
user.write({
"totp_secret": secret_key,
"require_2fa": True
})
return {
"secret_key": secret_key,
"qr_code": qr_code_data,
"backup_codes": generate_backup_codes(user_id)
}
def verify_2fa_token(user_id, token):
"""Verify 2FA token"""
import pyotp
user = get_model("base.user").browse(user_id)
if not user.totp_secret:
return False
totp = pyotp.TOTP(user.totp_secret)
return totp.verify(token, valid_window=1) # Allow 30-second window
def generate_backup_codes(user_id, count=10):
"""Generate backup codes for 2FA recovery"""
import secrets
import string
codes = []
for _ in range(count):
code = ''.join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8))
codes.append(code)
# Store hashed backup codes
hashed_codes = [hash_password(code) for code in codes]
get_model("user.backup.code").create([
{"user_id": user_id, "code_hash": hash_code, "used": False}
for hash_code in hashed_codes
])
return codes
Access Control¶
1. Permission Checking¶
Implement comprehensive permission checking:
def check_access(user_id, model_name, access_type, record_ids=None, context={}):
"""Check if user has access to perform action"""
user = get_model("base.user").browse(user_id)
# Superuser has all access
if user.login == 'admin':
return True
# Get user permissions through groups
user_permissions = get_user_permissions(user_id)
# Check model-level permission
model_key = (model_name, access_type)
if model_key not in user_permissions and (model_name, 'admin') not in user_permissions:
return False
# Check record-level rules if record_ids provided
if record_ids:
return check_record_access(user_id, model_name, access_type, record_ids)
return True
def get_user_permissions(user_id):
"""Get all permissions for user"""
user = get_model("base.user").browse(user_id)
permissions = set()
# Direct user permissions
for perm in user.permissions:
permissions.add((perm.model, perm.access_type))
# Group permissions
for group in user.groups:
group_perms = group.get_all_permissions([group.id])
permissions.update(group_perms)
return permissions
def check_record_access(user_id, model_name, access_type, record_ids):
"""Check record-level access rules"""
model = get_model(model_name)
user = get_model("base.user").browse(user_id)
# Get applicable record rules
rules = []
for group in user.groups:
for perm in group.permissions:
if perm.model == model_name:
for rule in perm.record_rules:
if getattr(rule, f"apply_{access_type}", False):
rules.append(rule)
if not rules:
return True # No rules = full access
# Apply rules - record must match at least one rule
for record_id in record_ids:
record = model.browse(record_id)
has_access = False
for rule in rules:
if evaluate_domain(rule.domain, record, user):
has_access = True
break
if not has_access:
return False
return True
def evaluate_domain(domain_str, record, user):
"""Evaluate domain filter against record"""
try:
domain = json.loads(domain_str)
context = {
"user": user,
"company_id": get_active_company(),
"date": datetime.now().strftime("%Y-%m-%d")
}
return model.search([["id", "=", record.id]] + domain, context=context)
except:
return False # Invalid domain = no access
2. Field-level Security¶
Implement field-level access control:
def filter_fields_by_access(user_id, model_name, field_names, access_type="read"):
"""Filter fields based on user access"""
if not field_names:
return []
user_permissions = get_user_permissions(user_id)
allowed_fields = []
for field_name in field_names:
if check_field_access(user_id, model_name, field_name, access_type):
allowed_fields.append(field_name)
return allowed_fields
def check_field_access(user_id, model_name, field_name, access_type):
"""Check access to specific field"""
user = get_model("base.user").browse(user_id)
# Check field access rules
for group in user.groups:
for perm in group.permissions:
if perm.model == model_name:
for field_access in perm.field_access:
if field_access.field_name == field_name:
if field_access.access_type == "hidden":
return False
elif access_type == "write" and field_access.access_type == "read":
return False
return True
def secure_read(self, ids, fields=None, context={}):
"""Secure read method with field filtering"""
user_id = get_active_user()
# Check read permission
if not check_access(user_id, self._name, "read", ids):
raise Exception("Access denied")
# Filter fields by access
if fields:
allowed_fields = filter_fields_by_access(user_id, self._name, fields, "read")
fields = allowed_fields
# Apply record rules
accessible_ids = []
for record_id in ids:
if check_record_access(user_id, self._name, "read", [record_id]):
accessible_ids.append(record_id)
return super().read(accessible_ids, fields, context)
3. Menu and UI Security¶
Control access to UI elements:
def get_user_menu(user_id, context={}):
"""Get menu items accessible to user"""
user = get_model("base.user").browse(user_id)
accessible_menus = []
all_menus = get_model("ui.menu").search_browse([
["active", "=", True]
], order="sequence")
for menu in all_menus:
if check_menu_access(user_id, menu):
accessible_menus.append({
"id": menu.id,
"name": menu.name,
"action": menu.action,
"icon": menu.icon,
"parent_id": menu.parent_id.id if menu.parent_id else None
})
return build_menu_tree(accessible_menus)
def check_menu_access(user_id, menu):
"""Check if user can access menu item"""
# Check required groups
if menu.groups:
user_groups = get_model("base.user").browse(user_id).groups
user_group_ids = {g.id for g in user_groups}
menu_group_ids = {g.id for g in menu.groups}
if not user_group_ids & menu_group_ids:
return False
# Check model access if menu links to model
if menu.model:
return check_access(user_id, menu.model, "read")
return True
def secure_form_layout(layout, user_id, model_name):
"""Filter form layout based on user permissions"""
if not layout:
return layout
# Parse layout XML
import xml.etree.ElementTree as ET
root = ET.fromstring(layout)
# Remove fields user cannot access
for field_elem in root.iter("field"):
field_name = field_elem.get("name")
if field_name and not check_field_access(user_id, model_name, field_name, "read"):
field_elem.getparent().remove(field_elem)
# Remove buttons user cannot use
for button_elem in root.iter("button"):
method = button_elem.get("method")
if method and not check_method_access(user_id, model_name, method):
button_elem.getparent().remove(button_elem)
return ET.tostring(root, encoding='unicode')
Data Protection¶
1. Audit Trail¶
Implement comprehensive audit logging:
class AuditLog(Model):
_name = "audit.log"
_fields = {
"date": fields.DateTime("Date", required=True),
"user_id": fields.Many2One("base.user", "User"),
"model": fields.Char("Model", required=True),
"record_id": fields.Integer("Record ID"),
"action": fields.Selection([
["create", "Create"],
["read", "Read"],
["write", "Update"],
["delete", "Delete"],
["login", "Login"],
["logout", "Logout"]
], "Action"),
"old_values": fields.Text("Old Values"),
"new_values": fields.Text("New Values"),
"ip_address": fields.Char("IP Address"),
"user_agent": fields.Text("User Agent"),
"session_id": fields.Char("Session ID"),
"company_id": fields.Many2One("company", "Company"),
}
def log_audit_trail(action, model_name, record_id, old_values=None, new_values=None):
"""Log audit trail entry"""
get_model("audit.log").create({
"date": time.strftime("%Y-%m-%d %H:%M:%S"),
"user_id": get_active_user(),
"model": model_name,
"record_id": record_id,
"action": action,
"old_values": json.dumps(old_values) if old_values else None,
"new_values": json.dumps(new_values) if new_values else None,
"ip_address": get_request_ip(),
"user_agent": get_request_user_agent(),
"session_id": get_session_id(),
"company_id": get_active_company()
})
def secure_write(self, ids, vals, context={}):
"""Secure write with audit trail"""
user_id = get_active_user()
# Check write permission
if not check_access(user_id, self._name, "write", ids):
raise Exception("Access denied")
# Filter writable fields
writable_fields = filter_fields_by_access(user_id, self._name, vals.keys(), "write")
filtered_vals = {k: v for k, v in vals.items() if k in writable_fields}
# Log changes
for record_id in ids:
old_values = self.read(record_id, list(filtered_vals.keys()))[0]
# Perform update
result = super().write([record_id], filtered_vals, context)
# Log audit trail
log_audit_trail("write", self._name, record_id, old_values, filtered_vals)
return result
2. Data Encryption¶
Implement data encryption for sensitive fields:
def encrypt_sensitive_data(value, field_name):
"""Encrypt sensitive field data"""
from cryptography.fernet import Fernet
import base64
# Get encryption key from settings
key = get_encryption_key()
cipher_suite = Fernet(key)
# Encrypt value
encrypted_value = cipher_suite.encrypt(str(value).encode())
return base64.b64encode(encrypted_value).decode()
def decrypt_sensitive_data(encrypted_value, field_name):
"""Decrypt sensitive field data"""
from cryptography.fernet import Fernet
import base64
try:
key = get_encryption_key()
cipher_suite = Fernet(key)
# Decrypt value
encrypted_bytes = base64.b64decode(encrypted_value)
decrypted_value = cipher_suite.decrypt(encrypted_bytes)
return decrypted_value.decode()
except:
return "[ENCRYPTED]" # Return placeholder if decryption fails
def get_encryption_key():
"""Get or generate encryption key"""
key_file = "/etc/netforce/encryption.key"
if os.path.exists(key_file):
with open(key_file, 'rb') as f:
return f.read()
else:
# Generate new key
from cryptography.fernet import Fernet
key = Fernet.generate_key()
os.makedirs(os.path.dirname(key_file), exist_ok=True)
with open(key_file, 'wb') as f:
f.write(key)
return key
# Example usage in model
class Contact(Model):
_name = "contact"
_encrypted_fields = ["tax_id", "bank_account", "ssn"]
def create(self, vals, context={}):
# Encrypt sensitive fields
for field_name in self._encrypted_fields:
if field_name in vals:
vals[field_name] = encrypt_sensitive_data(vals[field_name], field_name)
return super().create(vals, context)
def read(self, ids, fields=None, context={}):
result = super().read(ids, fields, context)
# Decrypt sensitive fields if user has permission
if check_access(get_active_user(), self._name, "decrypt_sensitive"):
for record in result:
for field_name in self._encrypted_fields:
if field_name in record and record[field_name]:
record[field_name] = decrypt_sensitive_data(record[field_name], field_name)
return result
3. Secure Communication¶
Implement secure API communication:
def generate_api_key(user_id, name=""):
"""Generate API key for user"""
import secrets
key = secrets.token_urlsafe(32)
get_model("api.key").create({
"user_id": user_id,
"name": name,
"key": hash_password(key), # Store hashed key
"created_date": time.strftime("%Y-%m-%d %H:%M:%S"),
"active": True,
"permissions": "read,write" # Default permissions
})
return key # Return unhashed key only once
def authenticate_api_key(api_key):
"""Authenticate API request using key"""
if not api_key:
return None
# Find API key record
api_keys = get_model("api.key").search_browse([
["active", "=", True]
])
for key_record in api_keys:
if verify_password(api_key, key_record.key):
# Update last used
key_record.write({
"last_used": time.strftime("%Y-%m-%d %H:%M:%S"),
"usage_count": key_record.usage_count + 1
})
return key_record.user_id.id
return None
def require_https(func):
"""Decorator to require HTTPS for sensitive operations"""
def wrapper(*args, **kwargs):
request = get_request()
if not request.is_secure() and not settings.DEBUG:
raise Exception("HTTPS required for this operation")
return func(*args, **kwargs)
return wrapper
@require_https
def change_password(user_id, old_password, new_password):
"""Change user password securely"""
user = get_model("base.user").browse(user_id)
# Verify old password
if not verify_password(old_password, user.password):
log_security_event("password_change_failed", {"user_id": user_id})
raise Exception("Invalid current password")
# Validate new password strength
if not validate_password_strength(new_password):
raise Exception("Password does not meet security requirements")
# Update password
user.write({
"password": hash_password(new_password),
"password_expires": (datetime.now() + timedelta(days=90)).strftime("%Y-%m-%d"),
"failed_login_attempts": 0
})
# Log password change
log_security_event("password_changed", {"user_id": user_id})
return True
Best Practices¶
1. Security Configuration¶
Configure security settings properly:
def setup_security_settings():
"""Setup recommended security settings"""
settings = {
# Password policy
"password_min_length": 8,
"password_require_uppercase": True,
"password_require_lowercase": True,
"password_require_numbers": True,
"password_require_symbols": True,
"password_max_age_days": 90,
# Session settings
"session_timeout_minutes": 480, # 8 hours
"session_require_ip_match": True,
"max_concurrent_sessions": 3,
# Login security
"max_failed_attempts": 5,
"lockout_duration_minutes": 30,
"require_2fa_for_admin": True,
# Audit settings
"log_all_access": True,
"audit_retention_days": 2555, # 7 years
"log_failed_access": True,
# API security
"api_rate_limit_per_hour": 1000,
"require_api_key": True,
"api_key_rotation_days": 30,
}
for key, value in settings.items():
get_model("settings").set_value(key, value)
def validate_password_strength(password):
"""Validate password meets security requirements"""
import re
settings = get_model("settings")
if len(password) < settings.get_value("password_min_length", 8):
return False
if settings.get_value("password_require_uppercase", True):
if not re.search(r'[A-Z]', password):
return False
if settings.get_value("password_require_lowercase", True):
if not re.search(r'[a-z]', password):
return False
if settings.get_value("password_require_numbers", True):
if not re.search(r'\d', password):
return False
if settings.get_value("password_require_symbols", True):
if not re.search(r'[!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\?]', password):
return False
return True
2. Security Monitoring¶
Implement security monitoring and alerting:
def monitor_security_events():
"""Monitor for security events and alerts"""
# Check for suspicious login patterns
check_brute_force_attempts()
# Check for privilege escalation
check_privilege_changes()
# Check for data export anomalies
check_data_export_patterns()
# Check for after-hours access
check_unusual_access_times()
def check_brute_force_attempts():
"""Check for brute force login attempts"""
recent_failures = database.get("""
SELECT ip_address, COUNT(*) as attempt_count
FROM security_log
WHERE event_type = 'auth_failed'
AND date >= %s
GROUP BY ip_address
HAVING COUNT(*) >= 10
""", [(datetime.now() - timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")])
for ip, count in recent_failures:
send_security_alert(f"Brute force detected from {ip}: {count} failed attempts")
# Auto-block IP
create_ip_ban(ip, duration_hours=24)
def send_security_alert(message):
"""Send security alert to administrators"""
admin_emails = get_model("base.user").search_browse([
["groups.code", "=", "security_admin"]
])
for admin in admin_emails:
if admin.email:
get_model("email.message").create({
"to": admin.email,
"subject": "Security Alert - Netforce",
"body": f"<p><strong>Security Alert:</strong></p><p>{message}</p>",
"priority": "high",
"type": "out"
})
def create_security_report():
"""Generate security report"""
report_data = {
"date": datetime.now().strftime("%Y-%m-%d"),
"total_users": get_model("base.user").search_count([["active", "=", True]]),
"failed_logins_24h": get_failed_login_count(24),
"successful_logins_24h": get_successful_login_count(24),
"locked_accounts": get_locked_account_count(),
"active_sessions": get_active_session_count(),
"recent_password_changes": get_password_change_count(7),
"admin_actions": get_admin_action_count(7),
"top_accessed_models": get_top_accessed_models(7),
}
return report_data
Next Steps¶
- Learn about Multi-Company for company-specific security
- Explore Background Jobs for secure task processing
- Check Advanced Patterns for complex security scenarios
- Review Testing for security testing practices