> ## Documentation Index
> Fetch the complete documentation index at: https://docs.magichour.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Securing Your Webhook Handler

> Verify webhook authenticity with HMAC-SHA256 signature validation to prevent malicious requests.

## Why Webhook Security Matters

Without signature verification, anyone who discovers your webhook URL could send fake events to your application, potentially causing:

* ❌ Fake completion events triggering incorrect business logic
* ❌ Data corruption from processing invalid events
* ❌ Unauthorized access to your application
* ❌ Security vulnerabilities

**✅ Solution**: Verify that every webhook is actually from Magic Hour using cryptographic signatures.

<Warning>
  **Production Requirement**: Always verify webhook signatures in production. This is critical for
  security.
</Warning>

## How Signature Verification Works

Magic Hour signs every webhook using HMAC-SHA256 with your webhook secret:

1. **Magic Hour creates** a signed payload: `{timestamp}.{json_payload}`
2. **Magic Hour computes** HMAC-SHA256 signature using your webhook secret
3. **Magic Hour sends** the webhook with signature and timestamp in headers
4. **Your app recreates** the same signed payload
5. **Your app computes** the same HMAC-SHA256 signature
6. **Your app compares** signatures - if they match, the webhook is authentic

## Security Headers

Every webhook includes two security headers to help you verify authenticity:

### `magic-hour-event-signature`

The HMAC-SHA256 signature computed using your webhook secret and the signed payload.

**Example:**

```http theme={null}
magic-hour-event-signature: d1747eb9491aadeac6173f1c9fd5f4aff87c8981ac53d87b0830199221ff50af
```

**What it is**: A 64-character hexadecimal string representing the HMAC-SHA256 hash.

### `magic-hour-event-timestamp`

Unix timestamp (seconds since epoch) when Magic Hour sent the webhook.

**Example:**

```http theme={null}
magic-hour-event-timestamp: 1729314984
```

**What it's for**:

* Prevents replay attacks by rejecting old webhooks
* Recommended tolerance: Accept webhooks within 5 minutes of current time
* If timestamp is too old or too far in the future, reject the webhook

<Info>
  **Header Case**: HTTP headers are case-insensitive. `Magic-Hour-Event-Signature` and
  `magic-hour-event-signature` are equivalent.
</Info>

<Note>
  **Optional but Recommended**: Signature verification is not required to receive webhooks, but it's
  strongly recommended for production to ensure authenticity.
</Note>

