Skip to content

Lambda

GitHub stars GitHub forks License Language

Universal Lambda handler and router for incoming events.

GitHub Repository Issues

Overview

  • 0 stars 0 forks 0 open issues

  • Language: Go License: MIT Category: Serverless

Installation

go get github.com/go-gamma/lambda

Documentation

A universal Lambda framework for Go that auto-detects event types and provides a clean, middleware-based architecture for building Lambda functions.

Features

  • Universal Entry Point: Single handler that auto-detects event types
  • Full AWS Event Suite: API Gateway (v1/v2), ALB, SQS, SNS, S3, EventBridge, DynamoDB Streams, Kinesis
  • Unified Routing: Pattern-based routing for both HTTP and AWS events
  • Built-in Router: Go 1.22+ http.ServeMux with path parameter validation
  • Middleware Chain: Logging, recovery, auth, request ID, X-Ray tracing
  • Event Middleware: Logging, recovery, and tracing for event handlers
  • Concurrent Processing: Configurable parallelism for batch events
  • Standardized Responses: Consistent JSON responses with error mapping
  • Centralized Errors: Base error types with HTTP status mapping
  • DRY Architecture: Interface-based with base struct embedding

Quick Start

package main

import (
    "context"
    "log/slog"
    "os"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/go-gamma/handler"
    "github.com/go-gamma/handler/pkg/middleware/logging"
    "github.com/go-gamma/handler/pkg/middleware/recovery"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

    app := handler.New(handler.WithLogger(logger))

    // Add middleware
    app.Use(
        recovery.New(logger),
        logging.New(logger),
    )

    // Register HTTP routes
    r := app.Router()
    r.GET("/users", listUsers)
    r.GET("/users/{id}", getUser)
    r.POST("/users", createUser)

    lambda.Start(app.LambdaHandler())
}

func getUser(ctx context.Context, req *handler.Request) handler.Response {
    id := req.PathParam("id")
    // Fetch user...
    return handler.OK(user)
}

Unified Routing

The framework provides unified routing for both HTTP and AWS events using pattern-based matching.

HTTP Routing

app := handler.New()

// Basic routes
app.GET("/health", healthCheck)
app.POST("/users", createUser)
app.PUT("/users/{id}", updateUser)
app.DELETE("/users/{id}", deleteUser)

// Path parameter validation
app.GET("/users/{id:int}", getUser)       // id must be integer
app.GET("/items/{uuid:uuid}", getItem)    // uuid must be valid UUID
app.GET("/pages/{slug:slug}", getPage)    // slug must be alphanumeric with hyphens

// Validated parameters in handlers
func getUser(ctx context.Context, req *handler.Request) handler.Response {
    id, ok := req.PathParamInt("id")  // Returns int, already validated
    if !ok {
        return handler.BadRequest("invalid user id")
    }
    // ...
}

Supported Parameter Types: - int - Integer validation - uuid - UUID format (8-4-4-4-12 hex) - slug - URL-safe slug (alphanumeric, hyphens) - alpha - Alphabetic only - alphanum - Alphanumeric only

Event Routing

Route AWS events by source (ARN, name, or wildcard pattern):

app := handler.New()

// SNS - Route by topic ARN, name, or wildcard
app.SNS("arn:aws:sns:us-east-1:123456789012:orders", handleOrders)
app.SNS("alerts", handleAlerts)           // Match by name
app.SNS("user-*", handleUserTopics)       // Wildcard pattern

// SQS - Route by queue with concurrent processing
app.SQS("order-queue", handleOrderQueue)
app.SQS("notification-*", handleNotifications)

// Kinesis - Route by stream
app.Kinesis("events-stream", handleEvents)
app.Kinesis("logs-*", handleLogStreams)

Two-Level Routing (Fluent API)

For events with sub-types (S3, DynamoDB, EventBridge), use the fluent API:

// S3 - Route by bucket + event type
app.S3("uploads-bucket").
    PutObject(handleUpload).
    DeleteObject(handleDelete).
    Fallback(handleOtherS3Events)

app.S3("logs-bucket/incoming/*").       // Bucket + key prefix
    PutObject(handleLogUpload)

// DynamoDB Streams - Route by table + event type
app.DynamoDB("Users").
    Insert(handleUserCreated).
    Modify(handleUserUpdated).
    Remove(handleUserDeleted)

// EventBridge - Route by source + detail-type
app.EventBridge("myapp.orders").
    On("Order.Created", handleOrderCreated).
    On("Order.Shipped", handleOrderShipped).
    On("Order.*", handleAllOrders)          // Wildcard detail-type

Route Priority

