2026-02-04 15:29:11 +01:00

20 KiB

Technology Patterns Reference

This file contains best practices and patterns for each supported technology. Reference this when generating skills.


Fullstack Technologies

PostgreSQL

Schema Design Patterns:

-- Keep the serial key internal and expose the UUID as the public-facing ID
-- (gen_random_uuid() is built in from PostgreSQL 13; older versions need pgcrypto)
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    public_id UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    -- Soft-delete marker: NULL means the row is live.
    -- Required by the partial index below.
    deleted_at TIMESTAMPTZ
);

-- Partial indexes for common queries: only live (not soft-deleted) rows are indexed.
-- CONCURRENTLY avoids blocking writes but cannot run inside a transaction block.
CREATE INDEX CONCURRENTLY idx_users_active
ON users (email) WHERE deleted_at IS NULL;

-- Composite indexes for multi-column queries
-- (equality columns first, then the sort column)
CREATE INDEX idx_orders_user_status
ON orders (user_id, status, created_at DESC);

Query Optimization:

-- Profile queries with EXPLAIN ANALYZE. ANALYZE really executes the statement,
-- so wrap INSERT/UPDATE/DELETE in a transaction and roll it back.
-- BUFFERS adds buffer hit/read counts to each plan node.
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM users WHERE email = 'test@example.com';

-- Use CTEs for readability but be aware of optimization barriers:
-- MATERIALIZED forces the CTE to be computed once on its own, so predicates from
-- the outer query are not pushed into it. Since PostgreSQL 12, a side-effect-free
-- CTE referenced once is inlined unless MATERIALIZED is specified.
WITH active_users AS MATERIALIZED (
    SELECT id FROM users WHERE last_login > NOW() - INTERVAL '30 days'
)
SELECT * FROM orders WHERE user_id IN (SELECT id FROM active_users);

Connection Pooling:

  • Use PgBouncer for connection pooling
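  • Set pool_mode = transaction for Django, and set DISABLE_SERVER_SIDE_CURSORS = True in the Django database settings (server-side cursors do not work with transaction pooling)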
  • Monitor pool usage with SHOW POOLS in the PgBouncer admin console

Django

Model Patterns:

from django.db import models
from django.db.models import Manager, QuerySet


class ActiveManager(Manager):
    """Manager whose base queryset contains only rows with ``is_active=True``."""

    def get_queryset(self) -> QuerySet:
        """Return the parent manager's queryset narrowed to active rows."""
        base_queryset = super().get_queryset()
        return base_queryset.filter(is_active=True)


class BaseModel(models.Model):
    """Abstract base model that adds creation and modification timestamps."""

    # auto_now_add: set once, when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
    # auto_now: refreshed each time the instance is saved.
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        # Abstract: no table of its own; subclasses inherit the fields.
        abstract = True


class User(BaseModel):
    """Application user identified by a unique e-mail address."""

    # unique=True already creates an index, so db_index is not repeated here.
    email = models.EmailField(unique=True)
    is_active = models.BooleanField(default=True, db_index=True)

    # Declared first, so this unfiltered manager is the model's default manager.
    objects = Manager()
    # Convenience manager limited to rows with is_active=True.
    active = ActiveManager()

    class Meta:
        ordering = ['-created_at']
        indexes = [
            models.Index(fields=['email', 'is_active']),
        ]
        constraints = [
            # Coarse database-level sanity check; real validation is done by
            # EmailField. `contains` is used because '@' needs no case folding.
            # NOTE(review): Django 5.1+ renames `check=` to `condition=` —
            # switch once older versions no longer need to be supported.
            models.CheckConstraint(
                check=models.Q(email__contains='@'),
                name='valid_email_format'
            ),
        ]

QuerySet Optimization:

from django.db.models import Prefetch

# Use select_related to JOIN single-valued relations (ForeignKey / OneToOne)
# that will be accessed, avoiding one extra query per object
User.objects.select_related('profile').get(id=1)

# Use prefetch_related for reverse relations and M2M; a Prefetch object lets
# you filter and optimize the related queryset
User.objects.prefetch_related(
    Prefetch(
        'orders',
        queryset=Order.objects.filter(status='completed').select_related('product')
    )
)

# Use only() or defer() for partial loading
User.objects.only('id', 'email').filter(is_active=True)

# Use values() or values_list() when you don't need model instances
User.objects.values_list('email', flat=True)

Middleware Pattern:

import time


class RequestTimingMiddleware:
    """Middleware that reports request handling time in a response header.

    The elapsed wall time of the downstream handler is written to the
    ``X-Request-Duration`` header as seconds with millisecond precision.
    """

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Time the downstream handler and annotate its response."""
        # monotonic() is immune to system clock adjustments.
        start_time = time.monotonic()
        response = self.get_response(request)
        duration = time.monotonic() - start_time
        response['X-Request-Duration'] = f'{duration:.3f}s'
        return response

REST API (DRF)

Serializer Patterns:

from rest_framework import serializers


class UserSerializer(serializers.ModelSerializer):
    """Serializer for ``User`` exposing a computed ``full_name`` field."""

    full_name = serializers.SerializerMethodField()

    class Meta:
        model = User
        fields = ['id', 'email', 'full_name', 'created_at']
        read_only_fields = ['id', 'created_at']

    def get_full_name(self, obj) -> str:
        """Return first and last name joined by a space, without outer whitespace."""
        return f'{obj.first_name} {obj.last_name}'.strip()

    def validate_email(self, value: str) -> str:
        """Normalize the e-mail to lower case and reject duplicates.

        Raises:
            serializers.ValidationError: If another user already has this
                e-mail address (compared case-insensitively).
        """
        # Normalize first so the uniqueness check sees the value that is stored.
        email = value.lower()
        clashes = User.objects.filter(email__iexact=email)
        # On update, the instance's own row must not count as a duplicate.
        if self.instance is not None:
            clashes = clashes.exclude(pk=self.instance.pk)
        if clashes.exists():
            raise serializers.ValidationError('Email already exists')
        return email


class CreateUserSerializer(serializers.ModelSerializer):
    """Write-side serializer used to register a user from e-mail and password."""

    # write_only keeps the password out of serialized output.
    password = serializers.CharField(write_only=True, min_length=8)

    class Meta:
        model = User
        fields = ['email', 'password']

    def create(self, validated_data):
        """Create the user through the manager's ``create_user`` method."""
        new_user = User.objects.create_user(**validated_data)
        return new_user

View Patterns:

from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema, OpenApiParameter


@extend_schema(
    parameters=[
        OpenApiParameter('status', str, description='Filter by status'),
    ],
    responses={200: OrderSerializer(many=True)},
    tags=['orders'],
)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def list_orders(request):
    """List all orders for the authenticated user."""
    # Always scope to the caller; add the optional ?status= filter when given.
    criteria = {'user': request.user}
    requested_status = request.query_params.get('status')
    if requested_status:
        criteria['status'] = requested_status

    queryset = Order.objects.filter(**criteria)
    return Response(OrderSerializer(queryset, many=True).data)

Error Response Format:

# Standard error response
{
    "error": {
        "code": "VALIDATION_ERROR",
        "message": "Invalid input data",
        "details": {
            "email": ["This field is required."],
            "password": ["Password must be at least 8 characters."]
        }
    }
}

# Custom exception handler
def custom_exception_handler(exc, context):
    """Wrap DRF's default error response in the standard error envelope.

    Args:
        exc: The exception raised while handling the request.
        context: Handler context, passed through to DRF unchanged.

    Returns:
        The response produced by DRF's ``exception_handler`` with its ``data``
        replaced by ``{'error': {'code', 'message', 'details'}}``, or ``None``
        when DRF does not handle the exception.
    """
    # Local imports keep this snippet self-contained.
    import re

    from rest_framework.views import exception_handler

    response = exception_handler(exc, context)
    if response is not None:
        # CamelCase class name -> UPPER_SNAKE_CASE (ValidationError ->
        # VALIDATION_ERROR), matching the documented error format.
        code = re.sub(
            r'(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])',
            '_',
            exc.__class__.__name__,
        ).upper()
        response.data = {
            'error': {
                'code': code,
                'message': str(exc),
                # Only mapping payloads fit the per-field "details" shape.
                'details': response.data if isinstance(response.data, dict) else {}
            }
        }
    return response

Next.js

App Router Patterns:

// app/users/[id]/page.tsx
import { notFound } from 'next/navigation';

// Props for the dynamic `[id]` route segment.
interface Props {
  // NOTE(review): Next.js 15+ passes `params` as a Promise — confirm the target version.
  params: { id: string };
}

// Build-time list of `[id]` values to pre-render (SSG)
export async function generateStaticParams() {
  const allUsers = await getUsers();
  const staticParams: { id: string }[] = [];
  for (const entry of allUsers) {
    staticParams.push({ id: entry.id.toString() });
  }
  return staticParams;
}

// Per-page metadata derived from the user record
export async function generateMetadata({ params }: Props) {
  const profile = await getUser(params.id);
  // Fall back to a placeholder title when the user cannot be loaded.
  const title = profile?.name ?? 'User Not Found';
  const description = profile?.bio;
  return { title, description };
}

// Server Component (the App Router default): loads the user and renders the profile
export default async function UserPage({ params }: Props) {
  const { id } = params;
  const record = await getUser(id);

  // Render the not-found page when no user was returned.
  if (!record) notFound();

  return <UserProfile user={record} />;
}

Data Fetching:

// Cached fetch: time-based revalidation plus a tag for on-demand invalidation
async function getUser(id: string) {
  const endpoint = `${API_URL}/users/${id}`;
  const cacheOptions = {
    revalidate: 60, // seconds before the cached response is refreshed
    tags: [`user-${id}`], // lets revalidateTag() target this entry
  };

  const res = await fetch(endpoint, { next: cacheOptions });

  // Any non-2xx response yields null.
  return res.ok ? res.json() : null;
}

// Server action for mutations
'use server';

import { revalidateTag } from 'next/cache';

/**
 * Server action: PATCH a user with the submitted form fields, then invalidate
 * that user's cached fetches.
 *
 * @param id   Identifier of the user to update.
 * @param data Submitted form data; sent to the API as a JSON object.
 * @returns The parsed JSON body of the API response.
 * @throws Error when the API responds with a non-2xx status.
 */
export async function updateUser(id: string, data: FormData) {
  // Object.fromEntries keeps only the last value for repeated form keys.
  const payload = JSON.stringify(Object.fromEntries(data));

  const response = await fetch(`${API_URL}/users/${id}`, {
    method: 'PATCH',
    // Without this header a string body is sent as text/plain, which JSON
    // APIs typically reject or ignore.
    headers: { 'Content-Type': 'application/json' },
    body: payload,
  });

  if (!response.ok) {
    throw new Error('Failed to update user');
  }

  // Invalidate cached fetches tagged for this user.
  revalidateTag(`user-${id}`);
  return response.json();
}

Middleware:

// middleware.ts
import { NextResponse } from 'next/server';
import type { NextRequest } from 'next/server';

export function middleware(request: NextRequest) {
  // Dashboard routes require the auth cookie; send anonymous visitors to /login.
  const wantsDashboard = request.nextUrl.pathname.startsWith('/dashboard');
  const authCookie = request.cookies.get('auth-token');
  if (wantsDashboard && !authCookie) {
    const loginUrl = new URL('/login', request.url);
    return NextResponse.redirect(loginUrl);
  }

  // Otherwise continue, tagging the response with a fresh request id.
  const passthrough = NextResponse.next();
  passthrough.headers.set('x-request-id', crypto.randomUUID());
  return passthrough;
}

// Limit the middleware to dashboard and API paths; other routes skip it.
export const config = {
  matcher: ['/dashboard/:path*', '/api/:path*'],
};

React

Component Patterns:

import { forwardRef, memo, useCallback, useMemo } from 'react';

// Native <button> attributes plus two presentation props.
interface ButtonProps extends React.ButtonHTMLAttributes<HTMLButtonElement> {
  // Visual style; the component falls back to 'primary' when omitted.
  variant?: 'primary' | 'secondary';
  // When true the button is disabled and shows a spinner instead of its children.
  isLoading?: boolean;
}

/**
 * Memoized, ref-forwarding button with `variant` styling and a loading state.
 *
 * A caller-supplied `className` is appended to the generated `btn` classes
 * instead of replacing them. While `isLoading` is true the button is disabled
 * and renders a spinner in place of its children.
 */
export const Button = memo(forwardRef<HTMLButtonElement, ButtonProps>(
  ({ variant = 'primary', isLoading, children, disabled, className, ...props }, ref) => {
    // Joining only the truthy parts avoids stray spaces and keeps the
    // caller's classes alongside the base ones.
    const classes = ['btn', `btn-${variant}`, isLoading && 'btn-loading', className]
      .filter(Boolean)
      .join(' ');

    return (
      <button
        ref={ref}
        className={classes}
        disabled={disabled || isLoading}
        {...props}
      >
        {isLoading ? <Spinner /> : children}
      </button>
    );
  }
));

// Explicit name for React DevTools, since memo/forwardRef wrap an anonymous function.
Button.displayName = 'Button';

Custom Hooks:

// Runs `asyncFn` whenever `deps` change and exposes { data, error, isLoading }.
function useAsync<T>(asyncFn: () => Promise<T>, deps: unknown[] = []) {
  type AsyncState = {
    data: T | null;
    error: Error | null;
    isLoading: boolean;
  };

  const [state, setState] = useState<AsyncState>({
    data: null,
    error: null,
    isLoading: true,
  });

  useEffect(() => {
    // Flipped by the cleanup so a stale or unmounted run never writes state.
    let cancelled = false;

    const commit = (next: AsyncState) => {
      if (!cancelled) {
        setState(next);
      }
    };

    // Keep the previous data/error visible while the new request is in flight.
    setState(current => ({ ...current, isLoading: true }));

    asyncFn()
      .then(result => commit({ data: result, error: null, isLoading: false }))
      .catch(reason => commit({ data: null, error: reason, isLoading: false }));

    return () => {
      cancelled = true;
    };
  }, deps);

  return state;
}

Celery

Task Configuration:

# celery.py
from celery import Celery
from celery.schedules import crontab  # needed by beat_schedule below

app = Celery('myapp')

app.conf.update(
    # Broker settings (separate Redis databases for the queue and for results)
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',

    # Task settings: JSON only, so workers never unpickle message payloads
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,

    # Performance
    worker_prefetch_multiplier=4,
    # Acknowledge after the task finishes, and requeue if the worker dies,
    # so tasks must be idempotent.
    task_acks_late=True,
    task_reject_on_worker_lost=True,

    # Results
    result_expires=86400,  # 24 hours

    # Retry
    # NOTE(review): retry delay / max retries are normally per-task options
    # (default_retry_delay, max_retries) — confirm these global keys are
    # honoured by the Celery version in use.
    task_default_retry_delay=60,
    task_max_retries=3,

    # Beat schedule
    beat_schedule={
        'cleanup-daily': {
            'task': 'tasks.cleanup',
            # Every day at 02:00 in the configured timezone (UTC)
            'schedule': crontab(hour=2, minute=0),
        },
    },
)

Task Patterns:

from celery import shared_task, chain, group, chord
from celery.exceptions import MaxRetriesExceededError


@shared_task(
    bind=True,
    name='process.item',
    max_retries=3,
    autoretry_for=(ConnectionError,),
    retry_backoff=True,
    retry_backoff_max=600,
    time_limit=300,
    soft_time_limit=240,
)
def process_item(self, item_id: int) -> dict:
    """Process a single item with automatic retry.

    Args:
        item_id: Primary key of the ``Item`` to process.

    Returns:
        ``{'status': 'success', 'item_id', 'result'}`` on success, or
        ``{'status': 'not_found', 'item_id'}`` when the item does not exist.

    Raises:
        celery.exceptions.Retry: When the soft time limit is hit and the task
            is rescheduled to run again in 60 seconds.
    """
    # Imported locally so the except clause below can always resolve the name.
    from celery.exceptions import SoftTimeLimitExceeded

    try:
        item = Item.objects.get(id=item_id)
        result = do_processing(item)
        return {'status': 'success', 'item_id': item_id, 'result': result}
    except Item.DoesNotExist:
        # A missing row will not appear on retry, so report it instead.
        return {'status': 'not_found', 'item_id': item_id}
    except SoftTimeLimitExceeded:
        # retry() raises Retry itself; the explicit raise makes the control
        # flow visible to readers and type checkers.
        raise self.retry(countdown=60)


def process_batch(item_ids: list[int]) -> None:
    """Fan out one ``process_item`` task per id, then aggregate the results."""
    # Chord header: the per-item tasks that run in parallel.
    header = group(process_item.s(pk) for pk in item_ids)
    # Chord body: receives the list of header results once all have finished.
    callback = aggregate_results.s()
    chord(header, callback).apply_async()

Redis

Caching Patterns:

import json
from functools import wraps
from typing import Callable, TypeVar

import redis

T = TypeVar('T')
client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)


def cache(ttl: int = 3600, prefix: str = 'cache'):
    """Decorator for caching function results in Redis as JSON.

    Args:
        ttl: Lifetime of a cached entry, in seconds.
        prefix: First component of every cache key.

    Returns:
        A decorator. The wrapped function gains an ``invalidate(*args,
        **kwargs)`` attribute that deletes the entry for those arguments.

    The key is ``prefix:function_name`` followed by the stringified positional
    arguments and, sorted by name, ``name=value`` for each keyword argument.
    Results must be JSON-serializable.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        def make_key(args: tuple, kwargs: dict) -> str:
            # Keyword arguments are part of the key; otherwise calls that
            # differ only in kwargs would share one cache entry.
            parts = [prefix, func.__name__]
            parts.extend(str(a) for a in args)
            parts.extend(f'{name}={kwargs[name]}' for name in sorted(kwargs))
            return ':'.join(parts)

        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            key = make_key(args, kwargs)

            # Try cache; only a missing key (None) counts as a miss.
            cached = client.get(key)
            if cached is not None:
                return json.loads(cached)

            # Execute and cache
            result = func(*args, **kwargs)
            client.setex(key, ttl, json.dumps(result))
            return result

        wrapper.invalidate = lambda *args, **kwargs: client.delete(
            make_key(args, kwargs)
        )
        return wrapper
    return decorator


# Rate limiting
def is_rate_limited(key: str, limit: int, window: int) -> bool:
    """Check if action is rate limited using a sliding window.

    Every call is recorded in the sorted set at ``key``, including calls that
    turn out to be limited, and entries older than ``window`` are dropped.

    Args:
        key: Redis key identifying the actor/action being limited.
        limit: Maximum number of calls allowed within the window.
        window: Window length in seconds; also used as the key's expiry.

    Returns:
        True when the number of calls in the window, including this one,
        exceeds ``limit``.
    """
    import time
    import uuid

    now = time.time()
    window_start = now - window
    # A random suffix keeps members unique, so two hits carrying the same
    # timestamp are counted separately instead of overwriting each other.
    member = f'{now}:{uuid.uuid4().hex}'

    pipe = client.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # drop hits outside the window
    pipe.zadd(key, {member: now})                # record this hit
    pipe.zcard(key)                              # count hits in the window
    pipe.expire(key, window)                     # let idle keys expire

    results = pipe.execute()
    # Index 2 is the ZCARD reply.
    return results[2] > limit

DevOps Technologies

GitLab CI/CD

Pipeline Structure:

# Jobs run stage by stage, in the order listed here.
stages:
  - test
  - build
  - deploy

variables:
  # TLS certificate directory used by the Docker-in-Docker service.
  DOCKER_TLS_CERTDIR: "/certs"
  # Keep pip's download cache inside the project directory so it can be cached.
  PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"

# Hidden job (leading dot): never runs, only provides the &python_cache anchor.
.python_cache: &python_cache
  cache:
    # One cache per branch or tag.
    key: ${CI_COMMIT_REF_SLUG}
    paths:
      - .cache/pip
      - .venv/

test:
  stage: test
  image: python:3.12
  # Merge the shared cache settings from the &python_cache anchor.
  <<: *python_cache
  before_script:
    - python -m venv .venv
    - source .venv/bin/activate
    - pip install -r requirements-dev.txt
  script:
    - pytest --cov --cov-report=xml
  # Regex applied to the job log to extract the total coverage percentage.
  coverage: '/TOTAL.*\s+(\d+%)$/'
  artifacts:
    reports:
      # Upload coverage.xml as a Cobertura-format coverage report.
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml

# Build the image and push it to the project's container registry
# (default branch only).
build:
  stage: build
  image: docker:24.0
  services:
    - docker:24.0-dind
  before_script:
    # Authenticate with the predefined CI credentials; without a login the push
    # below is rejected. --password-stdin keeps the secret off the command line.
    - echo "$CI_REGISTRY_PASSWORD" | docker login -u "$CI_REGISTRY_USER" --password-stdin "$CI_REGISTRY"
  script:
    # Tag with the commit SHA so every image is immutable and traceable.
    - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH

# Manually triggered production rollout of the image built for this commit.
deploy:production:
  stage: deploy
  image:
    name: bitnami/kubectl:latest
    # The image's entrypoint is kubectl itself; clear it so the runner can
    # start a shell and execute the script lines.
    entrypoint: [""]
  script:
    - kubectl set image deployment/app app=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
  environment:
    name: production
    url: https://app.example.com
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
      # Require a human to press "play" before deploying.
      when: manual

Docker Compose

Production-Ready Pattern:

version: '3.8'

services:
  # Application container: built from the local Dockerfile, started only after
  # its dependencies report healthy.
  app:
    build:
      context: .
      # Build only up to the Dockerfile stage named "production".
      target: production
      args:
        - BUILDKIT_INLINE_CACHE=1
    # Image name and tag can be overridden from the environment.
    image: ${IMAGE_NAME:-app}:${IMAGE_TAG:-latest}
    restart: unless-stopped
    depends_on:
      # Wait for each dependency's healthcheck to pass, not just container start.
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    environment:
      # NOTE(review): credentials are inlined for illustration — move them to
      # secrets or an env file for real deployments.
      - DATABASE_URL=postgres://user:pass@db:5432/app
      - REDIS_URL=redis://redis:6379/0
    healthcheck:
      # Requires curl to be present inside the image.
      test: ["CMD", "curl", "-f", "http://localhost:8000/health/"]
      interval: 30s
      timeout: 10s
      retries: 3
      # Failures during the first 40s do not count towards "retries".
      start_period: 40s
    deploy:
      resources:
        limits:
          cpus: '1'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 128M
    logging:
      driver: json-file
      options:
        # Rotate logs: at most 3 files of 10 MB each.
        max-size: "10m"
        max-file: "3"

  # PostgreSQL with a named volume for data and a readiness healthcheck.
  db:
    image: postgres:15-alpine
    restart: unless-stopped
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      # NOTE(review): example credentials — replace before production use.
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=app
    healthcheck:
      # Must use the same user and database names as the environment above.
      test: ["CMD-SHELL", "pg_isready -U user -d app"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis with append-only persistence and a bounded memory footprint.
  redis:
    image: redis:7-alpine
    restart: unless-stopped
    # AOF persistence on; at 256 MB evict least-recently-used keys of any kind.
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
  redis_data:

networks:
  default:
    driver: bridge

Kubernetes/K3s

Deployment Best Practices:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: app
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      # Add one new pod at a time and never drop below the desired replica count.
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: app
  template:
    metadata:
      # Must match spec.selector.matchLabels; the API rejects the Deployment
      # if the pod template does not carry the selected labels.
      labels:
        app: app
    spec:
      # Must be longer than the preStop sleep so the app still has time to exit.
      terminationGracePeriodSeconds: 30
      containers:
        - name: app
          # NOTE(review): prefer an immutable tag or digest over :latest so
          # rollouts and rollbacks are reproducible.
          image: app:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 256Mi
          # Liveness: restart the container when the process is wedged.
          livenessProbe:
            httpGet:
              path: /health/live/
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          # Readiness: only route traffic once the app can serve it.
          readinessProbe:
            httpGet:
              path: /health/ready/
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
          lifecycle:
            preStop:
              exec:
                # Delay shutdown so endpoint removal can propagate before the
                # container receives SIGTERM.
                command: ["/bin/sh", "-c", "sleep 10"]

Prometheus

Alert Rule Patterns:

groups:
  - name: slo.rules
    rules:
      # Error budget burn rate: fire when the 1h 5xx ratio exceeds 14.4x the
      # error rate allowed by a 99.9% availability SLO.
      - alert: ErrorBudgetBurnRate
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[1h]))
            / sum(rate(http_requests_total[1h]))
          ) > (14.4 * (1 - 0.999))
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Error budget burning too fast"
          # $value is the measured error ratio, not a duration.
          description: "5xx error ratio over the last hour is {{ $value | humanizePercentage }}, more than 14.4x the rate allowed by the 99.9% SLO"

      # Availability SLO: share of non-5xx requests over the trailing 30 days
      - record: slo:availability:ratio
        expr: |
          1 - (
            sum(rate(http_requests_total{status=~"5.."}[30d]))
            / sum(rate(http_requests_total[30d]))
          )

Grafana

Dashboard Variables:

{
  "templating": {
    "list": [
      {
        "name": "namespace",
        "type": "query",
        "query": "label_values(kube_pod_info, namespace)",
        "refresh": 2,
        "includeAll": true,
        "multi": true
      },
      {
        "name": "pod",
        "type": "query",
        "query": "label_values(kube_pod_info{namespace=~\"$namespace\"}, pod)",
        "refresh": 2,
        "includeAll": true,
        "multi": true
      }
    ]
  }
}

Nginx

Reverse Proxy Pattern:

# Pool of application servers behind the proxy.
upstream backend {
    # Send each request to the server with the fewest active connections,
    # taking the weights into account.
    least_conn;
    server backend1:8000 weight=3;
    server backend2:8000 weight=2;
    # Used only when the primary servers are unavailable.
    server backend3:8000 backup;
    # Keep up to 32 idle upstream connections open per worker process.
    keepalive 32;
}

# HTTPS virtual host: proxies /api/ to the backend pool and serves /static/ from disk.
server {
    # NOTE(review): from nginx 1.25.1 the "http2" listen parameter is
    # deprecated in favour of a separate "http2 on;" directive.
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /etc/ssl/certs/cert.pem;
    ssl_certificate_key /etc/ssl/private/key.pem;
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;

    # Security headers ("always" also adds them to error responses)
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;

    location /api/ {
        proxy_pass http://backend;
        # HTTP/1.1 with an empty Connection header is required for upstream keepalive.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 10s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;
    }

    location /static/ {
        alias /var/www/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
        # add_header is inherited from the server level only when a location
        # defines none of its own, so the security headers are repeated here.
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header X-XSS-Protection "1; mode=block" always;
        # Serve pre-compressed .gz files when they exist next to the original.
        gzip_static on;
    }
}

Traefik

IngressRoute Pattern:

# Routes app.example.com: /api goes to the API service (with middlewares),
# everything else to the frontend.
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: app
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`app.example.com`) && PathPrefix(`/api`)
      kind: Rule
      services:
        - name: app-api
          port: 8000
          weight: 100
      # Middlewares referenced by name; they must exist as Middleware resources.
      middlewares:
        - name: rate-limit
        - name: retry
    - match: Host(`app.example.com`)
      kind: Rule
      services:
        - name: app-frontend
          port: 3000
  tls:
    certResolver: letsencrypt
    # Refers to a TLSOption resource named "modern-tls".
    options:
      name: modern-tls

---
# Rate limit: on average 100 requests per 1-minute period, with bursts of up to 50.
apiVersion: traefik.io/v1alpha1
kind: Middleware
metadata:
  name: rate-limit
spec:
  rateLimit:
    average: 100
    period: 1m
    burst: 50

---
# Retry failed backend requests: up to 3 attempts, with the first retry
# interval set to 100ms.
apiVersion: traefik.io/v1alpha1
kind: Middleware
metadata:
  name: retry
spec:
  retry:
    attempts: 3
    initialInterval: 100ms