<Steps>
  <Step title="Extract Headers and Payload">
    Get the signature, timestamp, and raw JSON payload from the incoming request:

    ```python theme={null}
    signature = request.headers.get('magic-hour-event-signature')
    timestamp = request.headers.get('magic-hour-event-timestamp')
    raw_payload = await request.body()  # Raw JSON bytes, not parsed
    ```
  </Step>

  <Step title="Extract the headers and payload">
    **What to do**: Get three pieces of information from the incoming webhook request:

    1. **Signature header**: `magic-hour-event-signature`
    2. **Timestamp header**: `magic-hour-event-timestamp`
    3. **Raw JSON body**: The entire request body as a string (not parsed yet)

    **Code example:**

    ```python theme={null}
    signature = request.headers.get('magic-hour-event-signature')
    timestamp = request.headers.get('magic-hour-event-timestamp')
    raw_body = await request.body()  # Raw bytes, not parsed JSON
    ```

    <Warning>
      **Critical**: You must use the raw request body exactly as received. Do NOT parse the JSON and
      re-stringify it, as this can change formatting and break signature verification.
    </Warning>
  </Step>

  <Step title="Create the signed payload">
    **What to do**: Combine the timestamp and raw payload into a single string for signature computation.

    **Format**: `{timestamp}.{raw_json_payload}`

    **Example signed payload:**

    ```
    1729314984.{"type":"video.completed","payload":{"id":"clx123","status":"complete","downloads":[{"url":"https://videos.magichour.ai/output.mp4"}]}}
    ```

    **Code example:**

    ```python theme={null}
    # Convert raw body to string if it's bytes
    payload_string = raw_body.decode('utf-8') if isinstance(raw_body, bytes) else raw_body

    # Create signed payload
    signed_payload = f"{timestamp}.{payload_string}"
    ```

    **Why**: The signed payload is what Magic Hour used to create the signature. You need to recreate it exactly the same way.
  </Step>

  <Step title="Compute your signature">
    **What to do**: Generate an HMAC-SHA256 hash using:

    * **Key**: Your webhook secret (from Magic Hour Developer Hub)
    * **Message**: The signed payload you just created

    **Algorithm**: HMAC-SHA256 (Hash-based Message Authentication Code with SHA-256)

    <CodeGroup>
      ```python Python theme={null}
      import hmac
      import hashlib
      import os

      # Get your webhook secret from environment variable
      webhook_secret = os.getenv("MAGIC_HOUR_WEBHOOK_SECRET")

      def compute_signature(signed_payload: str, secret: str) -> str:
          """
          Compute HMAC-SHA256 signature

          Args:
              signed_payload: The timestamp.payload string
              secret: Your webhook secret from Magic Hour

          Returns:
              64-character hexadecimal signature string
          """
          return hmac.new(
              secret.encode('utf-8'),      # Convert secret to bytes
              signed_payload.encode('utf-8'),  # Convert payload to bytes
              hashlib.sha256                # Use SHA256 hash
          ).hexdigest()                     # Return as hex string

      # Usage
      expected_signature = compute_signature(signed_payload, webhook_secret)
      ```

      ```typescript Node.js theme={null}
      const crypto = require("crypto");

      function computeSignature(signedPayload: string, webhookSecret: string): string {
        return crypto.createHmac("sha256", webhookSecret).update(signedPayload, "utf8").digest("hex");
      }

      // Usage
      const webhookSecret = process.env.MAGIC_HOUR_WEBHOOK_SECRET;
      const signature = computeSignature(signedPayload, webhookSecret);
      ```

      ```go Go theme={null}
      package main

      import (
          "crypto/hmac"
          "crypto/sha256"
          "encoding/hex"
          "os"
      )

      func computeSignature(signedPayload, webhookSecret string) string {
          mac := hmac.New(sha256.New, []byte(webhookSecret))
          mac.Write([]byte(signedPayload))
          return hex.EncodeToString(mac.Sum(nil))
      }

      // Usage
      func main() {
          webhookSecret := os.Getenv("MAGIC_HOUR_WEBHOOK_SECRET")
          signature := computeSignature(signedPayload, webhookSecret)
      }
      ```

      ```rust Rust theme={null}
      use hmac::{Hmac, Mac};
      use sha2::Sha256;
      use std::env;

      type HmacSha256 = Hmac<Sha256>;

      fn compute_signature(signed_payload: &str, webhook_secret: &str) -> String {
          let mut mac = HmacSha256::new_from_slice(webhook_secret.as_bytes())
              .expect("HMAC can take key of any size");
          mac.update(signed_payload.as_bytes());
          hex::encode(mac.finalize().into_bytes())
      }

      // Usage
      fn main() {
          let webhook_secret = env::var("MAGIC_HOUR_WEBHOOK_SECRET")
              .expect("MAGIC_HOUR_WEBHOOK_SECRET must be set");
          let signature = compute_signature(&signed_payload, &webhook_secret);
      }
      ```
    </CodeGroup>
  </Step>

  <Step title="Verify the signature">
    **What to do**: Compare your computed signature with the signature Magic Hour sent in the header.

    **Important**: Use a constant-time comparison to prevent timing attacks:

    ```python theme={null}
    import hmac
    import time

    # Get the signature from headers
    received_signature = request.headers.get('magic-hour-event-signature')

    # Compare signatures (constant-time comparison for security)
    is_valid = hmac.compare_digest(expected_signature, received_signature)

    if not is_valid:
        # Signature doesn't match - reject the webhook
        raise HTTPException(status_code=401, detail="Invalid signature")
    ```

    **Why constant-time comparison**: Regular string comparison (`==`) can leak timing information that attackers could exploit. `hmac.compare_digest()` prevents this.
  </Step>

  <Step title="Verify the timestamp">
    **What to do**: Check that the webhook was sent recently (within 5 minutes is recommended).

    **Why**: Prevents replay attacks where someone could resend an old valid webhook.

    ```python theme={null}
    import time

    # Get current time
    current_time = int(time.time())

    # Check if timestamp is within 5 minutes (300 seconds)
    timestamp_age = abs(current_time - int(timestamp))

    if timestamp_age > 300:
        # Timestamp too old or too far in future - reject
        raise HTTPException(status_code=401, detail="Timestamp too old")
    ```

    **Adjust tolerance**: You can adjust the 300-second (5-minute) window based on your needs, but keep it reasonable for security.
  </Step>