When multiple patterns could match, priority determines the winner:

  1. Exact ARN - arn:aws:sns:us-east-1:123:orders (highest)
  2. Exact name - orders
  3. Wildcard pattern - orders-* (lowest)
// These are matched in priority order:
app.SNS("arn:aws:sns:us-east-1:123:orders", handler1)  // Priority 1
app.SNS("orders", handler2)                             // Priority 2
app.SNS("orders-*", handler3)                           // Priority 3

Fallback Handlers

Set fallback handlers for unmatched sources:

app.SNSFallback(handleUnknownSNS)
app.SQSFallback(handleUnknownSQS)
app.S3Fallback(handleUnknownS3)

Event Middleware

Apply middleware to event handlers for cross-cutting concerns:

import (
    evtlogging "github.com/go-gamma/handler/pkg/events/middleware/logging"
    evtrecovery "github.com/go-gamma/handler/pkg/events/middleware/recovery"
    evttracing "github.com/go-gamma/handler/pkg/events/middleware/tracing"
)

// Apply middleware to S3 handlers
app.S3("uploads-bucket").
    Use(evtrecovery.New(logger), evtlogging.New(logger)).
    PutObject(handleUpload).
    DeleteObject(handleDelete)

// Apply middleware to SQS handlers
app.SQS("order-queue").
    Use(evtrecovery.New(logger), evttracing.New(logger)).
    Handler(handleOrder)

Built-in Event Middleware: - evtlogging.New(logger) - Log each record processed - evtrecovery.New(logger) - Recover from panics per record - evttracing.New(logger) - X-Ray subsegment per record

Concurrent Processing

Configure parallelism for batch events (SQS, DynamoDB, Kinesis):

// Process up to 10 SQS messages in parallel
app.SQS("order-queue").
    Concurrency(10).
    Handler(handleOrder)

// Process 5 DynamoDB records in parallel
app.DynamoDB("Users").
    Concurrency(5).
    Insert(handleUserCreated).
    Modify(handleUserUpdated)

// Set global default concurrency
app.SetEventConcurrency(5)

Partial batch failures are automatically reported for SQS, Kinesis, and DynamoDB.

Package Structure

handler/
├── handler.go                 # Main entry point
├── internal/                  # Private implementation
│   ├── detect/                # Event type detection
│   └── adapter/               # HTTP event adapters
│       ├── apigateway/        # API Gateway v1/v2
│       └── alb/               # ALB
├── pkg/                       # Public packages
│   ├── errors/                # Centralized error types
│   ├── router/                # HTTP routing + validation
│   ├── routing/               # Pattern matching (ARN, wildcard)
│   ├── middleware/            # HTTP middleware
│   │   ├── logging/           # Request logging
│   │   ├── recovery/          # Panic recovery
│   │   ├── auth/              # Authentication
│   │   ├── requestid/         # Request ID
│   │   └── tracing/           # X-Ray tracing
│   ├── response/              # Response builders
│   ├── context/               # Context utilities
│   └── events/                # Event handlers + routers
│       ├── router.go          # Base event router
│       ├── middleware.go      # Event middleware interface
│       ├── sqs/               # SQS router
│       ├── sns/               # SNS router
│       ├── s3/                # S3 router
│       ├── dynamodb/          # DynamoDB router
│       ├── kinesis/           # Kinesis router
│       ├── eventbridge/       # EventBridge router
│       └── middleware/        # Event middleware implementations
│           ├── logging/
│           ├── recovery/
│           └── tracing/
└── examples/                  # Example implementations

Event Detection

The framework automatically detects the event type from the incoming JSON:

  • API Gateway v1 (REST): httpMethod + resource
  • API Gateway v2 (HTTP): version: "2.0" + requestContext.http
  • ALB: requestContext.elb
  • SQS: Records[].eventSource == "aws:sqs"
  • SNS: Records[].Sns
  • S3: Records[].eventSource == "aws:s3"
  • EventBridge: source + detail-type
  • DynamoDB Streams: Records[].eventSource == "aws:dynamodb"
  • Kinesis: Records[].eventSource == "aws:kinesis"

HTTP Middleware

// Built-in middleware
app.Use(
    recovery.New(logger),              // Panic recovery
    requestid.New(logger),             // Request ID extraction
    logging.SkipHealthChecks(logger),  // Request logging
    tracing.New(logger),               // X-Ray tracing
)

// Protected routes with auth
protected := r.Group("/api/v1", auth.New(logger).Wrap)
protected.GET("/orders", listOrders)

Event Handlers (Legacy API)

The framework still supports the original event handler API:

// SQS with typed messages
app.OnSQS(sqs.JSONProcessor(logger, func(ctx context.Context, msg OrderMessage, record sqs.Record) error {
    // Process message...
    return nil
}))

