Rate Limiting Strategies - Complete Python Implementations

Rate limiting protects your API/service from abuse, ensures fair resource allocation, and prevents system overload. Different algorithms suit different requirements.

For Senior Engineers: Know when to use each algorithm, their trade-offs, and how to implement them both in-memory and distributed (Redis).

Algorithm Comparison

Algorithm               | Memory         | Accuracy               | Burst Handling  | Best For
Token Bucket            | O(1)           | High                   | Allows bursts   | API rate limiting with burst tolerance
Leaky Bucket            | O(1)           | High                   | Smooths bursts  | Traffic shaping, constant rate output
Fixed Window            | O(1)           | Low (boundary issues)  | Poor            | Simple quotas, analytics
Sliding Window Log      | O(N) per user  | Very High              | Good            | Strict rate limiting
Sliding Window Counter  | O(1)           | High                   | Good            | Best balance (production systems)

1. Token Bucket Algorithm

Overview

Tokens are added to a bucket at a fixed rate. Each request consumes a token. If bucket is empty, request is rejected.

graph LR
    subgraph T0["T=0: Initial State"]
        B0["Bucket<br/>10/10 tokens"]
        R0["Request arrives"]
        R0 -->|"Consume 1 token"| B0_after["Bucket<br/>9/10 tokens<br/>ACCEPTED"]
    end
    subgraph T1["T=1: Refill"]
        B1["Bucket<br/>10/10 tokens"]
        Note1["Refill: +1 token/sec<br/>Capped at capacity"]
    end
    subgraph T2["T=2: Burst Allowed"]
        B2_before["Bucket<br/>10/10 tokens"]
        R2["3 Requests<br/>arrive"]
        R2 -->|"Consume 3 tokens"| B2_after["Bucket<br/>7/10 tokens<br/>ALL ACCEPTED"]
    end
    subgraph T3["T=3: Depletion"]
        B3_before["Bucket<br/>8/10 tokens"]
        R3["10 Requests<br/>arrive"]
        R3 -->|"Try to consume 10"| B3_after["Bucket<br/>EMPTY<br/>0/10 tokens"]
        R3_reject["Requests #9-10<br/>REJECTED<br/>No tokens!"]
    end
    T0 --> T1
    T1 --> T2
    T2 --> T3
    style B0_after fill:#3b4252,stroke:#a3be8c,stroke-width:2px,color:#e0e0e0
    style B2_after fill:#3b4252,stroke:#ebcb8b,stroke-width:2px,color:#e0e0e0
    style B3_after fill:#3b4252,stroke:#bf616a,stroke-width:2px,color:#e0e0e0
    style R3_reject fill:#3b4252,stroke:#bf616a,stroke-width:3px,color:#e0e0e0

Key Feature: Allows traffic bursts up to bucket capacity, then enforces rate limit

How It Works

  1. Bucket has capacity (max tokens) and refill rate (tokens/second)
  2. Tokens added at constant rate up to capacity
  3. Each request consumes 1 token (or N for weighted)
  4. If tokens available → accept, else → reject
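The refill step above in concrete numbers, a minimal sketch assuming capacity 10, a refill rate of 2 tokens/sec, and a currently empty bucket:

```python
capacity, refill_rate = 10, 2.0
tokens = 0.0          # bucket is empty
elapsed = 3.0         # seconds since the last refill

# Tokens accumulate at refill_rate, capped at capacity
tokens = min(capacity, tokens + elapsed * refill_rate)
print(tokens)  # 6.0 — three idle seconds bank six tokens
```

Note the cap: after 10 idle seconds the bucket holds 10 tokens, not 20, which is exactly what bounds the maximum burst.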

Pros

  • Allows traffic bursts (up to capacity)
  • Simple to understand
  • Memory efficient O(1)
  • Easy to implement

Cons

  • Bursts can overwhelm downstream services
  • No request queuing
  • Capacity and refill rate need careful tuning

Python Implementation

import time
import threading

class TokenBucket:
    """Token Bucket Rate Limiter

    Args:
        capacity: Maximum tokens in bucket
        refill_rate: Tokens added per second
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Add tokens based on elapsed time"""
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = elapsed * self.refill_rate

        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now

    def allow_request(self, tokens: int = 1) -> bool:
        """Check if request is allowed (consumes tokens if yes)

        Args:
            tokens: Number of tokens to consume (for weighted limiting)

        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def get_available_tokens(self) -> float:
        """Check current token count without consuming"""
        with self.lock:
            self._refill()
            return self.tokens


# Usage Example
limiter = TokenBucket(capacity=10, refill_rate=2.0)  # 10 tokens, +2/sec

# Simulate requests
for i in range(15):
    if limiter.allow_request():
        print(f"Request {i+1}: Allowed (tokens: {limiter.get_available_tokens():.2f})")
    else:
        print(f"Request {i+1}: Rate Limited")
    time.sleep(0.1)  # 100ms between requests


# Weighted Token Bucket (different costs per request)
class WeightedTokenBucket(TokenBucket):
    """Token bucket with different costs per operation"""

    def allow_read(self) -> bool:
        """Reads cost 1 token"""
        return self.allow_request(tokens=1)

    def allow_write(self) -> bool:
        """Writes cost 5 tokens (more expensive)"""
        return self.allow_request(tokens=5)

    def allow_search(self) -> bool:
        """Complex searches cost 10 tokens"""
        return self.allow_request(tokens=10)


# Usage
weighted_limiter = WeightedTokenBucket(capacity=100, refill_rate=10.0)
print(weighted_limiter.allow_read())   # Cost: 1
print(weighted_limiter.allow_write())  # Cost: 5
print(weighted_limiter.allow_search()) # Cost: 10

Real-World Usage

Token bucket is the de facto standard for API throttling: AWS API Gateway's request throttling is token-bucket based, and Stripe has described running a Redis-backed token bucket for its public API.

2. Leaky Bucket Algorithm

Overview

Requests enter a queue (bucket) and are processed at a constant rate. If queue is full, new requests are rejected.

graph TB
    Input["Requests Arrive<br/>(Variable Bursty Rate)"]
    subgraph Bucket["LEAKY BUCKET (FIFO Queue)"]
        Queue["Queue<br/>Capacity: 10 requests<br/><br/>Current:<br/>[req1, req2, req3, req4]"]
    end
    Output["Requests Processed<br/>(Constant Steady Rate)<br/>5 req/sec"]
    Input -->|"Enqueue"| Queue
    Queue -->|"Dequeue at<br/>constant rate<br/>(leak)"| Output
    Full["Queue Full?"]
    Reject["REJECT<br/>New Request"]
    Input -.->|"If capacity<br/>reached"| Full
    Full -->|"Yes"| Reject
    Note1["Effect: Smooths bursty traffic<br/>into steady stream"]
    style Input fill:#3b4252,stroke:#bf616a,stroke-width:2px,color:#e0e0e0
    style Queue fill:#3b4252,stroke:#ebcb8b,stroke-width:2px,color:#e0e0e0
    style Output fill:#3b4252,stroke:#a3be8c,stroke-width:2px,color:#e0e0e0
    style Reject fill:#3b4252,stroke:#bf616a,stroke-width:3px,color:#e0e0e0
    style Note1 fill:#3b4252,stroke:#88c0d0,stroke-width:2px,color:#e0e0e0

Key Feature: Smooths bursty traffic into constant rate output (traffic shaping)

How It Works

  1. Requests enter a FIFO queue (the "bucket")
  2. Requests "leak" from bucket at constant rate
  3. If queue is full → reject new requests
  4. Output is always smooth (constant rate)
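Step 2 in numbers, a sketch of the leak calculation used in the implementation below (leak rate 5 req/sec):

```python
leak_rate = 5.0
elapsed = 0.7                     # seconds since the last leak pass
leaks = int(elapsed * leak_rate)  # whole requests drained this pass
print(leaks)  # 3 — the fractional half-request stays queued until the next pass
```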

Python Implementation

import time
import threading
from collections import deque
from typing import Optional

class LeakyBucket:
    """Leaky Bucket Rate Limiter

    Processes requests at constant rate, queues excess.

    Args:
        capacity: Maximum queue size
        leak_rate: Requests processed per second
    """

    def __init__(self, capacity: int, leak_rate: float):
        self.capacity = capacity
        self.leak_rate = leak_rate
        self.queue = deque()
        self.last_leak = time.time()
        self.lock = threading.Lock()

    def _leak(self):
        """Remove requests from queue at constant rate"""
        now = time.time()
        elapsed = now - self.last_leak

        # Calculate how many requests should have leaked
        leaks = int(elapsed * self.leak_rate)

        if leaks > 0:
            # Remove up to 'leaks' items from queue
            for _ in range(min(leaks, len(self.queue))):
                self.queue.popleft()
            self.last_leak = now

    def allow_request(self) -> bool:
        """Add request to queue if space available

        Returns:
            True if queued, False if bucket is full
        """
        with self.lock:
            self._leak()

            if len(self.queue) < self.capacity:
                self.queue.append(time.time())
                return True
            return False

    def get_queue_size(self) -> int:
        """Current queue size"""
        with self.lock:
            self._leak()
            return len(self.queue)


# Usage
limiter = LeakyBucket(capacity=10, leak_rate=2.0)  # Queue 10, process 2/sec

for i in range(20):
    if limiter.allow_request():
        print(f"Request {i+1}: Queued (queue size: {limiter.get_queue_size()})")
    else:
        print(f"Request {i+1}: Rejected (bucket full)")
    time.sleep(0.1)


# Leaky Bucket as Meter (no queue, just metering)
class LeakyBucketMeter:
    """Simplified leaky bucket - just tracks last request time

    Ensures minimum interval between requests
    """

    def __init__(self, rate: float):
        """
        Args:
            rate: Maximum requests per second
        """
        self.min_interval = 1.0 / rate
        self.last_request = 0
        self.lock = threading.Lock()

    def allow_request(self) -> bool:
        """Check if enough time passed since last request"""
        with self.lock:
            now = time.time()
            time_passed = now - self.last_request

            if time_passed >= self.min_interval:
                self.last_request = now
                return True
            return False

    def wait_time(self) -> float:
        """How long to wait before next request allowed"""
        with self.lock:
            now = time.time()
            time_passed = now - self.last_request
            return max(0, self.min_interval - time_passed)


# Usage
meter = LeakyBucketMeter(rate=5.0)  # Max 5 req/sec

for i in range(10):
    if meter.allow_request():
        print(f"Request {i+1}: Allowed")
    else:
        wait = meter.wait_time()
        print(f"Request {i+1}: Wait {wait:.2f}s")
    time.sleep(0.1)
Token Bucket vs Leaky Bucket:
- Token Bucket: Allows bursts (accumulates tokens)
- Leaky Bucket: Smooths bursts (constant output rate)
- Token Bucket is more common for API rate limiting

3. Fixed Window Counter

Overview

Count requests in fixed time windows (e.g., per minute). Reset counter at window boundary.

Window 1 (00:00-00:59)         Window 2 (01:00-01:59)
+────────────────────+         +────────────────────+
│ Count: 0 → 1 → 2...│         │ Count: 0 (RESET)   │
│ Limit: 100         │         │ Limit: 100         │
+────────────────────+         +────────────────────+

Problem: Boundary burst!
  00:30-01:00: 100 requests
  01:00-01:30: 100 requests
  → 200 requests in 1 minute! (should be 100)

How It Works

  1. Divide time into fixed windows (e.g., 1 minute)
  2. Count requests in current window
  3. If count < limit → accept, else → reject
  4. Reset counter at window boundary
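Step 1, flooring a timestamp to its window start, in isolation (the epoch value is arbitrary):

```python
window_size = 60
now = 1_700_000_125  # arbitrary epoch seconds
window_start = (now // window_size) * window_size
print(window_start)  # 1700000100 — every request in [start, start + 60) shares this key
```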

Python Implementation

import time
import threading
from typing import Dict

class FixedWindowCounter:
    """Fixed Window Rate Limiter

    Simple but has boundary issues.

    Args:
        limit: Maximum requests per window
        window_size: Window duration in seconds
    """

    def __init__(self, limit: int, window_size: int):
        self.limit = limit
        self.window_size = window_size
        self.counters: Dict[str, int] = {}  # window_key -> count
        self.lock = threading.Lock()

    def _get_window_key(self) -> str:
        """Get current window identifier"""
        now = int(time.time())
        window_start = (now // self.window_size) * self.window_size
        return str(window_start)

    def allow_request(self, user_id: str) -> bool:
        """Check if request is allowed for user

        Args:
            user_id: User identifier

        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            window_key = self._get_window_key()
            key = f"{user_id}:{window_key}"

            # Get current count for this window
            count = self.counters.get(key, 0)

            if count < self.limit:
                self.counters[key] = count + 1
                return True
            return False

    def get_count(self, user_id: str) -> int:
        """Get current count for user in this window"""
        with self.lock:
            window_key = self._get_window_key()
            key = f"{user_id}:{window_key}"
            return self.counters.get(key, 0)

    def cleanup_old_windows(self):
        """Remove old window data (call periodically)"""
        with self.lock:
            current_window = self._get_window_key()
            # Keep only keys belonging to the current window
            # (exact match on the window part, not a suffix match)
            self.counters = {
                k: v for k, v in self.counters.items()
                if k.split(":")[-1] == current_window
            }


# Usage
limiter = FixedWindowCounter(limit=5, window_size=10)  # 5 req per 10 sec

user_id = "user_123"
for i in range(10):
    if limiter.allow_request(user_id):
        count = limiter.get_count(user_id)
        print(f"Request {i+1}: Allowed (count: {count}/{limiter.limit})")
    else:
        print(f"Request {i+1}: Rate Limited")
    time.sleep(1)

# Demonstrate boundary issue
print("\n--- Boundary Issue Demo ---")
limiter2 = FixedWindowCounter(limit=3, window_size=5)

# Windows are epoch-aligned, so sleep until just before the next boundary
time.sleep((4.5 - time.time()) % 5)  # wake ~0.5s before the boundary
for i in range(3):
    limiter2.allow_request(user_id)
print(f"End of window 1: {limiter2.get_count(user_id)} requests")

# Window resets, send 3 more immediately
time.sleep(1)  # Cross window boundary
for i in range(3):
    limiter2.allow_request(user_id)
print(f"Start of window 2: {limiter2.get_count(user_id)} requests")
print("Total in ~1.5 seconds: 6 requests (limit was 3!)")

Boundary Issue

At window boundaries, users can send up to 2x the limit in a short time: a burst at the end of one window and another at the start of the next both pass, because each window's counter starts from zero.

When to Use

Fixed windows suit simple quotas and analytics, where an occasional boundary burst is acceptable; avoid them when the limit must hold over any interval.

4. Sliding Window Log

Overview

Store timestamp of each request. Count requests in last N seconds. Most accurate but memory-intensive.

Sliding window (60 seconds)

Timestamps: [10:00:05, 10:00:15, 10:00:45, 10:00:55, 10:01:02]
                                                         ^
                                                        now

Window: [now - 60s, now] = [10:00:02, 10:01:02]

Count requests in window:
  10:00:05 ✓ (in window)
  10:00:15 ✓
  10:00:45 ✓
  10:00:55 ✓
  10:01:02 ✓

Total: 5 requests

Python Implementation

import time
import threading
from collections import deque
from typing import Deque, Dict

class SlidingWindowLog:
    """Sliding Window Log Rate Limiter

    Most accurate but memory-intensive O(N) per user.

    Args:
        limit: Maximum requests in window
        window_size: Window duration in seconds
    """

    def __init__(self, limit: int, window_size: int):
        self.limit = limit
        self.window_size = window_size
        self.logs: Dict[str, Deque[float]] = {}  # user_id -> timestamps
        self.lock = threading.Lock()

    def allow_request(self, user_id: str) -> bool:
        """Check if request is allowed

        Args:
            user_id: User identifier

        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            now = time.time()

            # Get or create log for user
            if user_id not in self.logs:
                self.logs[user_id] = deque()

            log = self.logs[user_id]

            # Remove timestamps outside window
            cutoff = now - self.window_size
            while log and log[0] < cutoff:
                log.popleft()

            # Check if under limit
            if len(log) < self.limit:
                log.append(now)
                return True
            return False

    def get_count(self, user_id: str) -> int:
        """Get current request count for user"""
        with self.lock:
            if user_id not in self.logs:
                return 0

            now = time.time()
            log = self.logs[user_id]
            cutoff = now - self.window_size

            # Remove old entries
            while log and log[0] < cutoff:
                log.popleft()

            return len(log)

    def get_memory_usage(self) -> int:
        """Total timestamps stored"""
        return sum(len(log) for log in self.logs.values())


# Usage
limiter = SlidingWindowLog(limit=5, window_size=10)  # 5 req per 10 sec

user_id = "user_123"
for i in range(10):
    if limiter.allow_request(user_id):
        count = limiter.get_count(user_id)
        print(f"Request {i+1}: Allowed (count: {count}/{limiter.limit})")
    else:
        count = limiter.get_count(user_id)
        print(f"Request {i+1}: Rate Limited (count: {count}/{limiter.limit})")
    time.sleep(1.5)

print(f"\nMemory usage: {limiter.get_memory_usage()} timestamps stored")


# Demonstrate accuracy (no boundary issue)
print("\n--- No Boundary Issue ---")
limiter2 = SlidingWindowLog(limit=3, window_size=5)

# Send requests
for i in range(3):
    limiter2.allow_request(user_id)
    time.sleep(1)

# Try to send more (should be rejected, window hasn't fully passed)
if limiter2.allow_request(user_id):
    print("Request 4: Allowed")
else:
    print("Request 4: Rejected (accurate sliding window!)")

Pros

  • Most accurate
  • No boundary issues
  • True sliding window
  • Simple to understand

Cons

  • Memory-intensive O(N) per user
  • Expensive at scale
  • Need to clean up old entries

5. Sliding Window Counter (Best for Production)

Overview

Hybrid of fixed window and sliding log. Approximates sliding window using two counters. O(1) memory with good accuracy.

Previous Window           Current Window
[10:00:00-10:00:59]       [10:01:00-10:01:59]
Count: 80                         ^
                              Now: 10:01:30

Sliding window estimate:
  Weight of prev window = (60 - 30) / 60 = 0.5
  Estimate = (80 × 0.5) + current_count
           = 40 + current_count

Approximates true sliding window with O(1) memory!

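Plugging the diagram's numbers into the formula (previous window 80 requests, 30 s into the current window, plus a hypothetical current count of 10):

```python
window_size = 60
prev_count = 80
curr_count = 10      # hypothetical count so far in the current window
time_in_curr = 30    # seconds elapsed in the current window

weight_prev = (window_size - time_in_curr) / window_size
estimate = prev_count * weight_prev + curr_count
print(weight_prev, estimate)  # 0.5 50.0
```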
Python Implementation

import time
import threading
from typing import Dict, Tuple

class SlidingWindowCounter:
    """Sliding Window Counter Rate Limiter

    Best balance: O(1) memory, high accuracy, no boundary issues.
    This is what most production systems use!

    Args:
        limit: Maximum requests in window
        window_size: Window duration in seconds
    """

    def __init__(self, limit: int, window_size: int):
        self.limit = limit
        self.window_size = window_size
        # user_id -> (prev_count, prev_window_start, curr_count, curr_window_start)
        self.windows: Dict[str, Tuple[int, int, int, int]] = {}
        self.lock = threading.Lock()

    def _get_window_start(self, timestamp: float) -> int:
        """Get window start time for given timestamp"""
        return int(timestamp // self.window_size) * self.window_size

    def allow_request(self, user_id: str) -> bool:
        """Check if request is allowed

        Uses weighted count from previous + current window

        Args:
            user_id: User identifier

        Returns:
            True if allowed, False if rate limited
        """
        with self.lock:
            now = time.time()
            curr_window = self._get_window_start(now)

            # Get or initialize windows
            if user_id not in self.windows:
                self.windows[user_id] = (0, curr_window - self.window_size, 1, curr_window)
                return True

            prev_count, prev_window, curr_count, curr_window_stored = self.windows[user_id]

            # Check if we moved to a new window
            if curr_window != curr_window_stored:
                if curr_window - curr_window_stored == self.window_size:
                    # Advanced exactly one window: current becomes previous
                    prev_count = curr_count
                else:
                    # Skipped one or more whole windows: no recent history
                    prev_count = 0
                prev_window = curr_window - self.window_size
                curr_count = 0

            # Calculate weighted count for sliding window
            time_in_curr_window = now - curr_window
            weight_prev = (self.window_size - time_in_curr_window) / self.window_size
            estimated_count = (prev_count * weight_prev) + curr_count

            if estimated_count < self.limit:
                curr_count += 1
                self.windows[user_id] = (prev_count, prev_window, curr_count, curr_window)
                return True

            return False

    def get_estimated_count(self, user_id: str) -> float:
        """Get estimated count in sliding window"""
        with self.lock:
            if user_id not in self.windows:
                return 0.0

            now = time.time()
            curr_window = self._get_window_start(now)
            prev_count, _, curr_count, curr_window_stored = self.windows[user_id]

            if curr_window != curr_window_stored:
                # Same shift logic as allow_request: only an adjacent window
                # carries over; skipped windows mean no recent requests
                if curr_window - curr_window_stored == self.window_size:
                    prev_count = curr_count
                else:
                    prev_count = 0
                curr_count = 0

            time_in_curr_window = now - curr_window
            weight_prev = (self.window_size - time_in_curr_window) / self.window_size

            return (prev_count * weight_prev) + curr_count


# Usage
limiter = SlidingWindowCounter(limit=5, window_size=10)  # 5 req per 10 sec

user_id = "user_123"
for i in range(10):
    if limiter.allow_request(user_id):
        estimate = limiter.get_estimated_count(user_id)
        print(f"Request {i+1}: Allowed (estimated: {estimate:.2f}/{limiter.limit})")
    else:
        estimate = limiter.get_estimated_count(user_id)
        print(f"Request {i+1}: Rate Limited (estimated: {estimate:.2f}/{limiter.limit})")
    time.sleep(1.5)


# Compare with Fixed Window
print("\n--- Comparison: Sliding vs Fixed Window ---")
fixed = FixedWindowCounter(limit=5, window_size=10)
sliding = SlidingWindowCounter(limit=5, window_size=10)

# Send burst at window boundary (windows are epoch-aligned)
time.sleep((9.5 - time.time()) % 10)  # wake ~0.5s before the next boundary
for i in range(5):
    fixed.allow_request(user_id)
    sliding.allow_request(user_id)

time.sleep(1)  # Cross boundary
for i in range(5):
    fixed_ok = fixed.allow_request(user_id)
    sliding_ok = sliding.allow_request(user_id)
    print(f"Request {i+1}: Fixed={fixed_ok}, Sliding={sliding_ok}")

print("\nResult: Sliding window prevents boundary burst!")
Why This Is Best:
- O(1) memory (only 2 counters per user)
- ~99% accurate (better than fixed window)
- No boundary issues
- Production-ready (used by Cloudflare, Kong, AWS)

6. Distributed Rate Limiting with Redis

Overview

For distributed systems (multiple servers), use Redis for centralized rate limiting state.

Python Implementation - Redis-based

import redis
import time
from typing import Optional

class RedisTokenBucket:
    """Distributed Token Bucket using Redis

    Atomically implemented using Lua script for correctness.
    """

    # Lua script for atomic token bucket operation
    LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local tokens_requested = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])

    -- Get current state
    local state = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(state[1]) or capacity
    local last_refill = tonumber(state[2]) or now

    -- Refill tokens
    local elapsed = now - last_refill
    local tokens_to_add = elapsed * refill_rate
    tokens = math.min(capacity, tokens + tokens_to_add)

    -- Check if request allowed
    if tokens >= tokens_requested then
        tokens = tokens - tokens_requested
        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
        redis.call('EXPIRE', key, 3600)  -- 1 hour TTL
        return 1  -- Allowed
    else
        return 0  -- Rejected
    end
    """

    def __init__(self, redis_client: redis.Redis, capacity: int, refill_rate: float):
        self.redis = redis_client
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.script = self.redis.register_script(self.LUA_SCRIPT)

    def allow_request(self, user_id: str, tokens: int = 1) -> bool:
        """Check if request allowed (distributed across servers)

        Args:
            user_id: User identifier
            tokens: Tokens to consume

        Returns:
            True if allowed, False if rate limited
        """
        key = f"rate_limit:token_bucket:{user_id}"
        now = time.time()

        result = self.script(
            keys=[key],
            args=[self.capacity, self.refill_rate, tokens, now]
        )

        return bool(result)


# Redis Fixed Window Counter (simpler)
class RedisFixedWindow:
    """Simple Redis-based fixed window counter"""

    def __init__(self, redis_client: redis.Redis, limit: int, window_size: int):
        self.redis = redis_client
        self.limit = limit
        self.window_size = window_size

    def allow_request(self, user_id: str) -> bool:
        """Check if request allowed

        Uses Redis INCR + EXPIRE for atomic counter
        """
        now = int(time.time())
        window = (now // self.window_size) * self.window_size
        key = f"rate_limit:fixed_window:{user_id}:{window}"

        # Atomic increment
        count = self.redis.incr(key)

        if count == 1:
            # First request in this window - set expiry
            self.redis.expire(key, self.window_size * 2)

        return count <= self.limit


# Redis Sliding Window Counter (production-grade)
class RedisSlidingWindow:
    """Production-grade sliding window using Redis"""

    LUA_SCRIPT = """
    local key = KEYS[1]
    local limit = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    local curr_window = math.floor(now / window) * window
    local prev_window = curr_window - window

    local prev_key = key .. ':' .. prev_window
    local curr_key = key .. ':' .. curr_window

    local prev_count = tonumber(redis.call('GET', prev_key) or 0)
    local curr_count = tonumber(redis.call('GET', curr_key) or 0)

    -- Calculate weight
    local time_in_curr = now - curr_window
    local weight = (window - time_in_curr) / window
    local estimated = (prev_count * weight) + curr_count

    if estimated < limit then
        redis.call('INCR', curr_key)
        redis.call('EXPIRE', curr_key, window * 2)
        return 1
    else
        return 0
    end
    """

    def __init__(self, redis_client: redis.Redis, limit: int, window_size: int):
        self.redis = redis_client
        self.limit = limit
        self.window_size = window_size
        self.script = self.redis.register_script(self.LUA_SCRIPT)

    def allow_request(self, user_id: str) -> bool:
        key = f"rate_limit:sliding:{user_id}"
        now = time.time()

        result = self.script(
            keys=[key],
            args=[self.limit, self.window_size, now]
        )

        return bool(result)


# Usage with Redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Token Bucket
token_limiter = RedisTokenBucket(r, capacity=100, refill_rate=10.0)
print(token_limiter.allow_request("user_123"))

# Fixed Window
fixed_limiter = RedisFixedWindow(r, limit=100, window_size=60)
print(fixed_limiter.allow_request("user_456"))

# Sliding Window (RECOMMENDED)
sliding_limiter = RedisSlidingWindow(r, limit=100, window_size=60)
print(sliding_limiter.allow_request("user_789"))
Production Tips:
- Use Lua scripts for atomicity (avoid race conditions)
- Set TTL on keys to prevent memory leaks
- Use Redis Cluster for horizontal scaling
- Monitor Redis memory usage
- Consider local cache + Redis (reduce latency)

Rate Limiting Decorator (Practical Usage)

Python Decorator for APIs

from functools import wraps
from flask import Flask, request, jsonify
import redis

app = Flask(__name__)
r = redis.Redis(decode_responses=True)

def rate_limit(limit: int, window: int):
    """Decorator for rate limiting Flask routes

    Args:
        limit: Max requests per window
        window: Window size in seconds
    """
    def decorator(func):
        # Create the Redis sliding-window limiter once per route,
        # not on every request
        limiter = RedisSlidingWindow(r, limit, window)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Get user ID (from header, JWT, IP, etc.)
            user_id = request.headers.get('X-User-ID', request.remote_addr)

            if not limiter.allow_request(user_id):
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': window
                }), 429  # Too Many Requests

            return func(*args, **kwargs)
        return wrapper
    return decorator


# Usage in API
@app.route('/api/search')
@rate_limit(limit=10, window=60)  # 10 requests per minute
def search():
    query = request.args.get('q')
    # ... expensive search operation
    return jsonify({'results': []})


@app.route('/api/upload')
@rate_limit(limit=5, window=3600)  # 5 uploads per hour
def upload():
    # ... handle file upload
    return jsonify({'status': 'uploaded'})
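Clients generally expect machine-readable limit info alongside the 429 body. The 429 status comes from RFC 6585 and `Retry-After` from RFC 7231; the `X-RateLimit-*` names are a widely used convention, not a standard. A small helper the decorator above could use (the function name is illustrative):

```python
def rate_limit_headers(limit: int, remaining: int, retry_after: int) -> dict:
    """Build rate-limit response headers for an API response."""
    headers = {
        "X-RateLimit-Limit": str(limit),
        "X-RateLimit-Remaining": str(max(0, remaining)),
    }
    if remaining <= 0:
        # Only meaningful when the client is actually being limited
        headers["Retry-After"] = str(retry_after)
    return headers

print(rate_limit_headers(limit=10, remaining=0, retry_after=60))
```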

Summary & Recommendations

Scenario                             | Recommended Algorithm         | Why
Production API (distributed)         | Redis Sliding Window Counter  | O(1) memory, accurate, no boundary issues, distributed
Single server, high performance      | Token Bucket (in-memory)      | Fast, allows bursts, simple
Traffic shaping, constant rate       | Leaky Bucket                  | Smooths bursts, constant output
Strict enforcement needed            | Sliding Window Log            | Most accurate (if memory isn't an issue)
Simple quotas, analytics             | Fixed Window Counter          | Simplest implementation
Weighted requests (different costs)  | Token Bucket                  | Naturally supports different token costs

Interview Tips