</Steps>

## Complete Secure Handler Implementation

<CodeGroup>
  ```python Python (FastAPI - Complete) theme={null}
  from fastapi import FastAPI, Request, HTTPException
  import hmac
  import hashlib
  import time
  import json
  import os

  app = FastAPI()

  # Get webhook secret from environment
  WEBHOOK_SECRET = os.getenv("MAGIC_HOUR_WEBHOOK_SECRET")

  def verify_webhook(signature: str, timestamp: str, raw_body: bytes) -> bool:
      """
      Verify webhook signature and timestamp

      Args:
          signature: The magic-hour-event-signature header value
          timestamp: The magic-hour-event-timestamp header value
          raw_body: The raw request body as bytes

      Returns:
          True if webhook is valid, False otherwise
      """

      # Step 1: Verify timestamp is recent (within 5 minutes)
      current_time = int(time.time())
      timestamp_age = abs(current_time - int(timestamp))

      if timestamp_age > 300:  # 5 minutes in seconds
          print(f"❌ Timestamp too old: {timestamp_age} seconds")
          return False

      # Step 2: Create signed payload
      # Format: "{timestamp}.{raw_json_payload}"
      payload_string = raw_body.decode('utf-8')
      signed_payload = f"{timestamp}.{payload_string}"

      # Step 3: Compute expected signature
      expected_signature = hmac.new(
          WEBHOOK_SECRET.encode('utf-8'),  # Secret as bytes
          signed_payload.encode('utf-8'),   # Signed payload as bytes
          hashlib.sha256                    # SHA256 hash function
      ).hexdigest()                         # Convert to hex string

      # Step 4: Compare signatures (constant-time comparison)
      is_valid = hmac.compare_digest(signature, expected_signature)

      if not is_valid:
          print(f"❌ Signature mismatch")
          print(f"   Received:  {signature}")
          print(f"   Expected:  {expected_signature}")

      return is_valid

  @app.post("/webhook")
  async def secure_webhook_handler(request: Request):
      """Secure webhook endpoint with signature verification"""

      # Extract headers
      signature = request.headers.get('magic-hour-event-signature')
      timestamp = request.headers.get('magic-hour-event-timestamp')

      # Validate headers exist
      if not signature or not timestamp:
          raise HTTPException(
              status_code=401,
              detail="Missing security headers"
          )

      # Get raw body for signature verification
      raw_body = await request.body()

      # Verify webhook authenticity
      if not verify_webhook(signature, timestamp, raw_body):
          raise HTTPException(
              status_code=401,
              detail="Invalid signature or timestamp"
          )

      # Now it's safe to parse and process the JSON
      event = json.loads(raw_body)

      print(f"✅ Verified webhook: {event['type']}")

      # Process the event
      match event['type']:
          case 'video.completed':
              print("Video is ready!")
              # Your business logic here
          case 'image.completed':
              print("Image is ready!")
              # Your business logic here
          case _:
              print(f"Received: {event['type']}")

      return {"success": True}

  # Run the server
  if __name__ == "__main__":
      import uvicorn

      if not WEBHOOK_SECRET:
          raise ValueError("MAGIC_HOUR_WEBHOOK_SECRET must be set")

      uvicorn.run(app, host="0.0.0.0", port=8000)
  ```

  ```typescript Node.js (Express) theme={null}
  const express = require("express");
  const crypto = require("crypto");

  const app = express();

  // IMPORTANT: Use express.raw() or express.text() to get raw body for signature verification
  // Do NOT use express.json() if you need raw body for signature verification
  app.use(express.text({ type: "application/json" }));

  const webhookSecret = process.env.MAGIC_HOUR_WEBHOOK_SECRET;

  function computeSignature(signedPayload) {
    return crypto.createHmac("sha256", webhookSecret).update(signedPayload).digest("hex");
  }

  app.post("/webhook", (req, res) => {
    // Get headers
    const signature = req.headers["magic-hour-event-signature"];
    const timestamp = req.headers["magic-hour-event-timestamp"];

    if (!signature || !timestamp) {
      return res.status(401).json({ error: "Missing security headers" });
    }

    if (!signature || !timestamp) {
      return res.status(401).json({ error: "Missing signature headers" });
    }

    const webhookSecret = process.env.MAGIC_HOUR_WEBHOOK_SECRET;
    if (!webhookSecret) {
      return res.status(500).json({ error: "Webhook secret not configured" });
    }

    // Get raw body (req.body is now a string, not parsed JSON)
    const rawBody = req.body;

    // Create signed payload and verify signature
    const signedPayload = `${timestamp}.${rawBody}`;
    const computedSignature = computeSignature(signedPayload);

    // Constant-time comparison
    if (signature !== computedSignature) {
      return res.status(401).json({ error: "Invalid signature" });
    }

    // Now it's safe to parse the JSON
    const event = JSON.parse(rawBody);
    const { type, payload } = event;

    // Process webhook event
    console.log(`✅ Verified webhook: ${type}`);

    // Your business logic here
    if (type === "video.completed") {
      console.log("Video is ready!");
    }

    res.status(200).json({ success: true });
  });
  ```

  ```python Colab/Jupyter theme={null}
  # Secure webhook handler for notebooks
  from fastapi import FastAPI, Request, HTTPException
  import hmac
  import hashlib
  import time
  import json
  import nest_asyncio
  import threading
  import uvicorn

  # Install: !pip install fastapi uvicorn nest-asyncio
  nest_asyncio.apply()

  app = FastAPI()

  # Your webhook secret (get this from Magic Hour Developer Hub)
  WEBHOOK_SECRET = "your-webhook-secret-here"  # Replace with actual secret

  def verify_signature(signature: str, timestamp: str, raw_payload: bytes) -> bool:
      # Check timestamp (within 5 minutes)
      if abs(int(time.time()) - int(timestamp)) > 300:
          print("❌ Timestamp too old")
          return False

      # Create signed payload
      signed_payload = f"{timestamp}.{raw_payload.decode('utf-8')}"

      # Compute expected signature
      expected = hmac.new(
          WEBHOOK_SECRET.encode('utf-8'),
          signed_payload.encode('utf-8'),
          hashlib.sha256
      ).hexdigest()

      # Compare
      if not hmac.compare_digest(signature, expected):
          print("❌ Invalid signature")
          return False

      return True

  @app.post("/webhook")
  async def secure_webhook(request: Request):
      signature = request.headers.get('magic-hour-event-signature')
      timestamp = request.headers.get('magic-hour-event-timestamp')

      if not signature or not timestamp:
          print("❌ Missing headers")
          raise HTTPException(status_code=401, detail="Missing headers")

      raw_payload = await request.body()

      # Verify signature
      if not verify_signature(signature, timestamp, raw_payload):
          raise HTTPException(status_code=401, detail="Invalid signature")

      # Parse verified payload
      event = json.loads(raw_payload)
      print(f"✅ Verified webhook: {event['type']}")
      print(f"📦 Payload: {json.dumps(event['payload'], indent=2)}")

      return {"success": True}

  # Start server
  def run_server():
      uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")

  server_thread = threading.Thread(target=run_server, daemon=True)
  server_thread.start()

  print("🔒 Secure webhook server started!")
  print("📝 Remember to set your WEBHOOK_SECRET variable above")
  ```
