security

veerababumanyam's avatarfrom veerababumanyam

Security and data protection guidelines for RawDrive. Use when implementing authentication, handling user data, validating inputs, or reviewing security-sensitive code.

0stars🔀0forks📁View on GitHub🕐Updated Jan 9, 2026

When & Why to Use This Skill

The RawDrive Security and Data Protection skill is a comprehensive technical guide designed to help developers implement and maintain high-security standards. It provides production-ready code patterns for JWT authentication, Argon2 password hashing, Role-Based Access Control (RBAC), and AES-GCM encryption. By focusing on critical areas like multi-tenant workspace isolation and rigorous input validation, this skill ensures that applications are resilient against common vulnerabilities while maintaining compliance with data protection regulations like GDPR and SOC2.

Use Cases

  • Implementing secure authentication and authorization flows using JWT tokens, 2FA, and granular RBAC permissions.
  • Enforcing strict multi-tenant data isolation to prevent cross-workspace data leakage in cloud environments.
  • Developing robust input validation and file upload handlers to protect against XSS, SQL injection, and malicious file execution.
  • Setting up automated audit logging and PII masking to ensure traceability and user privacy in production logs.
  • Performing pre-deployment security audits using a standardized checklist to identify hardcoded secrets or missing authorization layers.
namesecurity
aliases[auth, authentication, authorization, rbac, encryption, soc2, gdpr, jwt]
descriptionSecurity and data protection guidelines for RawDrive. Use when implementing authentication, handling user data, validating inputs, or reviewing security-sensitive code.

Security & Data Protection Guidelines

Key Files

Purpose Location
Auth middleware backend/src/app/middleware/auth.py
JWT utilities backend/src/app/core/security.py
Dependencies backend/src/app/api/dependencies.py
Encryption backend/src/app/services/encryption_service.py
Audit logging backend/src/app/services/audit_service.py

Security Layers

CloudFlare WAF/DDoS → nginx (TLS 1.3) → Rate Limiting
    → JWT Authentication → RBAC Authorization
        → Workspace Isolation → Encrypted Storage

Authentication

JWT Token Flow

# backend/src/app/core/security.py

ACCESS_TOKEN_EXPIRE = timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)

def create_tokens(user: User) -> TokenPair:
    access_token = jwt.encode(
        {
            "sub": str(user.id),
            "workspace_id": str(user.workspace_id),
            "exp": datetime.utcnow() + ACCESS_TOKEN_EXPIRE,
        },
        settings.JWT_SECRET,  # From env, NEVER hardcode
        algorithm="HS256"
    )
    # ... refresh token similar
    return TokenPair(access_token=access_token, refresh_token=refresh_token)

Password Security

from argon2 import PasswordHasher

ph = PasswordHasher()

# Hash password (Argon2id)
hashed = ph.hash(password)

# Verify password
try:
    ph.verify(stored_hash, password)
except VerifyMismatchError:
    raise HTTPException(401, "Invalid credentials")

Two-Factor Authentication

import pyotp

# Generate TOTP secret
secret = pyotp.random_base32()
totp = pyotp.TOTP(secret)

# Verify token
is_valid = totp.verify(user_token, valid_window=1)

Authorization (RBAC)

# backend/src/app/api/dependencies.py

def require_permission(*permissions: str):
    async def check(
        user: User = Depends(get_current_user),
        workspace: Workspace = Depends(get_current_workspace),
    ):
        user_permissions = await get_user_permissions(user.id, workspace.id)
        if not any(p in user_permissions for p in permissions):
            raise HTTPException(403, "Permission denied")
        return True
    return Depends(check)

# Usage
@router.delete("/galleries/{id}")
async def delete_gallery(
    id: UUID,
    _: bool = require_permission("gallery:delete"),
):
    ...

Permission Format

# Resource-based permissions
PERMISSIONS = [
    "gallery:create", "gallery:read", "gallery:update", "gallery:delete",
    "asset:upload", "asset:delete", "asset:download",
    "client:manage", "user:manage",
    "billing:view", "billing:manage",
    "workspace:*",  # Wildcard for admin
]

Workspace Data Isolation (CRITICAL)

# ALWAYS include workspace_id in queries
async def get_asset(db: AsyncSession, workspace_id: UUID, asset_id: UUID):
    result = await db.execute(
        select(Asset)
        .where(Asset.workspace_id == workspace_id)  # REQUIRED
        .where(Asset.id == asset_id)
    )
    return result.scalar_one_or_none()

# WRONG - Cross-tenant vulnerability
result = await db.execute(select(Asset).where(Asset.id == asset_id))

