Trove

Middleware Pipeline

Direction-aware, scope-aware middleware pipeline for encryption, compression, deduplication, and custom transforms.

The middleware system provides a composable pipeline for transforming data on read and write paths. Each middleware is scoped and direction-aware, so you can apply encryption globally, compress only specific buckets, or deduplicate based on key patterns.

Quick Start

import (
    "context"
    "log"

    "github.com/xraph/trove"
    "github.com/xraph/trove/drivers/memdriver"
    "github.com/xraph/trove/middleware"
    "github.com/xraph/trove/middleware/compress"
    "github.com/xraph/trove/middleware/encrypt"
)

ctx := context.Background()

// myKey is your own AES-256 key (32 bytes).
drv := memdriver.New()
t, err := trove.Open(drv,
    // Encrypt all objects with AES-256-GCM.
    trove.WithMiddleware(encrypt.New(
        encrypt.WithKeyProvider(encrypt.NewStaticKeyProvider(myKey)),
    )),

    // Compress only the "logs" bucket.
    trove.WithScopedMiddleware(
        middleware.ForBuckets("logs"),
        compress.New(),
    ),
)
if err != nil {
    log.Fatal(err)
}
defer t.Close(ctx)

How It Works

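Every Put() and Get() call passes data through a resolved middleware pipeline. Reads unwind the write order, so data that was compressed and then encrypted on the way in is decrypted and then decompressed on the way out: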

Put:  data --> compress.WrapWriter --> encrypt.WrapWriter --> driver.Put
Get:  driver.Get --> encrypt.WrapReader --> compress.WrapReader --> data

The pipeline is assembled per-operation by the Resolver, which:

  1. Filters middleware by direction (read, write, or both)
  2. Filters middleware by scope (bucket, key pattern, content type, or custom predicate)
  3. Sorts by priority (lower number runs first, default 100)
  4. Caches the result for repeated lookups (LRU, capacity 1024)
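The first three steps can be sketched in plain Go. This is only an illustration under stated assumptions: the registration struct, the resolve function, and the priorities in sampleRegs are hypothetical stand-ins, not the library's Resolver, and caching is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// Direction mirrors the bit flags from the Direction section.
type Direction int

const (
	DirectionRead Direction = 1 << iota
	DirectionWrite
	DirectionReadWrite = DirectionRead | DirectionWrite
)

// registration is a hypothetical, simplified pipeline entry.
type registration struct {
	Name      string
	Direction Direction
	Priority  int
	Match     func(bucket, key string) bool // nil matches everything
}

// resolve keeps entries whose direction and scope match, then orders
// them by ascending priority (stable, so ties keep registration order).
func resolve(regs []registration, op Direction, bucket, key string) []string {
	var matched []registration
	for _, r := range regs {
		if r.Direction&op == 0 {
			continue // wrong direction
		}
		if r.Match != nil && !r.Match(bucket, key) {
			continue // scope does not match
		}
		matched = append(matched, r)
	}
	sort.SliceStable(matched, func(i, j int) bool {
		return matched[i].Priority < matched[j].Priority
	})
	names := make([]string, len(matched))
	for i, r := range matched {
		names[i] = r.Name
	}
	return names
}

// sampleRegs uses made-up priorities purely for illustration.
func sampleRegs() []registration {
	logsOnly := func(bucket, _ string) bool { return bucket == "logs" }
	return []registration{
		{Name: "encrypt", Direction: DirectionReadWrite, Priority: 100},
		{Name: "compress", Direction: DirectionReadWrite, Priority: 50, Match: logsOnly},
		{Name: "dedup", Direction: DirectionWrite, Priority: 100},
	}
}

func main() {
	fmt.Println(resolve(sampleRegs(), DirectionWrite, "logs", "app.log"))
	fmt.Println(resolve(sampleRegs(), DirectionRead, "media", "photo.jpg"))
}
```

With these sample registrations, a write to the "logs" bucket resolves to compress, encrypt, dedup, while a read from "media" resolves to encrypt alone.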

Direction

Every middleware declares its direction:

type Direction int

const (
    DirectionRead      Direction = 1 << iota // read/download path only
    DirectionWrite                           // write/upload path only
    DirectionReadWrite = DirectionRead | DirectionWrite
)

  • ReadWrite middleware (encrypt, compress) transforms both paths
  • Write-only middleware (dedup, scan) only runs on uploads
  • Read-only middleware (watermark) post-processes downloads
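Because the directions are bit flags, deciding whether a middleware takes part in an operation is a single mask test. A minimal sketch, where appliesTo is a hypothetical helper rather than a library function:

```go
package main

import "fmt"

type Direction int

const (
	DirectionRead Direction = 1 << iota // 1
	DirectionWrite                      // 2
	DirectionReadWrite = DirectionRead | DirectionWrite
)

// appliesTo reports whether middleware declared with direction d
// should run for operation op (DirectionRead or DirectionWrite).
func appliesTo(d, op Direction) bool {
	return d&op != 0
}

func main() {
	fmt.Println(appliesTo(DirectionReadWrite, DirectionRead)) // encrypt on Get
	fmt.Println(appliesTo(DirectionWrite, DirectionRead))     // dedup on Get
}
```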

Scopes

Scopes control when middleware activates. The built-in scopes cover common cases:

// Match specific buckets.
middleware.ForBuckets("media", "documents")

// Match object keys by glob pattern.
middleware.ForKeys("*.log", "reports/*.csv")

// Match by content type prefix.
middleware.ForContentTypes("image/", "video/")

// Custom predicate for any runtime condition.
middleware.When(func(ctx context.Context, bucket, key string) bool {
    return extractTenant(ctx) == "acme"
})

// Compose scopes with boolean logic.
middleware.And(
    middleware.ForBuckets("media"),
    middleware.ForKeys("*.jpg", "*.png"),
)
middleware.Or(scope1, scope2)
middleware.Not(middleware.ForBuckets("public"))
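To show how such scopes compose, here is a small model that treats a scope as a predicate over bucket and key. Everything in it is an assumption for illustration: the Scope type and the lower-case helpers are hypothetical, and glob matching uses the standard library's path.Match, whose "*" does not cross "/" separators. The library's own matching rules may differ.

```go
package main

import (
	"fmt"
	"path"
)

// Scope is a hypothetical predicate; the library's scope type may differ.
type Scope func(bucket, key string) bool

func forBuckets(names ...string) Scope {
	return func(bucket, _ string) bool {
		for _, n := range names {
			if n == bucket {
				return true
			}
		}
		return false
	}
}

func forKeys(patterns ...string) Scope {
	return func(_, key string) bool {
		for _, p := range patterns {
			if ok, err := path.Match(p, key); err == nil && ok {
				return true
			}
		}
		return false
	}
}

// and matches only when every scope matches.
func and(scopes ...Scope) Scope {
	return func(bucket, key string) bool {
		for _, s := range scopes {
			if !s(bucket, key) {
				return false
			}
		}
		return true
	}
}

// or matches when at least one scope matches.
func or(scopes ...Scope) Scope {
	return func(bucket, key string) bool {
		for _, s := range scopes {
			if s(bucket, key) {
				return true
			}
		}
		return false
	}
}

func not(s Scope) Scope {
	return func(bucket, key string) bool { return !s(bucket, key) }
}

func main() {
	images := and(forBuckets("media"), forKeys("*.jpg", "*.png"))
	fmt.Println(images("media", "photo.jpg"))
	fmt.Println(images("media", "notes.txt"))
	fmt.Println(or(images, not(forBuckets("public")))("private", "notes.txt"))
}
```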

Registration Options

Register middleware at different levels:

// Global — applies to all operations.
trove.WithMiddleware(mw)

// Read-only or write-only.
trove.WithReadMiddleware(mw)
trove.WithWriteMiddleware(mw)

// Scoped — only activates when scope matches.
trove.WithScopedMiddleware(scope, mw)
trove.WithScopedReadMiddleware(scope, mw)
trove.WithScopedWriteMiddleware(scope, mw)

// Explicit priority (lower = runs first).
trove.WithMiddlewareAt(10, mw)

Runtime Registration

Add or remove middleware after initialization:

t.UseMiddleware(middleware.Registration{
    Middleware: myMW,
    Scope:     middleware.ForBuckets("temp"),
    Direction: middleware.DirectionWrite,
    Priority:  50,
})

t.RemoveMiddleware("my-middleware-name")

Built-in Middleware

Encryption (middleware/encrypt)

AES-256-GCM authenticated encryption with nonce management.

enc := encrypt.New(
    encrypt.WithKeyProvider(encrypt.NewStaticKeyProvider(key)),
)

  • Format: [4-byte nonce length][nonce][ciphertext + GCM tag]
  • Direction: ReadWrite
  • Key rotation: Implement the KeyProvider interface to rotate keys or load them from an external store such as Vault

type KeyProvider interface {
    Key(ctx context.Context) ([]byte, error)
}
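The framing above can be reproduced with the standard library alone. The sketch below is not the encrypt package's code: seal and open are hypothetical names, it works on whole byte slices rather than streams, and it assumes the 4-byte length prefix is big-endian, which the format description does not specify.

```go
package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/binary"
	"errors"
	"fmt"
)

func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key) // a 32-byte key selects AES-256
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal returns [4-byte nonce length][nonce][ciphertext + GCM tag].
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	out := make([]byte, 4, 4+len(nonce)+len(plaintext)+gcm.Overhead())
	binary.BigEndian.PutUint32(out, uint32(len(nonce))) // byte order is an assumption
	out = append(out, nonce...)
	return gcm.Seal(out, nonce, plaintext, nil), nil
}

// open parses the frame and authenticates and decrypts the payload.
func open(key, blob []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(blob) < 4 {
		return nil, errors.New("frame too short")
	}
	n := int(binary.BigEndian.Uint32(blob))
	if n != gcm.NonceSize() || len(blob) < 4+n {
		return nil, errors.New("invalid nonce length")
	}
	return gcm.Open(nil, blob[4:4+n], blob[4+n:], nil)
}

func main() {
	key := make([]byte, 32) // all-zero demo key; use a real secret in practice
	blob, err := seal(key, []byte("hello"))
	if err != nil {
		panic(err)
	}
	plain, err := open(key, blob)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(blob), bytes.Equal(plain, []byte("hello")))
}
```

Storing the nonce length in the frame lets a reader validate the header before it attempts decryption, and the GCM tag makes any tampering with the ciphertext fail on open.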

Compression (middleware/compress)

Zstd compression with smart skip logic.

comp := compress.New(
    compress.WithMinSize(1024),          // skip objects smaller than 1 KB (the default)
    compress.WithExclude("custom"),      // add "custom" to the skip-extension list
)

  • Auto-detect: On read, the middleware checks for the zstd magic bytes and passes data that was never compressed through unchanged
  • Skip list: jpg, png, gif, webp, mp4, mp3, zip, gz, bz2, xz, zst, br, rar, 7z
  • Size guard: If the compressed output is larger than the input, the original data is kept
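The skip and auto-detect checks need no compression library to illustrate. In the sketch below, shouldCompress and isZstd are hypothetical helpers, and case-insensitive extension matching is an assumption. The magic bytes themselves are the standard zstd frame header (0xFD2FB528 stored little-endian).

```go
package main

import (
	"bytes"
	"fmt"
	"path"
	"strings"
)

// zstdMagic is the 4-byte prefix of every zstd frame.
var zstdMagic = []byte{0x28, 0xB5, 0x2F, 0xFD}

// isZstd reports whether the stream head looks like a zstd frame.
func isZstd(head []byte) bool {
	return bytes.HasPrefix(head, zstdMagic)
}

// skipExt holds the already-compressed formats from the skip list.
var skipExt = map[string]bool{
	"jpg": true, "png": true, "gif": true, "webp": true, "mp4": true,
	"mp3": true, "zip": true, "gz": true, "bz2": true, "xz": true,
	"zst": true, "br": true, "rar": true, "7z": true,
}

// shouldCompress applies the minimum-size and extension checks.
func shouldCompress(key string, size, minSize int64) bool {
	if size < minSize {
		return false
	}
	ext := strings.ToLower(strings.TrimPrefix(path.Ext(key), "."))
	return !skipExt[ext]
}

func main() {
	fmt.Println(shouldCompress("app.log", 4096, 1024))
	fmt.Println(shouldCompress("photo.jpg", 4096, 1024))
	fmt.Println(isZstd([]byte{0x28, 0xB5, 0x2F, 0xFD, 0x00}))
}
```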

Deduplication (middleware/dedup)

Content-hash deduplication with callback support.

dup := dedup.New(
    dedup.WithOnDuplicate(func(ctx context.Context, key, hash string) {
        log.Printf("duplicate detected: %s (hash: %s)", key, hash)
    }),
)

  • Direction: Write-only
  • Detection only: Reports duplicates via the callback; the data still flows through to storage. Pair with content-addressable storage (CAS) for actual storage deduplication.
  • Hash algorithm: BLAKE3 by default; configurable via WithHashAlgorithm()
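Detection-only deduplication amounts to hashing the bytes as they pass through and checking the digest once the write finishes. The sketch below shows that idea with hypothetical types (detector, hashingWriter). It swaps in SHA-256 from the standard library in place of BLAKE3, keeps seen hashes in an in-memory map, and is not safe for concurrent use.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
)

// detector remembers content digests it has already seen.
type detector struct {
	seen        map[string]bool
	onDuplicate func(key, digest string)
}

func newDetector(onDuplicate func(key, digest string)) *detector {
	return &detector{seen: make(map[string]bool), onDuplicate: onDuplicate}
}

// hashingWriter forwards every byte to inner and hashes it on the way.
type hashingWriter struct {
	inner io.Writer
	sum   hash.Hash
	key   string
	d     *detector
}

func (d *detector) wrap(key string, inner io.Writer) *hashingWriter {
	return &hashingWriter{inner: inner, sum: sha256.New(), key: key, d: d}
}

func (w *hashingWriter) Write(p []byte) (int, error) {
	n, err := w.inner.Write(p)
	w.sum.Write(p[:n]) // hash writes never return an error
	return n, err
}

// Close reports a duplicate if the finished digest was seen before.
func (w *hashingWriter) Close() error {
	digest := hex.EncodeToString(w.sum.Sum(nil))
	if w.d.seen[digest] {
		w.d.onDuplicate(w.key, digest)
	}
	w.d.seen[digest] = true
	return nil
}

// store pushes data through the detector into a discarding sink.
func store(d *detector, key string, data []byte) error {
	w := d.wrap(key, io.Discard)
	if _, err := w.Write(data); err != nil {
		return err
	}
	return w.Close()
}

func main() {
	d := newDetector(func(key, digest string) {
		fmt.Printf("duplicate detected: %s (hash: %s)\n", key, digest[:8])
	})
	store(d, "a.txt", []byte("same bytes"))
	store(d, "b.txt", []byte("same bytes"))
}
```

Note that the data is always written to the inner writer; the callback only observes the duplicate, matching the detection-only behavior described above.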

Content Scanning (middleware/scan)

Write-only middleware that scans uploaded content for threats using a pluggable ScanProvider interface. Blocks malicious uploads before they reach storage.

import "github.com/xraph/trove/middleware/scan"

scanner := scan.New(
    scan.WithProvider(scan.NewClamAVProvider("tcp://localhost:3310")),
    scan.WithMaxSize(25 << 20),              // skip files > 25MB
    scan.WithSkipExtensions(".log", ".tmp"),  // skip known safe extensions
    scan.WithOnDetect(func(ctx context.Context, key string, result *scan.ScanResult) {
        log.Printf("threat detected in %s: %s", key, result.Threat)
    }),
)

  • Direction: Write-only
  • Blocking: Returns trove.ErrContentBlocked when a threat is detected
  • Buffered: Content is buffered in memory, scanned on Close(), then flushed to the underlying writer only if clean
  • ClamAV provider: Built-in provider implementing the ClamAV INSTREAM protocol over TCP
  • Custom providers: Implement scan.ScanProvider for other scanning backends

type ScanProvider interface {
    Scan(ctx context.Context, r io.Reader) (*ScanResult, error)
}
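The buffer, scan, then flush behavior can be sketched as a writer that holds everything back until Close. This is an illustration, not the scan package's code: scanWriter, markerProvider, and upload are hypothetical, ScanResult is reduced to the Threat field, and errContentBlocked stands in for trove.ErrContentBlocked.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
)

// ScanResult is reduced to the one field used here.
type ScanResult struct{ Threat string }

type ScanProvider interface {
	Scan(ctx context.Context, r io.Reader) (*ScanResult, error)
}

// errContentBlocked stands in for trove.ErrContentBlocked.
var errContentBlocked = errors.New("content blocked")

type scanWriter struct {
	ctx      context.Context
	dst      io.Writer
	provider ScanProvider
	buf      bytes.Buffer
}

// Write only buffers; nothing reaches dst yet.
func (w *scanWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }

// Close scans the buffered content and flushes it only when clean.
func (w *scanWriter) Close() error {
	res, err := w.provider.Scan(w.ctx, bytes.NewReader(w.buf.Bytes()))
	if err != nil {
		return err
	}
	if res.Threat != "" {
		return fmt.Errorf("%w: %s", errContentBlocked, res.Threat)
	}
	_, err = w.dst.Write(w.buf.Bytes())
	return err
}

// markerProvider is a toy provider that flags a fixed byte sequence.
type markerProvider struct{ marker []byte }

func (p markerProvider) Scan(_ context.Context, r io.Reader) (*ScanResult, error) {
	data, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	if bytes.Contains(data, p.marker) {
		return &ScanResult{Threat: "test-marker"}, nil
	}
	return &ScanResult{}, nil
}

// upload returns what reached storage and any error from Close.
func upload(data []byte) (string, error) {
	var storage bytes.Buffer
	w := &scanWriter{
		ctx:      context.Background(),
		dst:      &storage,
		provider: markerProvider{marker: []byte("MALWARE")},
	}
	if _, err := w.Write(data); err != nil {
		return "", err
	}
	err := w.Close()
	return storage.String(), err
}

func main() {
	fmt.Println(upload([]byte("quarterly report")))
	fmt.Println(upload([]byte("xx MALWARE xx")))
}
```

Because the whole object sits in memory until Close, a size cap such as the WithMaxSize option shown earlier matters for large uploads.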

Watermark (middleware/watermark)

Read-only middleware that embeds invisible watermark metadata in downloaded images. Operates at the byte level without image decoding.

import "github.com/xraph/trove/middleware/watermark"

wm := watermark.New(
    watermark.WithText("licensed-to:acme-corp"),
    watermark.WithTypes("image/png", "image/jpeg"),
)

// Or with dynamic text based on the request context.
dynamicWM := watermark.New(
    watermark.WithTextFunc(func(ctx context.Context) string {
        return fmt.Sprintf("user:%s", getUserID(ctx))
    }),
)

  • Direction: Read-only
  • Supported formats: PNG (tEXt chunk injection), JPEG (COM marker injection)
  • Passthrough: Non-matching content types are passed through unmodified
  • Zero dependencies: Uses byte-level manipulation, no image processing libraries
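To make the PNG case concrete, the sketch below splices a tEXt chunk directly after the IHDR chunk without decoding any pixels. It is an assumption-laden illustration rather than the watermark package's implementation: injectText and demo are hypothetical names, the "Comment" keyword is chosen arbitrarily from the PNG specification's predefined keywords, and the function works on a full byte slice instead of a stream. The image/png import is used only by demo to confirm that the result is still a valid PNG.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
	"image"
	"image/png"
)

var pngSig = []byte{0x89, 'P', 'N', 'G', '\r', '\n', 0x1a, '\n'}

// injectText inserts a tEXt chunk (keyword, NUL, text) right after IHDR.
func injectText(src []byte, keyword, text string) ([]byte, error) {
	// Signature, then IHDR: length, type, 13 data bytes, CRC.
	const ihdrEnd = 8 + 4 + 4 + 13 + 4
	if len(src) < ihdrEnd || !bytes.HasPrefix(src, pngSig) || string(src[12:16]) != "IHDR" {
		return nil, errors.New("not a PNG")
	}
	// The CRC covers the chunk type plus the chunk data.
	body := append([]byte("tEXt"+keyword+"\x00"), text...)
	chunk := make([]byte, 4, 4+len(body)+4)
	binary.BigEndian.PutUint32(chunk, uint32(len(body)-4)) // length excludes the type
	chunk = append(chunk, body...)
	var crc [4]byte
	binary.BigEndian.PutUint32(crc[:], crc32.ChecksumIEEE(body))
	chunk = append(chunk, crc[:]...)

	out := make([]byte, 0, len(src)+len(chunk))
	out = append(out, src[:ihdrEnd]...)
	out = append(out, chunk...)
	return append(out, src[ihdrEnd:]...), nil
}

// demo encodes a tiny image, injects the text, and checks that the
// result still decodes. It returns the number of bytes added.
func demo(text string) (int, error) {
	var buf bytes.Buffer
	if err := png.Encode(&buf, image.NewRGBA(image.Rect(0, 0, 2, 2))); err != nil {
		return 0, err
	}
	out, err := injectText(buf.Bytes(), "Comment", text)
	if err != nil {
		return 0, err
	}
	if _, err := png.Decode(bytes.NewReader(out)); err != nil {
		return 0, err
	}
	return len(out) - buf.Len(), nil
}

func main() {
	added, err := demo("licensed-to:acme-corp")
	fmt.Println(added, err)
}
```

The chunk adds 12 bytes of framing (length, type, CRC) plus the keyword, a NUL separator, and the text; decoders that do not understand tEXt skip it because it is an ancillary chunk.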

Writing Custom Middleware

Implement the Middleware interface plus ReadMiddleware and/or WriteMiddleware:

type Watermark struct{}

func (w *Watermark) Name() string                  { return "watermark" }
func (w *Watermark) Direction() middleware.Direction { return middleware.DirectionRead }

func (w *Watermark) WrapReader(ctx context.Context, r io.ReadCloser, info *driver.ObjectInfo) (io.ReadCloser, error) {
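    // Wrap the stream in your own io.ReadCloser (watermarkReader here,
    // defined elsewhere) that transforms the data as it is read.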
    return &watermarkReader{inner: r}, nil
}

Register it:

trove.WithScopedReadMiddleware(
    middleware.ForKeys("*.jpg", "*.png"),
    &Watermark{},
)
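The reader returned from WrapReader is an ordinary io.ReadCloser that transforms bytes as they stream past and forwards Close to the wrapped reader. As a minimal sketch of that wrapper shape, the hypothetical upperReader below uppercases ASCII letters in place. It stands in for a type like watermarkReader and has no connection to the library.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// upperReader is a hypothetical transforming reader.
type upperReader struct {
	inner io.ReadCloser
}

// Read pulls from the wrapped reader and rewrites the bytes it received.
func (u *upperReader) Read(p []byte) (int, error) {
	n, err := u.inner.Read(p)
	for i := 0; i < n; i++ {
		if p[i] >= 'a' && p[i] <= 'z' {
			p[i] -= 'a' - 'A'
		}
	}
	return n, err
}

// Close must be forwarded so the underlying stream is released.
func (u *upperReader) Close() error { return u.inner.Close() }

// readAllUpper runs a string through the wrapper for demonstration.
func readAllUpper(s string) (string, error) {
	r := &upperReader{inner: io.NopCloser(strings.NewReader(s))}
	defer r.Close()
	b, err := io.ReadAll(r)
	return string(b), err
}

func main() {
	fmt.Println(readAllUpper("photo data"))
}
```

Transforming only the first n bytes of p, and returning the inner reader's error untouched, keeps the wrapper correct for short reads and for io.EOF.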

Resolver

The Resolver is the core of the middleware system. Access it directly for introspection:

resolver := t.Resolver()

// List all registrations.
regs := resolver.Registrations()

// Resolve the pipeline for a specific operation.
readers := resolver.ResolveRead(ctx, "media", "photo.jpg")
writers := resolver.ResolveWrite(ctx, "media", "photo.jpg")

Pipeline results are cached with an LRU cache (1024 entries) that auto-invalidates when registrations change.
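For readers unfamiliar with the pattern, an LRU cache that is wiped whenever registrations change can be sketched as follows. The pipelineCache type, its string cache key, and the decision to clear everything on invalidation are assumptions for illustration; the Resolver's actual cache layout is not described here. The sketch is not safe for concurrent use.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key      string
	pipeline []string
}

// pipelineCache is a hypothetical fixed-capacity LRU cache.
type pipelineCache struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[string]*list.Element
}

func newPipelineCache(capacity int) *pipelineCache {
	return &pipelineCache{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

func (c *pipelineCache) get(key string) ([]string, bool) {
	el, ok := c.items[key]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).pipeline, true
}

func (c *pipelineCache) put(key string, pipeline []string) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).pipeline = pipeline
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, pipeline: pipeline})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back() // least recently used
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

// invalidate drops every entry, for example after a registration change.
func (c *pipelineCache) invalidate() {
	c.order.Init()
	c.items = make(map[string]*list.Element)
}

func main() {
	c := newPipelineCache(1024)
	c.put("read|media|photo.jpg", []string{"encrypt"})
	fmt.Println(c.get("read|media|photo.jpg"))
	c.invalidate()
	fmt.Println(c.get("read|media|photo.jpg"))
}
```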