// S3
app.OnS3(s3.ProcessorFunc(logger, func(ctx context.Context, record s3.Record) error {
    if record.IsObjectCreated() {
        // Handle upload...
    }
    return nil
}))

// EventBridge with routing
ebRouter := eventbridge.NewRouter(logger)
ebRouter.On("Order.Created", handleOrderCreated)
ebRouter.On("User.Registered", handleUserRegistered)
app.OnEventBridge(ebRouter)

Error Handling

// All errors embed BaseError for consistent formatting
err := errors.NotFound("user not found")
err = err.WithMeta("userId", id)

// Validation errors
v := errors.NewValidator()
v.Required(name, "name")
v.MinLength(password, "password", 8)
if err := v.Error(); err != nil {
    return handler.Error(err)
}

// HTTP errors with status codes
err := errors.BadRequest("invalid input")
err := errors.Forbidden("access denied")
err := errors.Internal("something went wrong")

Response Helpers

// JSON responses
handler.OK(data)                    // 200
handler.Created(data)               // 201
handler.NoContent()                 // 204
handler.JSON(statusCode, data)      // Custom

// Error responses
handler.Error(err)                  // Auto-detects status
handler.NotFound("resource not found")
handler.BadRequest("invalid input")
handler.Unauthorized("not authenticated")
handler.Forbidden("not authorized")

// Pagination
response.Paginated(items, page, pageSize, totalCount)

Complete Example

package main

import (
    "context"
    "log/slog"
    "os"

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/go-gamma/handler"
    "github.com/go-gamma/handler/pkg/events/s3"
    "github.com/go-gamma/handler/pkg/events/sqs"
    "github.com/go-gamma/handler/pkg/events/dynamodb"
    evtlogging "github.com/go-gamma/handler/pkg/events/middleware/logging"
    evtrecovery "github.com/go-gamma/handler/pkg/events/middleware/recovery"
    "github.com/go-gamma/handler/pkg/middleware/logging"
    "github.com/go-gamma/handler/pkg/middleware/recovery"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    app := handler.New(handler.WithLogger(logger))

    // HTTP middleware
    app.Use(recovery.New(logger), logging.New(logger))

    // HTTP routes with validation
    app.GET("/health", healthCheck)
    app.GET("/users/{id:int}", getUser)
    app.GET("/items/{uuid:uuid}", getItem)
    app.POST("/users", createUser)

    // SQS routing with concurrency
    app.SQS("order-queue").
        Use(evtrecovery.New(logger), evtlogging.New(logger)).
        Concurrency(10).
        Handler(handleOrderMessage)

    app.SQS("notification-*", handleNotification)

    // S3 two-level routing
    app.S3("uploads-bucket").
        Use(evtrecovery.New(logger)).
        PutObject(handleUpload).
        DeleteObject(handleDelete)

    // DynamoDB Streams routing
    app.DynamoDB("Users").
        Use(evtlogging.New(logger)).
        Concurrency(5).
        Insert(handleUserCreated).
        Modify(handleUserUpdated).
        Remove(handleUserDeleted)

    // EventBridge routing
    app.EventBridge("myapp.orders").
        On("Order.Created", handleOrderCreated).
        On("Order.*", handleOtherOrders)

    lambda.Start(app.LambdaHandler())
}

// Handler implementations
func healthCheck(ctx context.Context, req *handler.Request) handler.Response {
    return handler.OK(map[string]string{"status": "healthy"})
}

func getUser(ctx context.Context, req *handler.Request) handler.Response {
    id, _ := req.PathParamInt("id")
    // Fetch user by id...
    return handler.OK(user)
}

func handleUpload(ctx context.Context, record s3.Record) error {
    slog.InfoContext(ctx, "upload",
        "bucket", record.S3.Bucket.Name,
        "key", record.S3.Object.Key)
    return nil
}

func handleUserCreated(ctx context.Context, record dynamodb.Record) error {
    slog.InfoContext(ctx, "user created", "keys", record.DynamoDB.Keys)
    return nil
}

func handleOrderMessage(ctx context.Context, record sqs.Record) error {
    slog.InfoContext(ctx, "processing order", "messageId", record.MessageId)
    return nil
}

Dependencies

  • github.com/aws/aws-lambda-go - Lambda runtime
  • github.com/aws/aws-sdk-go-v2/aws/arn - ARN parsing
  • golang.org/x/sync/errgroup - Concurrent processing
  • log/slog - Structured logging (Go 1.21+ stdlib)
  • Go 1.22+ required for enhanced router

Development

# Run tests
make test

# Run tests with race detection
make test-race

# Run linter
make lint

# Generate coverage report
make coverage

# Run benchmarks
make bench