# WRONG - Never trust client-provided workspace_id
workspace_id = request.body.workspace_id  # SECURITY VULNERABILITY

Storage Object Keys

# All storage keys MUST include workspace_id prefix
object_key = f"workspaces/{workspace_id}/assets/{asset_id}/original/{filename}"

Input Validation

from pydantic import BaseModel, Field, validator

class CreateGalleryRequest(BaseModel):
    name: str = Field(..., min_length=1, max_length=200)
    description: str | None = Field(None, max_length=2000)

    @validator("name")
    def sanitize_name(cls, v):
        # Remove HTML tags and dangerous characters
        return re.sub(r'[<>]', '', v).strip()

# In route handler
@router.post("/galleries")
async def create_gallery(
    request: CreateGalleryRequest,  # Auto-validated
    workspace: Workspace = Depends(get_current_workspace),
):
    # workspace_id from auth, NOT from request
    return await gallery_service.create(request, workspace.id)

File Upload Security

ALLOWED_TYPES = ["image/jpeg", "image/png", "image/webp", "image/heic"]
MAX_SIZE = 100 * 1024 * 1024  # 100MB

async def validate_upload(file: UploadFile):
    # Check MIME type
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(400, f"Invalid file type: {file.content_type}")

    # Check size
    content = await file.read()
    if len(content) > MAX_SIZE:
        raise HTTPException(400, "File too large (max 100MB)")

    # Verify magic bytes match declared type
    if not verify_magic_bytes(content[:12], file.content_type):
        raise HTTPException(400, "File content doesn't match type")

    await file.seek(0)
    return True

Rate Limiting

# backend/src/app/middleware/rate_limit.py
from slowapi import Limiter

limiter = Limiter(key_func=get_remote_address)

# General API: 100/minute
@router.get("/galleries")
@limiter.limit("100/minute")
async def list_galleries():
    ...

# Auth endpoints: 5/15min
@router.post("/auth/login")
@limiter.limit("5/15minutes")
async def login():
    ...

# AI operations: 30/minute per workspace
@router.post("/ai/analyze")
@limiter.limit("30/minute", key_func=lambda r: r.user.workspace_id)
async def analyze():
    ...

Encryption

# backend/src/app/services/encryption_service.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# Load key from env - NEVER hardcode
ENCRYPTION_KEY = base64.urlsafe_b64decode(os.environ["ENCRYPTION_KEY"])

def encrypt(plaintext: str) -> str:
    nonce = os.urandom(12)
    aesgcm = AESGCM(ENCRYPTION_KEY)
    ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
    return base64.b64encode(nonce + ciphertext).decode()

def decrypt(ciphertext: str) -> str:
    data = base64.b64decode(ciphertext)
    nonce, encrypted = data[:12], data[12:]
    aesgcm = AESGCM(ENCRYPTION_KEY)
    return aesgcm.decrypt(nonce, encrypted, None).decode()

PII Protection

# NEVER log PII
logger.info("User registered", extra={"user_id": user.id})  # CORRECT
logger.info("User registered", extra={"email": user.email})  # WRONG

# Mask sensitive data in responses
def mask_email(email: str) -> str:
    local, domain = email.split("@")
    return f"{local[:2]}***@{domain}"

Audit Logging

# backend/src/app/services/audit_service.py

async def audit_log(
    workspace_id: UUID,
    user_id: UUID,
    action: str,
    resource_type: str,
    resource_id: UUID,
    ip_address: str,
    metadata: dict = None,
):
    await db.execute(
        insert(AuditLog).values(
            workspace_id=workspace_id,
            user_id=user_id,
            action=action,
            resource_type=resource_type,
            resource_id=resource_id,
            ip_address=ip_address,
            metadata=metadata or {},
        )
    )

# Usage
await audit_log(
    workspace_id=workspace.id,
    user_id=user.id,
    action="asset.delete",
    resource_type="asset",
    resource_id=asset_id,
    ip_address=request.client.host,
)

Security Checklist

Pre-Deployment

  • All endpoints require authentication
  • All queries include workspace_id filter
  • Storage keys include workspace_id prefix
  • Input validation on all user inputs
  • File uploads validated (type, size, content)
  • Rate limiting configured
  • No hardcoded secrets or API keys
  • Secrets loaded from environment
  • SQL injection prevented (parameterized queries)
  • XSS prevented (sanitization + CSP)
  • HTTPS enforced
  • Audit logging enabled

Code Review

  • No PII in logs
  • No secrets in code
  • Proper error handling
  • Authorization before data access
  • Workspace isolation enforced