API Design Patterns

REST, GraphQL, and gRPC - When to Use Each

1. REST API Design

1.1 RESTful Principles

HTTP Status Codes Quick Reference

| Code | Meaning | Use Case |
|------|---------|----------|
| 200 OK | Success | GET, PUT, PATCH successful |
| 201 Created | Resource created | POST successful, return Location header |
| 204 No Content | Success, no body | DELETE successful |
| 400 Bad Request | Invalid input | Validation errors |
| 401 Unauthorized | Authentication required | Missing or invalid auth token |
| 403 Forbidden | No permission | Authenticated but not authorized |
| 404 Not Found | Resource doesn't exist | Invalid ID or deleted resource |
| 409 Conflict | Resource conflict | Duplicate email, version mismatch |
| 429 Too Many Requests | Rate limit exceeded | Return Retry-After header |
| 500 Internal Server Error | Server error | Unexpected error |

REST API Implementation (Python Flask)

from flask import Flask, request, jsonify, url_for
from flask_sqlalchemy import SQLAlchemy
from functools import wraps
from datetime import datetime
import jwt

# Flask application object and SQLAlchemy handle used by the routes below.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/api_db'
# NOTE(review): require_auth reads app.config['SECRET_KEY'], which is not set
# in the lines shown here - confirm it is configured before serving requests.
db = SQLAlchemy(app)

# Models
class User(db.Model):
    """API user; `username` and `email` are both unique and non-nullable."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # The callable itself (no parentheses) is passed as the column default.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

class Product(db.Model):
    """Catalogue item exposed by the /api/v1/products routes."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    # NOTE(review): price is stored as a float; confirm that is acceptable
    # for monetary values in this API.
    price = db.Column(db.Float, nullable=False)
    stock = db.Column(db.Integer, default=0)
    # The callable itself (no parentheses) is passed as the column default.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

# Authentication decorator
def require_auth(f):
    """Decorator that only runs the view when the request carries a valid JWT.

    The token is read from the ``Authorization`` header. Both the standard
    ``Bearer <token>`` form and a bare token are accepted. The token is
    verified with HS256 using ``app.config['SECRET_KEY']`` and must contain
    a ``user_id`` claim, which is stored on ``request.user_id`` for the view.

    Returns a 401 JSON response when the header is missing, the token fails
    verification, or the ``user_id`` claim is absent.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get('Authorization', '')

        # Strip the "Bearer" scheme if present; a raw "Bearer xyz" string is
        # not a decodable JWT. A header without a scheme is used as-is.
        scheme, _, credentials = header.partition(' ')
        if scheme.lower() == 'bearer':
            token = credentials.strip()
        else:
            token = header.strip()

        if not token:
            return jsonify({'error': 'Authorization required'}), 401

        try:
            # Verify JWT token
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        # A correctly signed token without the claim is still unusable;
        # reject it instead of letting a KeyError become a 500.
        user_id = payload.get('user_id')
        if user_id is None:
            return jsonify({'error': 'Invalid token'}), 401
        request.user_id = user_id

        return f(*args, **kwargs)
    return decorated

# ============================================
# RESTful Routes
# ============================================

# ✅ Good REST Design
# Resource-based URLs, proper HTTP methods

# List all products (with pagination)
@app.route('/api/v1/products', methods=['GET'])
def get_products():
    """
    GET /api/v1/products?page=1&per_page=20&sort=price&order=asc
    Returns paginated list of products

    Query parameters:
        page:     page number (default 1)
        per_page: page size, at most 100 (default 20)
        sort:     one of id, name, price, stock, created_at (default created_at)
        order:    'asc' for ascending; anything else sorts descending

    Responds 400 when per_page exceeds 100 or sort names an unknown field.
    """
    # Only these model attributes may be used for ordering. The sort value
    # comes straight from the query string, so it must never be passed to
    # getattr() unchecked.
    sortable_fields = ('id', 'name', 'price', 'stock', 'created_at')

    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 20, type=int)
    sort_by = request.args.get('sort', 'created_at')
    order = request.args.get('order', 'desc')

    # Validate pagination
    if per_page > 100:
        return jsonify({'error': 'per_page max is 100'}), 400

    if sort_by not in sortable_fields:
        return jsonify({
            'error': 'sort must be one of: ' + ', '.join(sortable_fields)
        }), 400

    # Build query
    sort_column = getattr(Product, sort_by)
    direction = sort_column.asc() if order == 'asc' else sort_column.desc()
    query = Product.query.order_by(direction)

    # Paginate
    pagination = query.paginate(page=page, per_page=per_page, error_out=False)

    def page_url(page_number):
        # Carry sort/order along so following a link keeps the same ordering.
        return url_for(
            'get_products',
            page=page_number,
            per_page=pagination.per_page,
            sort=sort_by,
            order=order,
            _external=True,
        )

    return jsonify({
        'products': [
            {
                'id': p.id,
                'name': p.name,
                'price': p.price,
                'stock': p.stock,
                'url': url_for('get_product', product_id=p.id, _external=True)
            }
            for p in pagination.items
        ],
        'pagination': {
            # Report what the paginator actually used, which can differ from
            # the raw query-string values when those were out of range.
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total': pagination.total,
            'pages': pagination.pages,
            'next': page_url(pagination.next_num) if pagination.has_next else None,
            'prev': page_url(pagination.prev_num) if pagination.has_prev else None
        }
    }), 200

# Get single product
@app.route('/api/v1/products/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """
    GET /api/v1/products/123
    Look up one product by primary key (404 if absent) and return its
    fields together with hypermedia links to itself and its write routes.
    """
    item = Product.query.get_or_404(product_id)

    def link_to(endpoint):
        # Every link targets the same product; only the endpoint differs.
        return url_for(endpoint, product_id=item.id, _external=True)

    body = {
        'id': item.id,
        'name': item.name,
        'description': item.description,
        'price': item.price,
        'stock': item.stock,
        'created_at': item.created_at.isoformat(),
        '_links': {
            'self': link_to('get_product'),
            'update': link_to('update_product'),
            'delete': link_to('delete_product')
        }
    }
    return jsonify(body), 200

# Create product
@app.route('/api/v1/products', methods=['POST'])
@require_auth
def create_product():
    """
    POST /api/v1/products
    Body: {"name": "...", "price": 99.99, "stock": 10}
    Returns: 201 Created with Location header

    Responds 400 with an ``errors`` object when the body is not a JSON
    object, name is missing or blank, price is not a positive number, or
    stock is not a non-negative integer.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so every bad request funnels into the same 400 response.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'errors': {'body': 'JSON object required'}}), 400

    name = data.get('name')
    price = data.get('price')
    stock = data.get('stock', 0)

    # Validate input. bool is a subclass of int, so it is excluded
    # explicitly from the numeric checks.
    errors = {}
    if not isinstance(name, str) or not name.strip():
        errors['name'] = 'Name is required'
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price <= 0:
        errors['price'] = 'Valid price is required'
    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        errors['stock'] = 'Stock must be a non-negative integer'

    if errors:
        return jsonify({'errors': errors}), 400

    # Create product
    product = Product(
        name=name,
        description=data.get('description', ''),
        price=price,
        stock=stock
    )

    db.session.add(product)
    db.session.commit()

    # Return 201 with Location header
    location = url_for('get_product', product_id=product.id, _external=True)
    response = jsonify({
        'id': product.id,
        'name': product.name,
        'price': product.price,
        'stock': product.stock,
        '_links': {
            'self': location
        }
    })
    response.status_code = 201
    response.headers['Location'] = location

    return response

# Update product (full replacement)
@app.route('/api/v1/products/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
    """
    PUT /api/v1/products/123
    Full replacement of resource

    Every field is overwritten: description and stock fall back to '' and 0
    when omitted. Responds 404 for an unknown id and 400 when the body is
    not a JSON object, name/price are missing, price is not a positive
    number, or stock is not a non-negative integer.
    """
    product = Product.query.get_or_404(product_id)

    # silent=True returns None rather than raising on a bad/missing body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object required'}), 400

    name = data.get('name')
    price = data.get('price')
    stock = data.get('stock', 0)

    # Validate
    if not name or not price:
        return jsonify({'error': 'Name and price required'}), 400
    if not isinstance(name, str) or not name.strip():
        return jsonify({'error': 'Name and price required'}), 400
    # Same price/stock rules as product creation; bool is excluded because
    # it is a subclass of int.
    if isinstance(price, bool) or not isinstance(price, (int, float)) or price <= 0:
        return jsonify({'error': 'Price must be a positive number'}), 400
    if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
        return jsonify({'error': 'Stock must be a non-negative integer'}), 400

    # Update all fields
    product.name = name
    product.description = data.get('description', '')
    product.price = price
    product.stock = stock

    db.session.commit()

    return jsonify({
        'id': product.id,
        'name': product.name,
        'price': product.price,
        'stock': product.stock
    }), 200

# Partial update
@app.route('/api/v1/products/<int:product_id>', methods=['PATCH'])
@require_auth
def partial_update_product(product_id):
    """
    PATCH /api/v1/products/123
    Partial update (only provided fields)

    Fields absent from the body are left untouched. Responds 404 for an
    unknown id and 400 with an ``errors`` object when the body is not a JSON
    object or a provided field is invalid (blank name, non-positive price,
    negative or non-integer stock). Nothing is changed when validation fails.
    """
    product = Product.query.get_or_404(product_id)

    # silent=True returns None rather than raising on a bad/missing body;
    # without this check `'name' in None` would raise a TypeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'errors': {'body': 'JSON object required'}}), 400

    # Validate everything first so a bad field never leaves the product
    # half-updated. bool is excluded because it is a subclass of int.
    errors = {}
    if 'name' in data:
        name = data['name']
        if not isinstance(name, str) or not name.strip():
            errors['name'] = 'Name is required'
    if 'price' in data:
        price = data['price']
        if isinstance(price, bool) or not isinstance(price, (int, float)) or price <= 0:
            errors['price'] = 'Valid price is required'
    if 'stock' in data:
        stock = data['stock']
        if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
            errors['stock'] = 'Stock must be a non-negative integer'

    if errors:
        return jsonify({'errors': errors}), 400

    # Update only provided fields
    for field in ('name', 'price', 'stock', 'description'):
        if field in data:
            setattr(product, field, data[field])

    db.session.commit()

    return jsonify({
        'id': product.id,
        'name': product.name,
        'price': product.price,
        'stock': product.stock
    }), 200

# Delete product
@app.route('/api/v1/products/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
    """
    DELETE /api/v1/products/123
    Remove the product (404 if it does not exist) and answer with an
    empty 204 No Content response.
    """
    doomed = Product.query.get_or_404(product_id)

    session = db.session
    session.delete(doomed)
    session.commit()

    empty_body = ''
    return empty_body, 204

# Nested resource example
@app.route('/api/v1/users/<int:user_id>/orders', methods=['GET'])
def get_user_orders(user_id):
    """
    GET /api/v1/users/123/orders
    Nested resource: list the orders that belong to one user (404 when the
    user does not exist). Each entry links to the order's own URL.
    """
    owner = User.query.get_or_404(user_id)
    owned_orders = owner.orders  # Assuming relationship defined

    def summarize(entry):
        # Compact representation of one order for the listing.
        return {
            'id': entry.id,
            'total': entry.total,
            'status': entry.status,
            'url': url_for('get_order', order_id=entry.id, _external=True)
        }

    payload = {
        'user_id': owner.id,
        'orders': [summarize(entry) for entry in owned_orders]
    }
    return jsonify(payload), 200


# ============================================
# Error Handling
# ============================================

@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body instead of the default HTML page."""
    body = jsonify({'error': 'Resource not found'})
    return body, 404

@app.errorhandler(500)
def internal_error(error):
    """Roll back the current DB session, then answer with a JSON 500 body."""
    db.session.rollback()
    payload = {'error': 'Internal server error'}
    return jsonify(payload), 500

@app.errorhandler(429)
def rate_limit_exceeded(error):
    """Answer 429 errors with a JSON body and a Retry-After header.

    The wait time is sent both in the body (``retry_after``) and in the
    standard ``Retry-After`` response header, so generic HTTP clients can
    back off without parsing the JSON.
    """
    retry_after_seconds = 60
    body = jsonify({
        'error': 'Rate limit exceeded',
        'retry_after': retry_after_seconds
    })
    # Header values must be strings.
    return body, 429, {'Retry-After': str(retry_after_seconds)}


# ============================================
# API Versioning
# ============================================

# Version in URL (recommended)
@app.route('/api/v1/products')
@app.route('/api/v2/products')
def versioned_endpoint():
    """
    URL versioning is most explicit and cache-friendly

    Illustrative stub: one view function is registered under both the v1
    and the v2 products URL, and its body is just `pass`, so it returns None.

    NOTE(review): '/api/v1/products' is also registered for GET by
    get_products in this file - confirm which view is meant to serve it.
    """
    pass

# Version in header (alternative)
@app.before_request
def check_version():
    """
    Header-based versioning, e.g.
    Accept: application/vnd.myapi.v2+json

    Runs before every request and stores the negotiated version on
    request.api_version: 2 when the Accept header mentions the v2 vendor
    media type, otherwise 1.
    """
    wants_v2 = 'vnd.myapi.v2' in request.headers.get('Accept', '')
    request.api_version = 2 if wants_v2 else 1
REST Best Practices:

2. GraphQL API Design

2.1 GraphQL Overview

Why GraphQL?

GraphQL Schema Example

from datetime import datetime

import graphene
from graphene import relay
from graphene_sqlalchemy import SQLAlchemyObjectType, SQLAlchemyConnectionField
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, scoped_session

Base = declarative_base()
engine = create_engine('postgresql://localhost/graphql_db')

# Thread-local session registry: calling Session() returns the current
# thread's session instead of creating a brand-new one on every call.
# Call Session.remove() at the end of each request to release it.
Session = scoped_session(sessionmaker(bind=engine))

# graphene-sqlalchemy's connection fields look for a `query` attribute on
# the model (or a session in the GraphQL context) to build their queries.
Base.query = Session.query_property()

# SQLAlchemy Models
class UserModel(Base):
    """Row in the `users` table; owns a collection of OrderModel rows."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String)
    email = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Two-way relationship: paired with OrderModel.user via back_populates.
    orders = relationship("OrderModel", back_populates="user")

class ProductModel(Base):
    """Row in the `products` table."""
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    description = Column(String)
    # NOTE(review): price is a float column; confirm that is acceptable for
    # monetary values.
    price = Column(Float)
    stock = Column(Integer)

class OrderModel(Base):
    """Row in the `orders` table; each order references one user."""
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    # Foreign key to users.id
    user_id = Column(Integer, ForeignKey('users.id'))
    total = Column(Float)
    status = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Two-way relationship: paired with UserModel.orders via back_populates.
    user = relationship("UserModel", back_populates="orders")


# GraphQL Types
# GraphQL object type generated from UserModel; implements the Relay Node
# interface.
class User(SQLAlchemyObjectType):
    class Meta:
        model = UserModel
        interfaces = (relay.Node,)

# GraphQL object type generated from ProductModel; implements the Relay Node
# interface.
class Product(SQLAlchemyObjectType):
    class Meta:
        model = ProductModel
        interfaces = (relay.Node,)

# GraphQL object type generated from OrderModel; implements the Relay Node
# interface.
class Order(SQLAlchemyObjectType):
    class Meta:
        model = OrderModel
        interfaces = (relay.Node,)


# Queries
class Query(graphene.ObjectType):
    """
    Define all read operations
    """
    node = relay.Node.Field()

    # Get all users
    all_users = SQLAlchemyConnectionField(User.connection)

    # Get all products
    all_products = SQLAlchemyConnectionField(Product.connection)

    # Get single user by ID. The argument is required in the schema because
    # the resolver cannot run without it (same convention as the mutations).
    user = graphene.Field(User, id=graphene.Int(required=True))

    # Get single product by ID (required, see above)
    product = graphene.Field(Product, id=graphene.Int(required=True))

    # Search products
    search_products = graphene.List(
        Product,
        query=graphene.String(required=True),
        min_price=graphene.Float(),
        max_price=graphene.Float()
    )

    # NOTE: the resolvers deliberately leave their session open. The returned
    # objects may still lazy-load relationships (e.g. user.orders) while
    # GraphQL resolves nested fields, which needs a live session.

    def resolve_user(self, info, id):
        """Return the user with primary key ``id``, or None if absent."""
        session = Session()
        return session.query(UserModel).get(id)

    def resolve_product(self, info, id):
        """Return the product with primary key ``id``, or None if absent."""
        session = Session()
        return session.query(ProductModel).get(id)

    def resolve_search_products(self, info, query, min_price=None, max_price=None):
        """
        Case-insensitive substring search on product name, optionally
        bounded by an inclusive price range.

        ``query`` is matched literally: LIKE wildcards typed by the client
        ('%' and '_') are escaped, so searching for "100%" does not match
        every name that merely contains "100".
        """
        session = Session()

        # Escape the escape character first, then the LIKE wildcards.
        literal = (
            query.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

        # Build query
        q = session.query(ProductModel).filter(
            ProductModel.name.ilike(f'%{literal}%', escape='\\')
        )

        if min_price is not None:
            q = q.filter(ProductModel.price >= min_price)

        if max_price is not None:
            q = q.filter(ProductModel.price <= max_price)

        return q.all()


# Mutations (Create, Update, Delete)
class CreateProduct(graphene.Mutation):
    """
    Mutation to create a product
    """
    class Arguments:
        name = graphene.String(required=True)
        description = graphene.String()
        price = graphene.Float(required=True)
        stock = graphene.Int()

    # Return type
    product = graphene.Field(lambda: Product)
    ok = graphene.Boolean()

    def mutate(self, info, name, price, description=None, stock=0):
        """Insert a product and return it with ``ok=True``.

        Returns ``ok=False`` and no product when the input is invalid: blank
        name, non-positive price, or negative stock. These are the same rules
        the REST and gRPC create paths in this file apply.

        The session is rolled back if the insert fails and always closed, so
        connections are not leaked.
        """
        if not name.strip() or price <= 0 or (stock is not None and stock < 0):
            return CreateProduct(product=None, ok=False)

        session = Session()
        try:
            product = ProductModel(
                name=name,
                description=description,
                price=price,
                stock=stock
            )

            session.add(product)
            session.commit()
            # Reload the column values (including the generated id) while the
            # session is still open; they stay readable after close().
            session.refresh(product)
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

        return CreateProduct(product=product, ok=True)


class UpdateProduct(graphene.Mutation):
    """
    Mutation to update a product
    """
    class Arguments:
        id = graphene.Int(required=True)
        name = graphene.String()
        description = graphene.String()
        price = graphene.Float()
        stock = graphene.Int()

    product = graphene.Field(lambda: Product)
    ok = graphene.Boolean()

    def mutate(self, info, id, **kwargs):
        """Apply the provided fields to product ``id``.

        Only the arguments the client actually sent arrive in ``kwargs``;
        other columns are left untouched. Returns ``ok=False`` and no product
        when the product does not exist or a provided value is invalid (blank
        name, non-positive price, negative stock).

        The session is rolled back if the update fails and always closed, so
        connections are not leaked.
        """
        name = kwargs.get('name')
        price = kwargs.get('price')
        stock = kwargs.get('stock')
        if (
            (name is not None and not name.strip())
            or (price is not None and price <= 0)
            or (stock is not None and stock < 0)
        ):
            return UpdateProduct(product=None, ok=False)

        session = Session()
        try:
            product = session.query(ProductModel).get(id)
            if not product:
                return UpdateProduct(product=None, ok=False)

            # Update fields
            for key, value in kwargs.items():
                setattr(product, key, value)

            session.commit()
            # Reload the column values while the session is still open; they
            # stay readable after close().
            session.refresh(product)
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

        return UpdateProduct(product=product, ok=True)


class DeleteProduct(graphene.Mutation):
    """
    Mutation to delete a product
    """
    class Arguments:
        id = graphene.Int(required=True)

    ok = graphene.Boolean()

    def mutate(self, info, id):
        """Delete product ``id``; ``ok`` is False when it does not exist.

        The session is rolled back if the delete fails and always closed, so
        connections are not leaked.
        """
        session = Session()
        try:
            product = session.query(ProductModel).get(id)
            if not product:
                return DeleteProduct(ok=False)

            session.delete(product)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

        return DeleteProduct(ok=True)


class Mutation(graphene.ObjectType):
    """
    Define all write operations
    """
    # Each attribute registers one mutation class as a field of the root
    # mutation type.
    create_product = CreateProduct.Field()
    update_product = UpdateProduct.Field()
    delete_product = DeleteProduct.Field()


# Create schema
# Root schema combining the read (Query) and write (Mutation) entry points.
schema = graphene.Schema(query=Query, mutation=Mutation)


# ============================================
# Flask integration
# ============================================

import os

from flask import Flask
from flask_graphql import GraphQLView

app = Flask(__name__)

# Single GraphQL endpoint; GET requests from a browser get the GraphiQL IDE.
app.add_url_rule(
    '/graphql',
    view_func=GraphQLView.as_view(
        'graphql',
        schema=schema,
        graphiql=True  # Enable GraphiQL IDE
    )
)

if __name__ == '__main__':
    # Create any missing tables before serving.
    Base.metadata.create_all(engine)
    # Flask's debug mode enables the interactive Werkzeug debugger, which
    # lets anyone who can reach the server execute arbitrary code. Keep it
    # off unless explicitly requested with FLASK_DEBUG=1.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1')

GraphQL Query Examples

# Query 1: Get specific fields only
# ✅ No over-fetching - client decides what to fetch
{
  user(id: 1) {
    username
    email
  }
}

# Query 2: Nested queries (solve N+1 with DataLoader)
# ✅ Get user and all their orders in one request
{
  user(id: 1) {
    username
    email
    orders {
      id
      total
      status
      createdAt
    }
  }
}

# Query 3: Multiple queries in one request
{
  user1: user(id: 1) {
    username
  }
  user2: user(id: 2) {
    username
  }
  topProducts: searchProducts(query: "laptop", maxPrice: 1000) {
    name
    price
  }
}

# Query 4: Fragments for reusability
fragment UserDetails on User {
  id
  username
  email
  createdAt
}

{
  user(id: 1) {
    ...UserDetails
    orders {
      total
    }
  }
}

# Mutation 1: Create product
mutation {
  createProduct(
    name: "MacBook Pro"
    description: "16-inch M3 Max"
    price: 2499.99
    stock: 10
  ) {
    product {
      id
      name
      price
    }
    ok
  }
}

# Mutation 2: Update product
mutation {
  updateProduct(
    id: 1
    price: 2299.99
    stock: 15
  ) {
    product {
      id
      name
      price
      stock
    }
    ok
  }
}
GraphQL Best Practices:

3. gRPC API Design

3.1 gRPC Overview

Why gRPC?

Protocol Buffers Definition

# product.proto
"""
// Message definitions for the ecommerce product API (proto3).
syntax = "proto3";

package ecommerce;

// Product message
message Product {
  int32 id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock = 5;
  // NOTE(review): presumably seconds since the Unix epoch - the Python
  // server in this document fills it from int(datetime.timestamp()).
  int64 created_at = 6;
}

// Request/Response messages

// Look up one product by id.
message GetProductRequest {
  int32 id = 1;
}

message GetProductResponse {
  Product product = 1;
}

// Paging and sorting options for listing products.
message ListProductsRequest {
  int32 page = 1;
  int32 page_size = 2;
  string sort_by = 3;
}

// One page of products plus paging totals.
message ListProductsResponse {
  repeated Product products = 1;
  int32 total = 2;
  int32 page = 3;
  int32 total_pages = 4;
}

// Fields needed to create a product (no id).
message CreateProductRequest {
  string name = 1;
  string description = 2;
  double price = 3;
  int32 stock = 4;
}

message CreateProductResponse {
  Product product = 1;
}

// Same fields as CreateProductRequest plus the id of the product to change.
message UpdateProductRequest {
  int32 id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock = 5;
}

message UpdateProductResponse {
  Product product = 1;
}

message DeleteProductRequest {
  int32 id = 1;
}

message DeleteProductResponse {
  bool success = 1;
}

// Streaming example: one price/stock change for a product.
message ProductUpdate {
  int32 product_id = 1;
  double new_price = 2;
  int32 new_stock = 3;
  // NOTE(review): presumably seconds since the Unix epoch - the Python
  // server in this document fills it from int(time.time()).
  int64 timestamp = 4;
}

// Result of the client-streaming BatchCreateProducts RPC: how many of the
// streamed products were created.
message BatchCreateResponse {
  int32 created_count = 1;
}

// Service definition
service ProductService {
  // Unary RPC (request-response)
  rpc GetProduct(GetProductRequest) returns (GetProductResponse);
  rpc ListProducts(ListProductsRequest) returns (ListProductsResponse);
  rpc CreateProduct(CreateProductRequest) returns (CreateProductResponse);
  rpc UpdateProduct(UpdateProductRequest) returns (UpdateProductResponse);
  rpc DeleteProduct(DeleteProductRequest) returns (DeleteProductResponse);

  // Server streaming (server sends stream of responses)
  rpc WatchProducts(GetProductRequest) returns (stream ProductUpdate);

  // Client streaming (client sends stream of requests).
  // Returns a count rather than a single product: many products are
  // created, and both the Python server and client in this document use
  // BatchCreateResponse.created_count.
  rpc BatchCreateProducts(stream CreateProductRequest) returns (BatchCreateResponse);

  // Bidirectional streaming
  rpc LivePriceUpdates(stream ProductUpdate) returns (stream ProductUpdate);
}
"""

# Generate Python code from .proto:
# python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. product.proto

gRPC Server Implementation (Python)

import grpc
from concurrent import futures
import time
from datetime import datetime

# Import generated code (from .proto compilation)
# import product_pb2
# import product_pb2_grpc

# For this example, we'll simulate the generated classes
class ProductServiceServicer:
    """
    gRPC server implementation
    Implements methods defined in .proto service

    Args:
        db: storage backend. The handlers call ``get_product``,
            ``list_products``, ``create_product`` and ``update_price`` on it.
        pubsub: publish/subscribe client. ``WatchProducts`` calls
            ``subscribe(channel)`` on it and iterates the result.

    Both default to None so the class can still be constructed without
    arguments; handlers that need a backend will fail until one is supplied.
    """

    def __init__(self, db=None, pubsub=None):
        self.db = db
        self.pubsub = pubsub

    @staticmethod
    def _product_message(product, **extra_fields):
        """Convert a stored product into a protobuf Product message."""
        return product_pb2.Product(
            id=product.id,
            name=product.name,
            description=product.description,
            price=product.price,
            stock=product.stock,
            **extra_fields
        )

    @staticmethod
    def _is_valid_new_product(request):
        """Return True when a CreateProductRequest may be stored."""
        return bool(request.name) and request.price > 0 and request.stock >= 0

    @staticmethod
    def _unimplemented(context):
        """Report UNIMPLEMENTED the same way generated base servicers do."""
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def GetProduct(self, request, context):
        """
        Unary RPC: Single request, single response

        Sets NOT_FOUND and returns an empty response when the id is unknown.
        """
        product_id = request.id

        # Fetch from database
        product = self.db.get_product(product_id)

        if not product:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f'Product {product_id} not found')
            return product_pb2.GetProductResponse()

        # Build response
        return product_pb2.GetProductResponse(
            product=self._product_message(
                product,
                created_at=int(product.created_at.timestamp())
            )
        )

    def ListProducts(self, request, context):
        """
        List products with pagination

        Unset (0) or negative page / page_size values fall back to page 1
        and 20 items; page_size is capped at 100.
        """
        # proto3 scalars default to 0 when unset; negatives are clamped too
        # so they can never reach the database or the page-count division.
        page = request.page if request.page > 0 else 1
        page_size = request.page_size if request.page_size > 0 else 20
        page_size = min(page_size, 100)  # Max 100 per page

        # Fetch from database
        products, total = self.db.list_products(
            page=page,
            page_size=page_size,
            sort_by=request.sort_by
        )

        # Build response
        return product_pb2.ListProductsResponse(
            products=[self._product_message(p) for p in products],
            total=total,
            page=page,
            # Ceiling division without floats
            total_pages=(total + page_size - 1) // page_size
        )

    def CreateProduct(self, request, context):
        """
        Create new product

        Sets INVALID_ARGUMENT and returns an empty response when the name is
        empty, the price is not positive, or the stock is negative.
        """
        # Validate
        if not self._is_valid_new_product(request):
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('Invalid product data')
            return product_pb2.CreateProductResponse()

        # Create in database
        product = self.db.create_product(
            name=request.name,
            description=request.description,
            price=request.price,
            stock=request.stock
        )

        # Return created product
        return product_pb2.CreateProductResponse(
            product=self._product_message(product)
        )

    def UpdateProduct(self, request, context):
        """Declared in the service definition but not implemented here."""
        self._unimplemented(context)

    def DeleteProduct(self, request, context):
        """Declared in the service definition but not implemented here."""
        self._unimplemented(context)

    def WatchProducts(self, request, context):
        """
        Server streaming: Send updates as they happen
        Client receives stream of updates
        """
        product_id = request.id

        # Subscribe to product updates (e.g., Redis pub/sub)
        subscription = self.pubsub.subscribe(f'product:{product_id}')

        try:
            for message in subscription:
                # Send update to client
                yield product_pb2.ProductUpdate(
                    product_id=product_id,
                    new_price=message['price'],
                    new_stock=message['stock'],
                    timestamp=int(time.time())
                )
        finally:
            # Runs however the stream ends - client disconnect (generator
            # closed), subscription exhausted, or an error - so the
            # subscription is never leaked.
            subscription.unsubscribe()

    def BatchCreateProducts(self, request_iterator, context):
        """
        Client streaming: Client sends stream of products
        Server returns single response at the end

        Best effort: invalid or failing items are skipped and reported on
        stdout; the response carries the number actually created.
        """
        created_count = 0

        for create_request in request_iterator:
            # Same rules as the unary CreateProduct RPC.
            if not self._is_valid_new_product(create_request):
                print(f"Error creating product: invalid data for {create_request.name!r}")
                continue

            try:
                self.db.create_product(
                    name=create_request.name,
                    description=create_request.description,
                    price=create_request.price,
                    stock=create_request.stock
                )
                created_count += 1
            except Exception as e:
                # Deliberately broad: one bad item must not abort the batch.
                print(f"Error creating product: {e}")

        return product_pb2.BatchCreateResponse(
            created_count=created_count
        )

    def LivePriceUpdates(self, request_iterator, context):
        """
        Bidirectional streaming:
        - Client sends price updates
        - Server applies each one and echoes the applied update back on the
          same stream (other connected clients are not notified here)

        Updates for unknown products or with a non-positive price are
        ignored.
        """
        for update in request_iterator:
            if update.new_price <= 0:
                continue

            # Validate and apply update
            product = self.db.get_product(update.product_id)
            if not product:
                continue

            # Update price
            self.db.update_price(update.product_id, update.new_price)

            # Yield the applied update to this client
            yield product_pb2.ProductUpdate(
                product_id=update.product_id,
                new_price=update.new_price,
                new_stock=product.stock,
                timestamp=int(time.time())
            )


def serve():
    """
    Build the gRPC server on a 10-worker thread pool, register the product
    servicer, bind port 50051 on all interfaces (no TLS), and block until
    the server terminates.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)

    # Add servicer to server
    servicer = ProductServiceServicer()
    product_pb2_grpc.add_ProductServiceServicer_to_server(servicer, grpc_server)

    # Listen on port
    listen_address = '[::]:50051'
    grpc_server.add_insecure_port(listen_address)

    print("gRPC server starting on port 50051...")
    grpc_server.start()
    grpc_server.wait_for_termination()


if __name__ == '__main__':
    serve()

gRPC Client Implementation

import grpc

# import product_pb2
# import product_pb2_grpc

class ProductClient:
    """
    gRPC client for ProductService

    Can be used as a context manager so the channel is always closed:

        with ProductClient() as client:
            client.get_product(1)
    """

    def __init__(self, host='localhost', port=50051):
        # Create channel
        self.channel = grpc.insecure_channel(f'{host}:{port}')

        # Create stub (client)
        self.stub = product_pb2_grpc.ProductServiceStub(self.channel)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def get_product(self, product_id):
        """
        Unary call: Get single product

        Returns the product message, or None when the call fails (the
        status code and details are printed).
        """
        request = product_pb2.GetProductRequest(id=product_id)

        try:
            response = self.stub.GetProduct(request)
        except grpc.RpcError as e:
            print(f"Error: {e.code()} - {e.details()}")
            return None

        print(f"Product: {response.product.name} - ${response.product.price}")
        return response.product

    def list_products(self, page=1, page_size=20):
        """
        List products with pagination

        Requests one page sorted by price, prints it, and returns the
        products of that page.
        """
        request = product_pb2.ListProductsRequest(
            page=page,
            page_size=page_size,
            sort_by='price'
        )

        response = self.stub.ListProducts(request)

        print(f"Total products: {response.total}")
        for product in response.products:
            print(f"  - {product.name}: ${product.price}")

        return response.products

    def create_product(self, name, price, description='', stock=0):
        """
        Create new product

        Returns the created product message from the server.
        """
        request = product_pb2.CreateProductRequest(
            name=name,
            description=description,
            price=price,
            stock=stock
        )

        response = self.stub.CreateProduct(request)
        print(f"Created product: {response.product.name} (ID: {response.product.id})")

        return response.product

    def watch_product_updates(self, product_id):
        """
        Server streaming: Watch for product updates

        Blocks, printing each update, until the stream ends or the user
        interrupts with Ctrl+C.
        """
        request = product_pb2.GetProductRequest(id=product_id)

        # Keep a handle on the call so it can be cancelled on exit.
        stream = self.stub.WatchProducts(request)

        # Receive stream of updates
        try:
            for update in stream:
                print(f"Update for product {update.product_id}:")
                print(f"  Price: ${update.new_price}")
                print(f"  Stock: {update.new_stock}")

        except KeyboardInterrupt:
            print("Stopped watching")

        finally:
            # Tell the server we are done; otherwise it keeps the
            # subscription alive. Cancelling a finished call is harmless.
            stream.cancel()

    def batch_create(self, products):
        """
        Client streaming: Send multiple products

        Args:
            products: iterable of dicts with 'name' and 'price' and
                optionally 'description' and 'stock'.

        Returns the number of products the server reports as created.
        """
        def generate_requests():
            for product_data in products:
                yield product_pb2.CreateProductRequest(
                    name=product_data['name'],
                    description=product_data.get('description', ''),
                    price=product_data['price'],
                    stock=product_data.get('stock', 0)
                )

        response = self.stub.BatchCreateProducts(generate_requests())
        print(f"Created {response.created_count} products")

        return response.created_count

    def close(self):
        """Close channel"""
        self.channel.close()


# Usage example
if __name__ == '__main__':
    client = ProductClient()

    # try/finally guarantees the channel is closed even when one of the
    # calls below raises (e.g. the server is unreachable).
    try:
        # Unary call
        client.get_product(1)

        # List products
        client.list_products(page=1, page_size=10)

        # Create product
        client.create_product(
            name="MacBook Pro",
            price=2499.99,
            description="16-inch M3 Max",
            stock=10
        )

        # Watch for updates (blocking)
        # client.watch_product_updates(1)

        # Batch create
        products = [
            {'name': 'Product 1', 'price': 99.99},
            {'name': 'Product 2', 'price': 149.99},
            {'name': 'Product 3', 'price': 199.99},
        ]
        client.batch_create(products)
    finally:
        client.close()
gRPC Best Practices:

4. Comparison: REST vs GraphQL vs gRPC

| Feature | REST | GraphQL | gRPC |
|---------|------|---------|------|
| Protocol | HTTP/1.1 | HTTP/1.1 | HTTP/2 |
| Data Format | JSON, XML | JSON | Protocol Buffers (binary) |
| Schema | OpenAPI (optional) | Strongly typed (required) | Protocol Buffers (required) |
| Endpoints | Multiple (one per resource) | Single (/graphql) | Service methods |
| Caching | HTTP caching (GET requests) | Complex (usually POST) | No built-in caching |
| Performance | Good | Good (can be better than REST) | Excellent (binary, HTTP/2) |
| Browser Support | Native | Native | Requires grpc-web |
| Streaming | SSE, WebSockets | Subscriptions (WebSockets) | Native (bidirectional) |
| Learning Curve | Low | Medium | High |
| Best For | Public APIs, CRUD apps | Complex queries, mobile apps | Microservices, real-time |

4.1 When to Use Each

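Use REST when: you are building a public API or a CRUD-style application, want plain HTTP caching of GET requests, and need native browser support with a low learning curve.
Use GraphQL when: clients such as mobile apps need complex, flexible queries through a single endpoint and should fetch only the fields they ask for.
Use gRPC when: services talk to each other (microservices) or in real time and benefit from binary Protocol Buffers over HTTP/2 with native bidirectional streaming.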

5. Interview Tips

Key Topics to Mention:
Common Mistakes to Avoid: