Trove

Architecture

Trove's layered architecture: storage pipeline, multi-backend routing, middleware, streaming, CAS, and VFS.

Package Overview

trove/
├── trove.go              # Core Trove type and operations (Put, Get, Delete, Head, List, Copy)
├── config.go             # Config type, DefaultConfig(), checksum and retry settings
├── options.go            # Functional options: WithBackend, WithRoute, WithVFS, etc.
├── errors.go             # Sentinel errors (ErrNotFound, ErrBucketEmpty, etc.)
├── driver/               # Driver interface + capability interfaces + option types
│   ├── driver.go         # Core Driver interface, ObjectInfo, ObjectReader, ObjectIterator
│   ├── options.go        # PutOption, GetOption, ListOption, CopyOption, BucketOption
│   ├── capability.go     # MultipartDriver, PresignDriver, VersioningDriver, etc.
│   ├── dsn.go            # DSN parser for connection strings
│   └── registry.go       # Global driver registry
├── drivers/              # Built-in storage backend drivers
│   ├── localdriver/      # Local filesystem (file:///path DSN)
│   ├── memdriver/        # In-memory (testing, no persistence)
│   ├── s3driver/         # AWS S3 / MinIO / S3-compatible
│   ├── gcsdriver/        # Google Cloud Storage
│   ├── azuredriver/      # Azure Blob Storage
│   └── sftpdriver/       # SFTP remote storage
├── middleware/           # Scope-aware middleware pipeline framework
│   ├── middleware.go     # Middleware, ReadMiddleware, WriteMiddleware interfaces
│   ├── registration.go   # Registration type with scope, direction, priority
│   ├── resolver.go       # Resolver: assembles per-operation pipeline with LRU cache
│   ├── scope.go          # Scope interface + built-in scopes (Global, Bucket, Key, etc.)
│   ├── helpers.go        # Convenience constructors: ForBuckets, ForKeys, And, Or, Not
│   ├── cache.go          # LRU scope cache for resolved pipelines
│   ├── encrypt/          # AES-256-GCM encryption with KeyProvider interface
│   ├── compress/         # Zstd compression with min-size and skip-extension logic
│   ├── dedup/            # Content deduplication via CAS integration
│   ├── scan/             # Content scanning (virus, policy)
│   └── watermark/        # Metadata watermarking
├── stream/               # Managed streaming engine
│   ├── stream.go         # Stream type, state machine, lifecycle hooks
│   ├── chunk.go          # Chunk, ChunkAck, ChunkPool (sync.Pool-based)
│   ├── pool.go           # Pool: concurrency-limited stream manager
│   ├── backpressure.go   # Backpressure strategies (Block, Drop, Buffer, Adaptive)
│   └── metrics.go        # Per-stream transfer metrics
├── cas/                  # Content-addressable storage
│   ├── cas.go            # CAS type: Store, Retrieve, Exists, Pin, Unpin, GC
│   ├── index.go          # Index interface + MemoryIndex implementation
│   ├── hash.go           # Hash algorithms (SHA-256, BLAKE3)
│   └── gc.go             # Garbage collection cycle
├── vfs/                  # Virtual filesystem over object storage
│   ├── vfs.go            # FS type: Stat, ReadDir, Walk, Open, Create, Mkdir, Remove, Rename
│   ├── file.go           # File type (read/write with io.Pipe)
│   └── compat.go         # io/fs.FS compatibility adapter
├── id/                   # TypeID-based identifiers
│   └── id.go             # ID type, prefixes (obj_, bkt_, str_, chk_, ...), Parse, New
├── trovetest/            # Conformance test suite for driver implementations
├── internal/             # Internal utilities (router, checksum)
│   ├── router.go         # Multi-backend router (pattern + function routing)
│   └── checksum.go       # SHA-256, BLAKE3, XXHash integrity verification
└── extension/            # Forge extension for DI and lifecycle integration

Storage Pipeline

Every Trove operation follows a layered pipeline. Here is the flow for a Put (write) operation:

