Middleware Pipeline
Direction-aware, scope-aware middleware pipeline for encryption, compression, deduplication, and custom transforms.
The middleware system provides a composable pipeline for transforming data on read and write paths. Each middleware is scoped and direction-aware, so you can apply encryption globally, compress only specific buckets, or deduplicate based on key patterns.
Quick Start
import (
	"context"
	"log"

	"github.com/xraph/trove"
	"github.com/xraph/trove/drivers/memdriver"
	"github.com/xraph/trove/middleware"
	"github.com/xraph/trove/middleware/compress"
	"github.com/xraph/trove/middleware/encrypt"
)
ctx := context.Background()
drv := memdriver.New()
// myKey is a 32-byte AES-256 key you supply.
t, err := trove.Open(drv,
	// Encrypt all objects with AES-256-GCM.
	trove.WithMiddleware(encrypt.New(
		encrypt.WithKeyProvider(encrypt.NewStaticKeyProvider(myKey)),
	)),
	// Compress only the "logs" bucket.
	trove.WithScopedMiddleware(
		middleware.ForBuckets("logs"),
		compress.New(),
	),
)
if err != nil {
	log.Fatal(err)
}
defer t.Close(ctx)
How It Works
Every Put() and Get() call passes data through a resolved middleware pipeline:
Put: data --> compress.WrapWriter --> encrypt.WrapWriter --> driver.Put
Get: driver.Get --> encrypt.WrapReader --> compress.WrapReader --> data
The pipeline is assembled per operation by the Resolver, which:
- Filters middleware by direction (read, write, or both)
- Filters middleware by scope (bucket, key pattern, content type, or custom predicate)
- Sorts by priority (lower number runs first, default 100)
- Caches the result for repeated lookups (LRU, capacity 1024)
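The four Resolver steps above can be modeled in plain Go. This is a minimal sketch under stated assumptions, not trove's implementation: `Scope`, `Registration`, `ForBuckets`, `ForKeys`, `And`, and `Resolve` are simplified local stand-ins, scopes here see only the bucket and key, and glob matching is assumed to follow `path.Match`. It shows the direction filter as a bitwise AND against the requested direction, and a stable sort so that equal priorities keep registration order (also an assumption). Caching is left out.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// Direction mirrors the bitmask shown in the Direction section.
type Direction int

const (
	DirectionRead Direction = 1 << iota
	DirectionWrite
	DirectionReadWrite = DirectionRead | DirectionWrite
)

// Scope is a simplified, hypothetical predicate; real scopes can also
// inspect the context and content type.
type Scope func(bucket, key string) bool

// ForBuckets matches any of the listed bucket names exactly.
func ForBuckets(buckets ...string) Scope {
	return func(bucket, _ string) bool {
		for _, b := range buckets {
			if b == bucket {
				return true
			}
		}
		return false
	}
}

// ForKeys matches keys with path.Match, so "*" does not cross "/".
// Whether trove's globs behave the same way is an assumption.
func ForKeys(patterns ...string) Scope {
	return func(_, key string) bool {
		for _, p := range patterns {
			if ok, _ := path.Match(p, key); ok {
				return true
			}
		}
		return false
	}
}

// And matches only when every child scope matches.
func And(scopes ...Scope) Scope {
	return func(bucket, key string) bool {
		for _, s := range scopes {
			if !s(bucket, key) {
				return false
			}
		}
		return true
	}
}

// Registration is a hypothetical stand-in for middleware.Registration.
type Registration struct {
	Name      string
	Direction Direction
	Scope     Scope // nil means "applies everywhere"
	Priority  int
}

// Resolve keeps registrations whose direction bit and scope match, then
// stable-sorts by priority (lower first) so ties keep registration order.
func Resolve(regs []Registration, want Direction, bucket, key string) []string {
	var matched []Registration
	for _, r := range regs {
		if r.Direction&want == 0 || (r.Scope != nil && !r.Scope(bucket, key)) {
			continue
		}
		matched = append(matched, r)
	}
	sort.SliceStable(matched, func(i, j int) bool { return matched[i].Priority < matched[j].Priority })
	names := make([]string, len(matched))
	for i, r := range matched {
		names[i] = r.Name
	}
	return names
}

func main() {
	regs := []Registration{
		{Name: "encrypt", Direction: DirectionReadWrite, Priority: 100},
		{Name: "compress", Direction: DirectionReadWrite, Scope: ForBuckets("logs"), Priority: 50},
		{Name: "dedup", Direction: DirectionWrite, Scope: And(ForBuckets("media"), ForKeys("*.jpg")), Priority: 100},
	}
	fmt.Println(Resolve(regs, DirectionWrite, "logs", "app.log"))    // [compress encrypt]
	fmt.Println(Resolve(regs, DirectionRead, "media", "photo.jpg"))  // [encrypt]
	fmt.Println(Resolve(regs, DirectionWrite, "media", "photo.jpg")) // [encrypt dedup]
}
```

The stable sort matters because every registration defaults to priority 100. Without it, ties would be ordered arbitrarily rather than by registration order.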
Direction
Every middleware declares its direction:
type Direction int
const (
	DirectionRead Direction = 1 << iota // read/download path only
	DirectionWrite                      // write/upload path only
	DirectionReadWrite = DirectionRead | DirectionWrite
)
- ReadWrite middleware (encrypt, compress) transforms both paths
- Write-only middleware (dedup, scan) runs only on uploads
- Read-only middleware (watermark) post-processes downloads only
Scopes
Scopes control when middleware activates. The built-in scopes cover common cases:
// Match specific buckets.
middleware.ForBuckets("media", "documents")
// Match object keys by glob pattern.
middleware.ForKeys("*.log", "reports/*.csv")
// Match by content type prefix.
middleware.ForContentTypes("image/", "video/")
// Custom predicate for any runtime condition
// (extractTenant stands for your own helper).
middleware.When(func(ctx context.Context, bucket, key string) bool {
	return extractTenant(ctx) == "acme"
})
// Compose scopes with boolean logic.
middleware.And(
	middleware.ForBuckets("media"),
	middleware.ForKeys("*.jpg", "*.png"),
)
middleware.Or(scope1, scope2) // scope1, scope2: any two scopes
middleware.Not(middleware.ForBuckets("public"))
Registration Options
Register middleware at different levels:
// Global — applies to all operations.
trove.WithMiddleware(mw)
// Read-only or write-only.
trove.WithReadMiddleware(mw)
trove.WithWriteMiddleware(mw)
// Scoped — only activates when scope matches.
trove.WithScopedMiddleware(scope, mw)
trove.WithScopedReadMiddleware(scope, mw)
trove.WithScopedWriteMiddleware(scope, mw)
// Explicit priority (lower = runs first).
trove.WithMiddlewareAt(10, mw)
Runtime Registration
Add or remove middleware after initialization:
t.UseMiddleware(middleware.Registration{
	Middleware: myMW,
	Scope:      middleware.ForBuckets("temp"),
	Direction:  middleware.DirectionWrite,
	Priority:   50,
})
// Remove by the name the middleware reports from Name().
t.RemoveMiddleware("my-middleware-name")
Built-in Middleware
Encryption (middleware/encrypt)
AES-256-GCM authenticated encryption with nonce management.
enc := encrypt.New(
	encrypt.WithKeyProvider(encrypt.NewStaticKeyProvider(key)), // key must be 32 bytes
)
- Format: [4-byte nonce length][nonce][ciphertext + GCM tag]
- Direction: ReadWrite
- Key rotation: Implement the KeyProvider interface for Vault-backed keys
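The on-disk format in the list above can be illustrated with Go's standard `crypto/aes` and `crypto/cipher` packages. This is only a sketch. It seals whole in-memory buffers instead of streaming through `WrapWriter`, it assumes a big-endian nonce-length prefix, and it uses the standard 12-byte GCM nonce. trove's actual encoding may differ in those details, and `seal`/`open` are hypothetical names.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/binary"
	"errors"
	"fmt"
)

// seal encrypts plaintext with AES-256-GCM and frames the result as
// [4-byte nonce length][nonce][ciphertext + GCM tag].
// The big-endian length prefix is an assumption, not taken from trove.
func seal(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key) // a 32-byte key selects AES-256
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize()) // fresh random nonce per object
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	out := make([]byte, 4, 4+len(nonce)+len(plaintext)+gcm.Overhead())
	binary.BigEndian.PutUint32(out, uint32(len(nonce)))
	out = append(out, nonce...)
	return gcm.Seal(out, nonce, plaintext, nil), nil // appends ciphertext + tag
}

// open parses the frame and decrypts it; GCM fails on any modified byte.
func open(key, framed []byte) ([]byte, error) {
	if len(framed) < 4 {
		return nil, errors.New("frame too short")
	}
	n := int(binary.BigEndian.Uint32(framed[:4]))
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if n != gcm.NonceSize() || len(framed) < 4+n {
		return nil, errors.New("bad nonce length")
	}
	nonce, ciphertext := framed[4:4+n], framed[4+n:]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	key := make([]byte, 32) // demo only: use a real secret in practice
	framed, err := seal(key, []byte("hello"))
	if err != nil {
		panic(err)
	}
	plain, err := open(key, framed)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain), len(framed)) // hello 37 (4 + 12 nonce + 5 + 16 tag)
}
```

Storing the nonce length explicitly lets a reader parse the frame without knowing the nonce size in advance. Per-object random nonces are safe with GCM as long as a single key is not used for an extremely large number of objects.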
type KeyProvider interface {
	Key(ctx context.Context) ([]byte, error)
}
Compression (middleware/compress)
Zstd compression with smart skip logic.
comp := compress.New(
	compress.WithMinSize(1024),     // skip files < 1KB (default)
	compress.WithExclude("custom"), // add custom skip extensions
)
- Auto-detect: Reads check for zstd magic bytes, passing through uncompressed data transparently
- Skip list: jpg, png, gif, webp, mp4, mp3, zip, gz, bz2, xz, zst, br, rar, 7z
- Size guard: If compressed output is larger than input, original data is kept
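The skip rules and auto-detection described above come down to a few byte and string checks. The sketch below leaves out zstd itself, which would need a third-party package, and models only the decisions: the extension skip list, the 1024-byte minimum, the zstd frame magic number (`0xFD2FB528`, stored little-endian as `28 B5 2F FD`), and the size guard. The function names are hypothetical, and case-insensitive extension matching is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
	"path"
	"strings"
)

// zstdMagic is the zstd frame magic number 0xFD2FB528 in its on-disk (little-endian) order.
var zstdMagic = []byte{0x28, 0xB5, 0x2F, 0xFD}

// skipExt mirrors the skip list in this document (already-compressed formats).
var skipExt = map[string]bool{
	"jpg": true, "png": true, "gif": true, "webp": true, "mp4": true, "mp3": true, "zip": true,
	"gz": true, "bz2": true, "xz": true, "zst": true, "br": true, "rar": true, "7z": true,
}

// shouldCompress applies the extension skip list and the minimum-size guard.
func shouldCompress(key string, size, minSize int) bool {
	ext := strings.TrimPrefix(strings.ToLower(path.Ext(key)), ".")
	return !skipExt[ext] && size >= minSize
}

// isZstd reports whether stored bytes begin with a zstd frame; anything else
// is passed through on read as uncompressed data.
func isZstd(stored []byte) bool { return bytes.HasPrefix(stored, zstdMagic) }

// pick keeps the original when compression did not make the data smaller.
func pick(original, compressed []byte) []byte {
	if len(compressed) >= len(original) {
		return original
	}
	return compressed
}

func main() {
	fmt.Println(shouldCompress("logs/app.log", 4096, 1024))   // true
	fmt.Println(shouldCompress("img/Photo.JPG", 4096, 1024))  // false: skip list
	fmt.Println(shouldCompress("notes.txt", 200, 1024))       // false: below minimum size
	fmt.Println(isZstd([]byte{0x28, 0xB5, 0x2F, 0xFD, 0x00})) // true
	fmt.Println(string(pick([]byte("abc"), []byte("abcd"))))  // abc
}
```

Because reads sniff the magic bytes instead of trusting metadata, objects written before compression was enabled, or kept raw by the size guard, remain readable through the same pipeline.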
Deduplication (middleware/dedup)
Content-hash deduplication with callback support.
import "github.com/xraph/trove/middleware/dedup"
dup := dedup.New(
	dedup.WithOnDuplicate(func(ctx context.Context, key, hash string) {
		log.Printf("duplicate detected: %s (hash: %s)", key, hash)
	}),
)
- Direction: Write-only
- Detection only: Reports duplicates via callback; data still flows through. Pair with CAS (content-addressable storage) for actual storage dedup.
- Hash algorithm: Uses BLAKE3 by default, configurable via WithHashAlgorithm()
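Detection-only deduplication can be done in a single pass by hashing bytes as they stream through to the next writer. In this sketch SHA-256 from Go's standard library stands in for BLAKE3, which the standard library does not include. The in-memory `seen` map is a hypothetical replacement for whatever index trove actually keeps, and `dedupWriter`, `newDedupWriter`, and `memSink` are illustrative names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"strings"
)

// dedupWriter hashes bytes as they stream to the next writer and checks the
// digest on Close. SHA-256 stands in for BLAKE3 here.
type dedupWriter struct {
	key   string
	inner io.WriteCloser
	h     hash.Hash
	seen  map[string]bool // hypothetical index of previously seen digests
	onDup func(key, digest string)
}

func newDedupWriter(key string, inner io.WriteCloser, seen map[string]bool, onDup func(key, digest string)) *dedupWriter {
	return &dedupWriter{key: key, inner: inner, h: sha256.New(), seen: seen, onDup: onDup}
}

func (w *dedupWriter) Write(p []byte) (int, error) {
	n, err := w.inner.Write(p)
	w.h.Write(p[:n]) // hash only what was actually forwarded
	return n, err
}

// Close reports a duplicate but never blocks the write (detection only).
func (w *dedupWriter) Close() error {
	sum := hex.EncodeToString(w.h.Sum(nil))
	if w.seen[sum] {
		w.onDup(w.key, sum)
	}
	w.seen[sum] = true
	return w.inner.Close()
}

// memSink is an in-memory io.WriteCloser standing in for the driver.
type memSink struct{ data []byte }

func (m *memSink) Write(p []byte) (int, error) { m.data = append(m.data, p...); return len(p), nil }
func (m *memSink) Close() error                { return nil }

func main() {
	seen := map[string]bool{}
	onDup := func(key, digest string) { fmt.Printf("duplicate detected: %s (hash: %s)\n", key, digest) }
	for _, key := range []string{"a.txt", "b.txt"} {
		w := newDedupWriter(key, &memSink{}, seen, onDup)
		if _, err := io.Copy(w, strings.NewReader("same bytes")); err != nil {
			panic(err)
		}
		w.Close() // only the second upload triggers onDup
	}
}
```

The digest is known only after the last byte, which is why the check sits in `Close()`. That is also why this kind of middleware can report duplicates but cannot avoid storing them on its own.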
Content Scanning (middleware/scan)
Write-only middleware that scans uploaded content for threats using a pluggable ScanProvider interface. Blocks malicious uploads before they reach storage.
import "github.com/xraph/trove/middleware/scan"
scanner := scan.New(
	scan.WithProvider(scan.NewClamAVProvider("tcp://localhost:3310")),
	scan.WithMaxSize(25 << 20),              // skip files larger than 25 MiB
	scan.WithSkipExtensions(".log", ".tmp"), // skip known safe extensions
	scan.WithOnDetect(func(ctx context.Context, key string, result *scan.ScanResult) {
		log.Printf("threat detected in %s: %s", key, result.Threat)
	}),
)
- Direction: Write-only
- Blocking: Returns trove.ErrContentBlocked when a threat is detected
- Buffered: Content is buffered in memory, scanned on Close(), then flushed to the underlying writer only if clean
- ClamAV provider: Built-in provider implementing the ClamAV INSTREAM protocol over TCP
- Custom providers: Implement scan.ScanProvider for other scanning backends
type ScanProvider interface {
	Scan(ctx context.Context, r io.Reader) (*ScanResult, error)
}
Watermark (middleware/watermark)
Read-only middleware that embeds invisible watermark metadata in downloaded images. Operates at the byte level without image decoding.
import "github.com/xraph/trove/middleware/watermark"
wm := watermark.New(
	watermark.WithText("licensed-to:acme-corp"),
	watermark.WithTypes("image/png", "image/jpeg"),
)
// Or with dynamic text based on context (getUserID is your own helper).
dynamicWM := watermark.New(
	watermark.WithTextFunc(func(ctx context.Context) string {
		return fmt.Sprintf("user:%s", getUserID(ctx))
	}),
)
- Direction: Read-only
- Supported formats: PNG (tEXt chunk injection), JPEG (COM marker injection)
- Passthrough: Non-matching content types are passed through unmodified
- Zero dependencies: Uses byte-level manipulation, no image processing libraries
Writing Custom Middleware
Implement the Middleware interface plus ReadMiddleware and/or WriteMiddleware:
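type Watermark struct{}
func (w *Watermark) Name() string { return "watermark" }
func (w *Watermark) Direction() middleware.Direction { return middleware.DirectionRead }
func (w *Watermark) WrapReader(ctx context.Context, r io.ReadCloser, info *driver.ObjectInfo) (io.ReadCloser, error) {
	// Wrap the read stream; the transform itself lives in watermarkReader.Read.
	return &watermarkReader{inner: r}, nil
}
// watermarkReader is a pass-through placeholder: modify bytes in Read as needed.
type watermarkReader struct{ inner io.ReadCloser }
func (r *watermarkReader) Read(p []byte) (int, error) { return r.inner.Read(p) }
func (r *watermarkReader) Close() error               { return r.inner.Close() } // closes the underlying stream
Register it: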
trove.WithScopedReadMiddleware(
	middleware.ForKeys("*.jpg", "*.png"),
	&Watermark{},
)
Resolver
The Resolver is the core of the middleware system. Access it directly for introspection:
resolver := t.Resolver()
// List all registrations.
regs := resolver.Registrations()
// Resolve the pipeline for a specific operation.
readers := resolver.ResolveRead(ctx, "media", "photo.jpg")
writers := resolver.ResolveWrite(ctx, "media", "photo.jpg")
Pipeline results are cached with an LRU cache (1024 entries) that auto-invalidates when registrations change.
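An LRU that is cleared on every registration change is one simple way to get the caching behavior described above. This sketch uses `container/list`. The key format, the `pipelineCache` type, and clearing everything in `Invalidate` (rather than, say, versioning entries) are assumptions, not trove's internals.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// pipelineCache is a minimal LRU of resolved pipelines keyed by operation
// (for example "write|media|photo.jpg"; the key format is hypothetical).
type pipelineCache struct {
	mu       sync.Mutex
	capacity int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element holding *entry
}

type entry struct {
	key   string
	names []string // resolved middleware names, in pipeline order
}

func newPipelineCache(capacity int) *pipelineCache {
	return &pipelineCache{capacity: capacity, order: list.New(), items: map[string]*list.Element{}}
}

// Get returns the cached pipeline or calls resolve and stores the result,
// evicting the least recently used entry when over capacity. For simplicity
// the lock is held while resolve runs.
func (c *pipelineCache) Get(key string, resolve func() []string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if el, ok := c.items[key]; ok {
		c.order.MoveToFront(el)
		return el.Value.(*entry).names
	}
	names := resolve()
	c.items[key] = c.order.PushFront(&entry{key: key, names: names})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
	return names
}

// Invalidate drops every entry; call it whenever a registration is added or removed.
func (c *pipelineCache) Invalidate() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.order.Init()
	c.items = map[string]*list.Element{}
}

func main() {
	cache := newPipelineCache(1024)
	calls := 0
	resolve := func() []string { calls++; return []string{"compress", "encrypt"} }
	cache.Get("write|logs|app.log", resolve)
	cache.Get("write|logs|app.log", resolve) // served from cache
	fmt.Println(calls)                       // 1
	cache.Invalidate()                       // e.g. after RemoveMiddleware
	cache.Get("write|logs|app.log", resolve)
	fmt.Println(calls) // 2
}
```

Clearing the whole cache is coarse but safe. Any registration can change the pipeline for many (bucket, key) pairs, so per-entry invalidation would have to re-evaluate every scope anyway.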