</CodeGroup>

## Testing Your Secure Handler

Test signature verification with a sample webhook:

<CodeGroup>
  ```bash cURL Test theme={null}
  # Test with valid signature
  curl -X POST http://localhost:8000/webhook \
    -H "Content-Type: application/json" \
    -H "magic-hour-event-signature: abc123..." \
    -H "magic-hour-event-timestamp: $(date +%s)" \
    -d '{"type":"test.event","payload":{"test":true}}'
  ```

  ```python Python Test Script theme={null}
  import requests
  import hmac
  import hashlib
  import time
  import json

  def create_test_webhook():
      webhook_secret = "your-webhook-secret"
      timestamp = str(int(time.time()))
      payload = {"type": "test.event", "payload": {"test": True}}
      payload_json = json.dumps(payload, separators=(',', ':'))

      # Create signature
      signed_payload = f"{timestamp}.{payload_json}"
      signature = hmac.new(
          webhook_secret.encode('utf-8'),
          signed_payload.encode('utf-8'),
          hashlib.sha256
      ).hexdigest()

      # Send test webhook
      response = requests.post(
          "http://localhost:8000/webhook",
          json=payload,
          headers={
              "magic-hour-event-signature": signature,
              "magic-hour-event-timestamp": timestamp
          }
      )

      print(f"Status: {response.status_code}")
      print(f"Response: {response.json()}")

  create_test_webhook()
  ```