User Code

   t.Put(ctx, bucket, key, reader, opts...)

   ├── 1. Validate
   │       Reject empty bucket or key.
   │       Apply default bucket if configured.

   ├── 2. Middleware Pipeline (Write)
   │       Resolver evaluates all registrations against
   │       (ctx, bucket, key). Matching WriteMiddleware
   │       are sorted by priority and chained.

   │       reader → [encrypt] → [compress] → ... → transformed reader

   ├── 3. Router
   │       Resolve the backend for this (bucket, key):
   │         a. Route functions  (highest priority)
   │         b. Pattern routes   (filepath.Match)
   │         c. Default driver   (fallback)

   └── 4. Driver
           drv.Put(ctx, bucket, key, reader, opts...)
           Backend-specific storage (filesystem, S3, GCS, ...)

The Get (read) path is the reverse for middleware -- data flows from the driver through the read middleware pipeline (e.g., decryption, decompression) before reaching the caller.

User Code

   t.Get(ctx, bucket, key, opts...)

   ├── 1. Validate
   ├── 2. Router → resolve backend
   ├── 3. Driver.Get → *ObjectReader
   └── 4. Middleware Pipeline (Read)
           reader → [decompress] → [decrypt] → ... → caller

Multi-Backend Routing

The Router directs each operation to the correct storage backend based on the bucket and key. It supports three levels of routing, evaluated in priority order.

Resolution Priority

| Priority | Mechanism | Example |
|---|---|---|
| 1 (highest) | Route functions | Custom Go function returning a backend name |
| 2 | Pattern routes | "*.log" → "archive" (filepath.Match syntax) |
| 3 (lowest) | Default driver | The driver passed to trove.Open() |

Configuration

Named backends are registered with WithBackend. Routes are added with WithRoute (pattern) and WithRouteFunc (function).

t, err := trove.Open(primaryDriver,
    // Named backends
    trove.WithBackend("archive", archiveDriver),
    trove.WithBackend("cache", cacheDriver),

    // Pattern-based routes
    trove.WithRoute("*.log", "archive"),
    trove.WithRoute("tmp/*", "cache"),

    // Function-based route
    trove.WithRouteFunc(func(bucket, key string) string {
        if bucket == "compliance" {
            return "archive"
        }
        return "" // empty string = fall through
    }),
)

Direct Backend Access

You can bypass routing entirely and pin operations to a specific backend:

archiveTrove, err := t.Backend("archive")
// All operations on archiveTrove go directly to the archive driver.
archiveTrove.Put(ctx, "backups", "db.sql.gz", reader)

Middleware Pipeline

The middleware system intercepts the data stream (not HTTP requests), so it works identically whether objects are accessed via the Go API, REST, or WebSocket.

Direction Model

Each middleware declares which data paths it participates in:

| Direction | Constant | Description |
|---|---|---|
| Read | DirectionRead | Downloads / Gets -- wraps the reader after the driver |
| Write | DirectionWrite | Uploads / Puts -- wraps the reader before the driver |
| ReadWrite | DirectionReadWrite | Both paths (e.g., encryption, compression) |

Interfaces

A middleware implements one or both of:

// ReadMiddleware intercepts data being read (downloaded).
type ReadMiddleware interface {
    WrapReader(ctx context.Context, r io.ReadCloser, info *driver.ObjectInfo) (io.ReadCloser, error)
}

// WriteMiddleware intercepts data being written (uploaded).
type WriteMiddleware interface {
    WrapWriter(ctx context.Context, w io.WriteCloser, key string) (io.WriteCloser, error)
}

Scope Model

Scopes control when a middleware is active. They are evaluated at runtime against each operation's context, bucket, and key.

| Scope | Constructor | Matches |
|---|---|---|
| ScopeGlobal | (default) | Every operation |
| ScopeBucket | ForBuckets("a", "b") | Specific bucket names |
| ScopeKeyPattern | ForKeys("*.jpg", "docs/*") | Glob patterns on keys |
| ScopeContentType | ForContentTypes("image/*") | MIME type prefixes |
| ScopeFunc | When(fn) | Arbitrary predicate function |

Boolean combinators compose scopes:

