Lambda¶
Universal Lambda handler and router for incoming events.
Overview¶
Language: Go License: MIT Category: Serverless
Installation¶
Documentation¶
A universal Lambda framework for Go that auto-detects event types and provides a clean, middleware-based architecture for building Lambda functions.
Features¶
- Universal Entry Point: Single handler that auto-detects event types
- Full AWS Event Suite: API Gateway (v1/v2), ALB, SQS, SNS, S3, EventBridge, DynamoDB Streams, Kinesis
- Unified Routing: Pattern-based routing for both HTTP and AWS events
- Built-in Router: Go 1.22+ http.ServeMux with path parameter validation
- Middleware Chain: Logging, recovery, auth, request ID, X-Ray tracing
- Event Middleware: Logging, recovery, and tracing for event handlers
- Concurrent Processing: Configurable parallelism for batch events
- Standardized Responses: Consistent JSON responses with error mapping
- Centralized Errors: Base error types with HTTP status mapping
- DRY Architecture: Interface-based with base struct embedding
Quick Start¶
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/go-gamma/handler"
	"github.com/go-gamma/handler/pkg/middleware/logging"
	"github.com/go-gamma/handler/pkg/middleware/recovery"
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	app := handler.New(handler.WithLogger(logger))

	// Add middleware
	app.Use(
		recovery.New(logger),
		logging.New(logger),
	)

	// Register HTTP routes (listUsers and createUser take the same
	// arguments as getUser and are omitted here)
	r := app.Router()
	r.GET("/users", listUsers)
	r.GET("/users/{id}", getUser)
	r.POST("/users", createUser)

	lambda.Start(app.LambdaHandler())
}

func getUser(ctx context.Context, req *handler.Request) handler.Response {
	id := req.PathParam("id")
	// Fetch user... (placeholder value so the snippet compiles)
	user := map[string]string{"id": id}
	return handler.OK(user)
}
Unified Routing¶
The framework provides unified routing for both HTTP and AWS events using pattern-based matching.
HTTP Routing¶
app := handler.New()

// Basic routes
app.GET("/health", healthCheck)
app.POST("/users", createUser)
app.PUT("/users/{id}", updateUser)
app.DELETE("/users/{id}", deleteUser)

// Path parameter validation
app.GET("/users/{id:int}", getUser)    // id must be integer
app.GET("/items/{uuid:uuid}", getItem) // uuid must be valid UUID
app.GET("/pages/{slug:slug}", getPage) // slug must be alphanumeric with hyphens

// Validated parameters in handlers
func getUser(ctx context.Context, req *handler.Request) handler.Response {
	id, ok := req.PathParamInt("id") // Returns int, already validated
	if !ok {
		return handler.BadRequest("invalid user id")
	}
	// ...
}
Supported Parameter Types:
- int - Integer validation
- uuid - UUID format (8-4-4-4-12 hex)
- slug - URL-safe slug (alphanumeric, hyphens)
- alpha - Alphabetic only
- alphanum - Alphanumeric only
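The parameter types above can be pictured as a small set of validators keyed by type name. The following is a minimal sketch, not the library's implementation: `validParam` and the regular expressions are assumptions derived only from the descriptions in the list.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// Hypothetical patterns inferred from the type descriptions; the real
// router may accept a slightly different set of strings.
var (
	uuidRe     = regexp.MustCompile(`^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)
	slugRe     = regexp.MustCompile(`^[A-Za-z0-9]+(-[A-Za-z0-9]+)*$`)
	alphaRe    = regexp.MustCompile(`^[A-Za-z]+$`)
	alphanumRe = regexp.MustCompile(`^[A-Za-z0-9]+$`)
)

// validParam reports whether value satisfies the named parameter type.
// Unknown type names are rejected.
func validParam(typ, value string) bool {
	switch typ {
	case "int":
		_, err := strconv.Atoi(value)
		return err == nil
	case "uuid":
		return uuidRe.MatchString(value)
	case "slug":
		return slugRe.MatchString(value)
	case "alpha":
		return alphaRe.MatchString(value)
	case "alphanum":
		return alphanumRe.MatchString(value)
	}
	return false
}

func main() {
	fmt.Println(validParam("int", "42"))           // true
	fmt.Println(validParam("slug", "hello-world")) // true
	fmt.Println(validParam("alpha", "abc123"))     // false
}
```

Rejecting a request at the routing layer when a segment fails its check keeps handlers free of repetitive parsing code, which is presumably why typed accessors such as `PathParamInt` exist.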
Event Routing¶
Route AWS events by source (ARN, name, or wildcard pattern):
app := handler.New()
// SNS - Route by topic ARN, name, or wildcard
app.SNS("arn:aws:sns:us-east-1:123456789012:orders", handleOrders)
app.SNS("alerts", handleAlerts) // Match by name
app.SNS("user-*", handleUserTopics) // Wildcard pattern
// SQS - Route by queue with concurrent processing
app.SQS("order-queue", handleOrderQueue)
app.SQS("notification-*", handleNotifications)
// Kinesis - Route by stream
app.Kinesis("events-stream", handleEvents)
app.Kinesis("logs-*", handleLogStreams)
Two-Level Routing (Fluent API)¶
For events with sub-types (S3, DynamoDB, EventBridge), use the fluent API:
// S3 - Route by bucket + event type
app.S3("uploads-bucket").
	PutObject(handleUpload).
	DeleteObject(handleDelete).
	Fallback(handleOtherS3Events)

app.S3("logs-bucket/incoming/*"). // Bucket + key prefix
	PutObject(handleLogUpload)

// DynamoDB Streams - Route by table + event type
app.DynamoDB("Users").
	Insert(handleUserCreated).
	Modify(handleUserUpdated).
	Remove(handleUserDeleted)

// EventBridge - Route by source + detail-type
app.EventBridge("myapp.orders").
	On("Order.Created", handleOrderCreated).
	On("Order.Shipped", handleOrderShipped).
	On("Order.*", handleAllOrders) // Wildcard detail-type
Route Priority¶
When multiple patterns could match, priority determines the winner:
- Exact ARN - arn:aws:sns:us-east-1:123:orders (highest)
- Exact name - orders
- Wildcard pattern - orders-* (lowest)
// These are matched in priority order:
app.SNS("arn:aws:sns:us-east-1:123:orders", handler1) // Priority 1
app.SNS("orders", handler2) // Priority 2
app.SNS("orders-*", handler3) // Priority 3
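To make the priority order concrete, here is a minimal sketch of how a matcher could rank registered patterns against an incoming ARN. It is an illustration under assumptions, not the framework's code: `matchPriority`, `bestMatch`, and `nameFromARN` are hypothetical names, and wildcard matching is delegated to the standard library's `path.Match`.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// Match priorities; a lower value is more specific and wins.
const (
	exactARN = iota + 1
	exactName
	wildcard
	noMatch
)

// nameFromARN returns the last colon-separated field of an ARN, which is
// the topic or queue name for SNS and SQS ARNs.
func nameFromARN(arn string) string {
	return arn[strings.LastIndex(arn, ":")+1:]
}

// matchPriority reports how specifically pattern matches the given ARN.
func matchPriority(pattern, arn string) int {
	name := nameFromARN(arn)
	switch {
	case strings.HasPrefix(pattern, "arn:") && pattern == arn:
		return exactARN
	case pattern == name:
		return exactName
	case strings.Contains(pattern, "*"):
		if ok, err := path.Match(pattern, name); err == nil && ok {
			return wildcard
		}
	}
	return noMatch
}

// bestMatch returns the most specific registered pattern for arn,
// or "" when nothing matches (the point where a fallback would run).
func bestMatch(patterns []string, arn string) string {
	best, bestPrio := "", noMatch
	for _, p := range patterns {
		if prio := matchPriority(p, arn); prio < bestPrio {
			best, bestPrio = p, prio
		}
	}
	return best
}

func main() {
	patterns := []string{"orders-*", "orders", "arn:aws:sns:us-east-1:123:orders"}
	for _, arn := range []string{
		"arn:aws:sns:us-east-1:123:orders",
		"arn:aws:sns:eu-west-1:123:orders",
		"arn:aws:sns:us-east-1:123:orders-eu",
	} {
		fmt.Printf("%s -> %q\n", arn, bestMatch(patterns, arn))
	}
}
```

Because the ranking depends only on the kind of match, registration order does not affect which handler wins in this sketch.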
Fallback Handlers¶
Set fallback handlers for unmatched sources:
Event Middleware¶
Apply middleware to event handlers for cross-cutting concerns:
import (
	evtlogging "github.com/go-gamma/handler/pkg/events/middleware/logging"
	evtrecovery "github.com/go-gamma/handler/pkg/events/middleware/recovery"
	evttracing "github.com/go-gamma/handler/pkg/events/middleware/tracing"
)

// Apply middleware to S3 handlers
app.S3("uploads-bucket").
	Use(evtrecovery.New(logger), evtlogging.New(logger)).
	PutObject(handleUpload).
	DeleteObject(handleDelete)

// Apply middleware to SQS handlers
app.SQS("order-queue").
	Use(evtrecovery.New(logger), evttracing.New(logger)).
	Handler(handleOrder)
Built-in Event Middleware:
- evtlogging.New(logger) - Log each record processed
- evtrecovery.New(logger) - Recover from panics per record
- evttracing.New(logger) - X-Ray subsegment per record
Concurrent Processing¶
Configure parallelism for batch events (SQS, DynamoDB, Kinesis):
// Process up to 10 SQS messages in parallel
app.SQS("order-queue").
	Concurrency(10).
	Handler(handleOrder)

// Process 5 DynamoDB records in parallel
app.DynamoDB("Users").
	Concurrency(5).
	Insert(handleUserCreated).
	Modify(handleUserUpdated)

// Set global default concurrency
app.SetEventConcurrency(5)
Partial batch failures are automatically reported for SQS, Kinesis, and DynamoDB.
Package Structure¶
handler/
├── handler.go # Main entry point
├── internal/ # Private implementation
│ ├── detect/ # Event type detection
│ └── adapter/ # HTTP event adapters
│ ├── apigateway/ # API Gateway v1/v2
│ └── alb/ # ALB
├── pkg/ # Public packages
│ ├── errors/ # Centralized error types
│ ├── router/ # HTTP routing + validation
│ ├── routing/ # Pattern matching (ARN, wildcard)
│ ├── middleware/ # HTTP middleware
│ │ ├── logging/ # Request logging
│ │ ├── recovery/ # Panic recovery
│ │ ├── auth/ # Authentication
│ │ ├── requestid/ # Request ID
│ │ └── tracing/ # X-Ray tracing
│ ├── response/ # Response builders
│ ├── context/ # Context utilities
│ └── events/ # Event handlers + routers
│ ├── router.go # Base event router
│ ├── middleware.go # Event middleware interface
│ ├── sqs/ # SQS router
│ ├── sns/ # SNS router
│ ├── s3/ # S3 router
│ ├── dynamodb/ # DynamoDB router
│ ├── kinesis/ # Kinesis router
│ ├── eventbridge/ # EventBridge router
│ └── middleware/ # Event middleware implementations
│ ├── logging/
│ ├── recovery/
│ └── tracing/
└── examples/ # Example implementations
Event Detection¶
The framework automatically detects the event type from the incoming JSON:
- API Gateway v1 (REST): httpMethod + resource
- API Gateway v2 (HTTP): version: "2.0" + requestContext.http
- ALB: requestContext.elb
- SQS: Records[].eventSource == "aws:sqs"
- SNS: Records[].Sns
- S3: Records[].eventSource == "aws:s3"
- EventBridge: source + detail-type
- DynamoDB Streams: Records[].eventSource == "aws:dynamodb"
- Kinesis: Records[].eventSource == "aws:kinesis"
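The detection rules above amount to decoding just enough of the payload to tell the event types apart. The following sketch shows one way to do that with `encoding/json`; the `probe` struct, the `detect` function, and the returned labels are hypothetical and do not reflect the internal `detect` package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// probe captures only the fields needed to distinguish event types.
type probe struct {
	HTTPMethod     string `json:"httpMethod"`
	Resource       string `json:"resource"`
	Version        string `json:"version"`
	Source         string `json:"source"`
	DetailType     string `json:"detail-type"`
	RequestContext struct {
		HTTP json.RawMessage `json:"http"`
		ELB  json.RawMessage `json:"elb"`
	} `json:"requestContext"`
	Records []struct {
		EventSource string          `json:"eventSource"`
		Sns         json.RawMessage `json:"Sns"`
	} `json:"Records"`
}

// detect returns a label for the event type, or "unknown".
func detect(raw []byte) (string, error) {
	var p probe
	if err := json.Unmarshal(raw, &p); err != nil {
		return "", err
	}
	switch {
	case len(p.RequestContext.ELB) > 0:
		return "alb", nil
	case p.Version == "2.0" && len(p.RequestContext.HTTP) > 0:
		return "apigateway-v2", nil
	case p.HTTPMethod != "" && p.Resource != "":
		return "apigateway-v1", nil
	case p.Source != "" && p.DetailType != "":
		return "eventbridge", nil
	case len(p.Records) > 0:
		r := p.Records[0]
		switch {
		case len(r.Sns) > 0:
			return "sns", nil
		case r.EventSource == "aws:sqs":
			return "sqs", nil
		case r.EventSource == "aws:s3":
			return "s3", nil
		case r.EventSource == "aws:dynamodb":
			return "dynamodb", nil
		case r.EventSource == "aws:kinesis":
			return "kinesis", nil
		}
	}
	return "unknown", nil
}

func main() {
	for _, raw := range []string{
		`{"Records":[{"eventSource":"aws:sqs","body":"{}"}]}`,
		`{"version":"2.0","requestContext":{"http":{"method":"GET"}}}`,
		`{"source":"myapp.orders","detail-type":"Order.Created","detail":{}}`,
	} {
		kind, err := detect([]byte(raw))
		fmt.Println(kind, err)
	}
}
```

The ALB check comes first in this sketch because ALB payloads also carry `httpMethod`; only the `requestContext.elb` key separates them cleanly from API Gateway v1 events.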
HTTP Middleware¶
// Built-in middleware
app.Use(
	recovery.New(logger),             // Panic recovery
	requestid.New(logger),            // Request ID extraction
	logging.SkipHealthChecks(logger), // Request logging
	tracing.New(logger),              // X-Ray tracing
)

// Protected routes with auth
r := app.Router()
protected := r.Group("/api/v1", auth.New(logger).Wrap)
protected.GET("/orders", listOrders)
Event Handlers (Legacy API)¶
The framework still supports the original event handler API:
// SQS with typed messages
app.OnSQS(sqs.JSONProcessor(logger, func(ctx context.Context, msg OrderMessage, record sqs.Record) error {
	// Process message...
	return nil
}))

// S3
app.OnS3(s3.ProcessorFunc(logger, func(ctx context.Context, record s3.Record) error {
	if record.IsObjectCreated() {
		// Handle upload...
	}
	return nil
}))

// EventBridge with routing
ebRouter := eventbridge.NewRouter(logger)
ebRouter.On("Order.Created", handleOrderCreated)
ebRouter.On("User.Registered", handleUserRegistered)
app.OnEventBridge(ebRouter)
Error Handling¶
// All errors embed BaseError for consistent formatting
err := errors.NotFound("user not found")
err = err.WithMeta("userId", id)

// Validation errors
v := errors.NewValidator()
v.Required(name, "name")
v.MinLength(password, "password", 8)
if err := v.Error(); err != nil {
	return handler.Error(err)
}

// HTTP errors with status codes (distinct names avoid redeclaring err)
badRequest := errors.BadRequest("invalid input")
forbidden := errors.Forbidden("access denied")
internal := errors.Internal("something went wrong")
Response Helpers¶
// JSON responses
handler.OK(data) // 200
handler.Created(data) // 201
handler.NoContent() // 204
handler.JSON(statusCode, data) // Custom
// Error responses
handler.Error(err) // Auto-detects status
handler.NotFound("resource not found")
handler.BadRequest("invalid input")
handler.Unauthorized("not authenticated")
handler.Forbidden("not authorized")
// Pagination
response.Paginated(items, page, pageSize, totalCount)
Complete Example¶
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/go-gamma/handler"
	"github.com/go-gamma/handler/pkg/events/dynamodb"
	evtlogging "github.com/go-gamma/handler/pkg/events/middleware/logging"
	evtrecovery "github.com/go-gamma/handler/pkg/events/middleware/recovery"
	"github.com/go-gamma/handler/pkg/events/s3"
	"github.com/go-gamma/handler/pkg/events/sqs"
	"github.com/go-gamma/handler/pkg/middleware/logging"
	"github.com/go-gamma/handler/pkg/middleware/recovery"
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	app := handler.New(handler.WithLogger(logger))

	// HTTP middleware
	app.Use(recovery.New(logger), logging.New(logger))

	// HTTP routes with validation
	app.GET("/health", healthCheck)
	app.GET("/users/{id:int}", getUser)
	app.GET("/items/{uuid:uuid}", getItem)
	app.POST("/users", createUser)

	// SQS routing with concurrency
	app.SQS("order-queue").
		Use(evtrecovery.New(logger), evtlogging.New(logger)).
		Concurrency(10).
		Handler(handleOrderMessage)
	app.SQS("notification-*", handleNotification)

	// S3 two-level routing
	app.S3("uploads-bucket").
		Use(evtrecovery.New(logger)).
		PutObject(handleUpload).
		DeleteObject(handleDelete)

	// DynamoDB Streams routing
	app.DynamoDB("Users").
		Use(evtlogging.New(logger)).
		Concurrency(5).
		Insert(handleUserCreated).
		Modify(handleUserUpdated).
		Remove(handleUserDeleted)

	// EventBridge routing
	app.EventBridge("myapp.orders").
		On("Order.Created", handleOrderCreated).
		On("Order.*", handleOtherOrders)

	lambda.Start(app.LambdaHandler())
}
// Handler implementations (the remaining handlers registered above are omitted)
func healthCheck(ctx context.Context, req *handler.Request) handler.Response {
	return handler.OK(map[string]string{"status": "healthy"})
}

func getUser(ctx context.Context, req *handler.Request) handler.Response {
	id, _ := req.PathParamInt("id")
	// Fetch user by id... (placeholder value so the snippet compiles)
	user := map[string]any{"id": id}
	return handler.OK(user)
}

func handleUpload(ctx context.Context, record s3.Record) error {
	slog.InfoContext(ctx, "upload",
		"bucket", record.S3.Bucket.Name,
		"key", record.S3.Object.Key)
	return nil
}

func handleUserCreated(ctx context.Context, record dynamodb.Record) error {
	slog.InfoContext(ctx, "user created", "keys", record.DynamoDB.Keys)
	return nil
}

func handleOrderMessage(ctx context.Context, record sqs.Record) error {
	slog.InfoContext(ctx, "processing order", "messageId", record.MessageId)
	return nil
}
Dependencies¶
- github.com/aws/aws-lambda-go - Lambda runtime
- github.com/aws/aws-sdk-go-v2/aws/arn - ARN parsing
- golang.org/x/sync/errgroup - Concurrent processing
- log/slog - Structured logging (Go 1.21+ stdlib)
- Go 1.22+ required for enhanced router