</CodeGroup>

## Best Practices

### 1. Environment Variables

Store your webhook secret securely:

```bash theme={null}
# .env file
MAGIC_HOUR_WEBHOOK_SECRET=whsec_abc123def456...
```

### 2. Error Handling

Handle verification failures gracefully:

```python theme={null}
try:
    if not verify_webhook_signature(...):
        logger.warning("Invalid webhook signature", extra={
            'ip': request.client.host,
            'user_agent': request.headers.get('user-agent')
        })
        raise HTTPException(status_code=401, detail="Invalid signature")
except Exception as e:
    logger.error(f"Webhook verification failed: {e}")
    raise HTTPException(status_code=500, detail="Verification error")
```

### 3. Logging

Log security events for monitoring:

```python theme={null}
# Log successful verifications
logger.info("Webhook verified successfully", extra={
    'event_type': event['type'],
    'timestamp': timestamp,
    'signature_valid': True
})

# Log failed verifications
logger.warning("Webhook verification failed", extra={
    'reason': 'invalid_signature',
    'timestamp': timestamp,
    'ip_address': request.client.host
})
```

### 4. Rate Limiting

Protect against abuse:

```python theme={null}
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/webhook")
@limiter.limit("100/minute")  # Max 100 webhooks per minute per IP
async def webhook_handler(request: Request):
    # ... verification logic
```

## Troubleshooting

### Common Issues

**"Invalid signature" errors:**

* ✅ Ensure you're using the raw JSON payload, not parsed/re-stringified
* ✅ Check that your webhook secret is correct
* ✅ Verify timestamp format (Unix seconds, not milliseconds)
* ✅ Make sure you're concatenating `timestamp.payload` correctly

**"Timestamp too old" errors:**

* ✅ Check server clock synchronization
* ✅ Increase tolerance window if needed (but keep it reasonable)
* ✅ Ensure timestamp is in seconds, not milliseconds

**Missing headers:**

* ✅ Verify webhook is registered correctly in Magic Hour
* ✅ Check that your endpoint URL is correct
* ✅ Ensure headers are being received (log all headers for debugging)

### Debug Mode

Add debug logging to troubleshoot signature issues:

```python theme={null}
def debug_signature_verification(signature, timestamp, raw_payload, secret):
    signed_payload = f"{timestamp}.{raw_payload.decode('utf-8')}"
    expected = hmac.new(secret.encode(), signed_payload.encode(), hashlib.sha256).hexdigest()

    print(f"🔍 Debug Info:")
    print(f"   Received signature: {signature}")
    print(f"   Expected signature: {expected}")
    print(f"   Timestamp: {timestamp}")
    print(f"   Signed payload: {signed_payload[:100]}...")
    print(f"   Match: {hmac.compare_digest(signature, expected)}")

    return hmac.compare_digest(signature, expected)
```

## Next Steps

<CardGroup cols={2}>
  <Card title="Event Types Reference" icon="book" href="/integration/webhook/event-types">
    Learn about all available webhook events and their payloads
  </Card>

  <Card title="Production Deployment" icon="server" href="/integration/webhook/create-handler">
    Deploy secure webhook handlers to production
  </Card>

  <Card title="Webhook Quickstart" icon="rocket" href="/integration/webhook/overview">
    Start with basic webhook setup and testing
  </Card>

  <Card title="API Reference" icon="webhook" href="/webhook-reference">
    Complete webhook API documentation
  </Card>
</CardGroup>

Need help with webhook security? Contact [support@magichour.ai](mailto:support@magichour.ai)