// Encrypt images in the "private" bucket only.
scope := middleware.And(
    middleware.ForBuckets("private"),
    middleware.ForKeys("*.jpg", "*.png", "*.gif"),
)

// Compress everything except the "raw" bucket.
scope := middleware.Not(middleware.ForBuckets("raw"))

Registration and Priority

Middleware is registered with a Registration struct that binds a middleware instance to a scope, optional direction override, and priority.

t.UseMiddleware(middleware.Registration{
    Middleware: encrypt.New(encrypt.WithKeyProvider(kp)),
    Scope:      middleware.ForBuckets("secrets"),
    Priority:   -10, // runs before default priority (0)
})

  • Priority controls execution order. Lower values run first.
  • Direction override allows registering a bidirectional middleware for only reads or writes.
  • The Resolver evaluates all registrations, filters by scope match, sorts by priority, and returns the ordered pipeline.

Pipeline Resolution

The Resolver assembles the effective pipeline per-operation with LRU caching:

Registration[]

   ├── Filter by direction (Read or Write)
   ├── Filter by scope match (ctx, bucket, key)
   ├── Sort by priority (ascending), then registration order

   └── Pipeline: [mw1, mw2, mw3, ...]

Streaming Engine

The streaming engine provides managed, chunked data transfer with backpressure, resumability, and lifecycle hooks. It is used for large object transfers where memory efficiency and progress tracking are critical.

Stream Lifecycle

          ┌──────────┐
          │   Idle   │  ← NewStream()
          └────┬─────┘
               │ Start()
          ┌────▼─────┐
    ┌─────│  Active  │─────┐
    │     └────┬─────┘     │
    │ Pause()  │ Complete()│ Fail()
    │          │           │
┌───▼────┐ ┌───▼────────┐ ┌▼────────┐
│ Paused │ │ Completing │ │ Failed  │
└───┬────┘ └───┬────────┘ └─────────┘
    │          │
    │ Resume() │          ┌───────────┐
    └─► Active └─────────►│ Completed │
                          └───────────┘

Terminal states: Completed, Failed, Cancelled. Once a stream reaches a terminal state, no further transitions are possible.

ChunkPool

The ChunkPool wraps sync.Pool with fixed-size byte buffers so chunk memory is reused across transfers. Buffers are drawn with Get() and returned with Put(), avoiding allocation churn and garbage-collection pressure during large transfers.

pool := stream.NewChunkPool(8 * 1024 * 1024) // 8 MB chunks

buf := pool.Get()   // allocate or reuse
defer pool.Put(buf)  // return to pool

Backpressure Modes

When a consumer cannot keep up with the producer, the backpressure strategy determines behavior:

| Mode | Constant | Behavior |
|---|---|---|
| Block | BackpressureBlock | Blocks the sender until the consumer catches up (default) |
| Drop | BackpressureDrop | Drops the oldest unacknowledged chunk and retries |
| Buffer | BackpressureBuffer | Spills chunks to an overflow buffer (up to max capacity) |
| Adaptive | BackpressureAdaptive | Dynamically adjusts based on throughput vs. consumption rate |

Hooks

Streams support four lifecycle hooks:

| Hook | Signature | Called |
|---|---|---|
| Progress | func(Progress) | Periodically with transfer progress (bytes, percent, speed) |
| Chunk | func(*Chunk, ChunkAck) | After each chunk is sent/acknowledged |
| Complete | func(*driver.ObjectInfo) | When the stream completes successfully |
| Error | func(error) | When the stream encounters an error |

s, err := t.Stream(ctx, "uploads", "video.mp4", stream.DirectionUpload,
    stream.WithOnProgress(func(p stream.Progress) {
        fmt.Printf("progress: %d%%  speed: %.1f MB/s\n", p.Percent, p.Speed/1e6)
    }),
    stream.WithOnComplete(func(info *driver.ObjectInfo) {
        fmt.Printf("upload complete: %s\n", info.ETag)
    }),
    stream.WithResumable(),
    stream.WithBackpressure(stream.BackpressureAdaptive),
)

CAS Architecture

The content-addressable storage (CAS) engine stores objects under their content hash, enabling automatic deduplication and integrity verification.

Core Operations

| Operation | Method | Description |
|---|---|---|
| Store | cas.Store(ctx, reader) | Hash content, store if new, increment ref count if existing |
| Retrieve | cas.Retrieve(ctx, hash) | Get content by hash |
| Exists | cas.Exists(ctx, hash) | Check whether a hash exists |
| Pin | cas.Pin(ctx, hash) | Prevent garbage collection |
| Unpin | cas.Unpin(ctx, hash) | Allow garbage collection |
| GC | cas.GC(ctx) | Remove unreferenced, unpinned objects |

Reference Counting and Pinning

Every Store call that produces a duplicate hash increments the reference count instead of storing a copy. When references are removed (via DecrementRef), the ref count decreases. Objects with RefCount == 0 and Pinned == false are eligible for garbage collection.

Garbage Collection Cycle

GC(ctx)

   ├── 1. Index.ListUnpinned()
   │       Returns all entries with RefCount=0 AND Pinned=false

   ├── 2. For each entry:
   │       ├── driver.Delete(bucket, key)  -- remove from storage
   │       └── index.Delete(hash)          -- remove from index

   └── 3. Return GCResult { Scanned, Deleted, FreedBytes, Errors }

Index Interface

The Index interface abstracts the CAS metadata store. The default MemoryIndex is suitable for testing. Production deployments should use a database-backed implementation.

store := cas.New(driver,
    cas.WithAlgorithm(cas.AlgSHA256),
    cas.WithBucket("cas-store"),
    cas.WithIndex(myDatabaseIndex),
)

hash, info, err := store.Store(ctx, reader)

VFS Architecture

The virtual filesystem (VFS) adapts Trove's flat key/value object storage into a hierarchical directory structure. It provides familiar filesystem operations and compatibility with Go's standard io/fs.FS interface.

Flat-to-Hierarchical Mapping

Object keys are treated as file paths with "/" as the directory separator. Directories are inferred from key prefixes and can optionally be represented by zero-length marker objects with a trailing "/".

| Object Key | VFS Path | Type |
|---|---|---|
| docs/readme.md | /docs/readme.md | File |
| docs/ | /docs | Directory (marker) |
| photos/2024/img.jpg | /photos/2024/img.jpg | File |

Operations

| Operation | Method | Description |
|---|---|---|
| Stat | fs.Stat(ctx, path) | File info (checks file, then directory children) |
| ReadDir | fs.ReadDir(ctx, path) | List directory entries |
| Walk | fs.Walk(ctx, root, fn) | Recursive traversal |
| Open | fs.Open(ctx, path) | Open file for reading |
| Create | fs.Create(ctx, path) | Create/overwrite file (pipe-based writer) |
| Mkdir | fs.Mkdir(ctx, path) | Create directory marker |
| Remove | fs.Remove(ctx, path) | Delete a file |
| RemoveAll | fs.RemoveAll(ctx, path) | Delete path and all children |
| Rename | fs.Rename(ctx, old, new) | Move/rename via copy + delete |

Usage

vfs := t.VFS("documents")

// Create a file.
f, _ := vfs.Create(ctx, "reports/q4.txt")
f.Write([]byte("Q4 earnings report"))
f.Close()

// Read it back.
f, _ = vfs.Open(ctx, "reports/q4.txt")
data, _ := io.ReadAll(f)
f.Close()

// List a directory.
entries, _ := vfs.ReadDir(ctx, "reports")
for _, e := range entries {
    fmt.Println(e.Name(), e.IsDir())
}

Capability Interface Model

The core driver.Driver interface is intentionally minimal. Advanced features are exposed via optional capability interfaces. Consumers use type assertions to check for and access these capabilities at runtime.

| Capability | Interface | Methods |
|---|---|---|
| Multipart uploads | MultipartDriver | InitiateMultipart, UploadPart, CompleteMultipart, AbortMultipart |
| Pre-signed URLs | PresignDriver | PresignGet, PresignPut |
| Object versioning | VersioningDriver | ListVersions, GetVersion, DeleteVersion, RestoreVersion |
| Change notifications | NotificationDriver | Watch (returns <-chan ObjectEvent) |
| Lifecycle rules | LifecycleDriver | SetLifecycle, GetLifecycle |
| Byte-range reads | RangeDriver | GetRange (offset + length) |
| Server-side copy | ServerCopyDriver | ServerCopy (avoids download + re-upload) |

Type Assertion Pattern

drv := t.Driver()

// Check if the driver supports pre-signed URLs.
if presigner, ok := drv.(driver.PresignDriver); ok {
    url, err := presigner.PresignGet(ctx, "uploads", "report.pdf", 15*time.Minute)
    fmt.Println("download URL:", url)
}

// Check for multipart upload support.
if mp, ok := drv.(driver.MultipartDriver); ok {
    uploadID, _ := mp.InitiateMultipart(ctx, "uploads", "large-file.bin")
    // ... upload parts ...
    mp.CompleteMultipart(ctx, "uploads", "large-file.bin", uploadID, parts)
}

This pattern avoids bloating the base Driver interface. Drivers only implement the capabilities they support, and callers gracefully degrade when a capability is absent.

TypeID System

Every entity in Trove uses an id.ID, a TypeID-based identifier. IDs are:

  • Prefixed -- the prefix identifies the entity type (e.g., obj_, str_)
  • K-sortable -- UUIDv7-based suffix, so lexicographic order matches creation order
  • Globally unique -- collision-resistant across distributed systems
  • URL-safe -- no special characters, safe for use in paths and query strings

Prefixes

| Prefix | Entity | Constructor |
|---|---|---|
| obj_ | Object | id.NewObjectID() |
| bkt_ | Bucket | id.NewBucketID() |
| upl_ | Upload session | id.NewUploadSessionID() |
| dwn_ | Download session | id.NewDownloadSessionID() |
| str_ | Stream | id.NewStreamID() |
| pol_ | Pool | id.NewPoolID() |
| ver_ | Version | id.NewVersionID() |
| chk_ | Chunk | id.NewChunkID() |

Usage

import "github.com/xraph/trove/id"

// Generate a new object ID.
objID := id.NewObjectID()
fmt.Println(objID) // "obj_01h455vb4pex5vsknk084sn02q"

// Parse and validate.
parsed, err := id.ParseObjectID("obj_01h455vb4pex5vsknk084sn02q")

// Check the prefix.
fmt.Println(parsed.Prefix()) // "obj"

// IDs implement encoding.TextMarshaler, sql.Scanner, and bson.ValueMarshaler
// for seamless serialization to JSON, databases, and MongoDB.

Module Dependency Graph

trove (core)
├── trove/driver                    (zero deps -- defines Driver, ObjectInfo, capability interfaces)
├── trove/id                        (external: go.jetify.com/typeid)
├── trove/internal                  (depends on driver)
├── trove/middleware                (depends on driver)
│   ├── middleware/encrypt          (depends on middleware)
│   ├── middleware/compress         (depends on middleware, klauspost/compress)
│   ├── middleware/dedup            (depends on middleware, cas)
│   ├── middleware/scan             (depends on middleware)
│   └── middleware/watermark        (depends on middleware)
├── trove/stream                    (depends on driver, id)
├── trove/cas                       (depends on driver)
├── trove/vfs                       (depends on driver -- uses Storage interface to avoid circular import)
├── trove/drivers/localdriver       (depends on driver)
├── trove/drivers/memdriver         (depends on driver)
├── trove/drivers/s3driver          (depends on driver, aws-sdk-go-v2)
├── trove/drivers/gcsdriver         (depends on driver, cloud.google.com/go/storage)
├── trove/drivers/azuredriver       (depends on driver, azure-sdk-for-go)
├── trove/drivers/sftpdriver        (depends on driver, pkg/sftp)
├── trove/trovetest                 (depends on driver)
└── trove/extension                 (depends on trove, Forge framework)

The stream package depends only on driver and id -- no dependency on the core trove package. The vfs package defines a Storage interface mirroring Trove's methods to avoid circular imports. The cas package depends only on driver, keeping it composable outside of Trove.
