Skip to main content
This guide covers production-ready webhook handlers with error handling, logging, and best practices.
New to webhooks? Start with our Webhook Quickstart for a complete end-to-end tutorial.

Production Requirements

Your production webhook handler should:
  • Handle POST requests with JSON payloads
  • Return 2xx status codes for successful processing
  • Process requests quickly (< 10 seconds recommended)
  • Handle retries gracefully (idempotent processing)
  • Log events for debugging and monitoring
  • Verify signatures for security

Retry Behavior

Automatic Retries: If your endpoint doesn’t respond with a 2xx status code, Magic Hour will retry delivery:
  • Duration: Up to 24 hours
  • Pattern: Exponential backoff (1s, 2s, 4s, 8s, …)
  • After 24 hours: Event marked as failed, no more retries
Disabled Webhooks: Pending events are skipped and marked as failed after 24 hours.

Production Handler Examples

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from sqlalchemy.orm import Session
import logging
import json
import time

app = FastAPI()
logger = logging.getLogger(__name__)

@app.post("/webhook")
async def webhook_handler(
    request: Request,
    background_tasks: BackgroundTasks
):
    try:
        # Get event data
        event = await request.json()
        event_type = event.get('type')
        payload = event.get('payload', {})

        # Log the event
        logger.info(f"Received webhook: {event_type}", extra={
            'event_type': event_type,
            'job_id': payload.get('id'),
            'status': payload.get('status')
        })

        # Process in background to respond quickly
        background_tasks.add_task(process_webhook_event, event)

        return {"success": True, "timestamp": int(time.time())}

    except Exception as e:
        logger.error(f"Webhook processing failed: {str(e)}")
        raise HTTPException(status_code=500, detail="Internal server error")

async def process_webhook_event(event: dict):
    """Process webhook event in background"""
    try:
        event_type = event['type']
        payload = event['payload']
        job_id = payload['id']

        # Update database record
        with get_db_session() as db:
            job = db.query(Job).filter(Job.magic_hour_id == job_id).first()
            if not job:
                logger.warning(f"Job {job_id} not found in database")
                return

            if event_type == 'video.completed':
                job.status = 'completed'
                job.download_url = payload['downloads'][0]['url']
                job.completed_at = datetime.utcnow()

                # Notify user (email, push notification, etc.)
                await notify_user_completion(job.user_id, job)

            elif event_type == 'video.errored':
                job.status = 'failed'
                job.error_message = payload['error']['message']
                job.failed_at = datetime.utcnow()

                # Notify user of failure
                await notify_user_error(job.user_id, job)

            db.commit()
            logger.info(f"Updated job {job_id} status to {job.status}")

    except Exception as e:
        logger.error(f"Background processing failed: {str(e)}")

Best Practices

1. Idempotent Processing

Handle duplicate events gracefully (Magic Hour may retry):
# Check if event was already processed
if await is_event_processed(event_id):
    logger.info(f"Event {event_id} already processed, skipping")
    return {"success": True}

# Process event and mark as processed
await process_event(event)
await mark_event_processed(event_id)

2. Quick Response Times

Respond within 10 seconds to avoid timeouts:
# ✅ Good: Respond immediately, process in background
@app.post("/webhook")
async def webhook(request: Request, background_tasks: BackgroundTasks):
    event = await request.json()
    background_tasks.add_task(process_event, event)
    return {"success": True}  # Respond immediately

# ❌ Bad: Long processing blocks response
@app.post("/webhook")
async def webhook(request: Request):
    event = await request.json()
    await long_running_process(event)  # This might timeout
    return {"success": True}

3. Comprehensive Logging

Log all events for debugging:
logger.info("Webhook received", extra={
    'event_type': event['type'],
    'job_id': event['payload']['id'],
    'timestamp': int(time.time()),
    'user_agent': request.headers.get('user-agent'),
    'ip_address': request.client.host
})

4. Error Handling

Handle errors gracefully:
try:
    await process_webhook(event)
except ValidationError as e:
    logger.error(f"Invalid webhook data: {e}")
    return {"error": "Invalid data"}, 400
except Exception as e:
    logger.error(f"Webhook processing failed: {e}")
    return {"error": "Internal error"}, 500

Local Testing

Test your handler locally before deploying:
# Start your server
uvicorn main:app --reload --port 8000

# In another terminal, send test event
curl -X POST http://localhost:8000/webhook \
  -H "Content-Type: application/json" \
  -d '{
    "type": "video.completed",
    "payload": {
      "id": "test-job-123",
      "status": "complete",
      "downloads": [
        {
          "url": "https://example.com/video.mp4",
          "expires_at": "2024-12-01T12:00:00Z"
        }
      ]
    }
  }'

Deployment Considerations

Load Balancing

If using multiple servers, ensure webhooks can reach any instance:
# docker-compose.yml
services:
  webhook-handler:
    image: your-webhook-app
    replicas: 3
    ports:
      - "8000-8002:8000"

Health Checks

Add a health check endpoint:
@app.get("/health")
async def health_check():
    return {"status": "healthy", "timestamp": int(time.time())}

Monitoring

Monitor webhook delivery and processing:
# Track metrics
webhook_counter = Counter('webhooks_received_total', ['event_type'])
processing_time = Histogram('webhook_processing_seconds')

@app.post("/webhook")
async def webhook(request: Request):
    start_time = time.time()
    event = await request.json()

    webhook_counter.labels(event_type=event['type']).inc()

    # Process event...

    processing_time.observe(time.time() - start_time)
    return {"success": True}

Next Steps

Common Issues

Webhook timeouts?
  • Respond within 10 seconds
  • Use background tasks for long processing
  • Check server resources and scaling
Missing events?
  • Verify webhook is enabled in Developer Hub
  • Check endpoint URL is correct and accessible
  • Review server logs for errors
Duplicate events?
  • Implement idempotent processing
  • Store processed event IDs
  • Handle retries gracefully
Need help? Contact [email protected]