Architecture
Trove's layered architecture: storage pipeline, multi-backend routing, middleware, streaming, CAS, and VFS.
Package Overview
trove/
├── trove.go # Core Trove type and operations (Put, Get, Delete, Head, List, Copy)
├── config.go # Config type, DefaultConfig(), checksum and retry settings
├── options.go # Functional options: WithBackend, WithRoute, WithVFS, etc.
├── errors.go # Sentinel errors (ErrNotFound, ErrBucketEmpty, etc.)
├── driver/ # Driver interface + capability interfaces + option types
│ ├── driver.go # Core Driver interface, ObjectInfo, ObjectReader, ObjectIterator
│ ├── options.go # PutOption, GetOption, ListOption, CopyOption, BucketOption
│ ├── capability.go # MultipartDriver, PresignDriver, VersioningDriver, etc.
│ ├── dsn.go # DSN parser for connection strings
│ └── registry.go # Global driver registry
├── drivers/ # Built-in storage backend drivers
│ ├── localdriver/ # Local filesystem (file:///path DSN)
│ ├── memdriver/ # In-memory (testing, no persistence)
│ ├── s3driver/ # AWS S3 / MinIO / S3-compatible
│ ├── gcsdriver/ # Google Cloud Storage
│ ├── azuredriver/ # Azure Blob Storage
│ └── sftpdriver/ # SFTP remote storage
├── middleware/ # Scope-aware middleware pipeline framework
│ ├── middleware.go # Middleware, ReadMiddleware, WriteMiddleware interfaces
│ ├── registration.go # Registration type with scope, direction, priority
│ ├── resolver.go # Resolver: assembles per-operation pipeline with LRU cache
│ ├── scope.go # Scope interface + built-in scopes (Global, Bucket, Key, etc.)
│ ├── helpers.go # Convenience constructors: ForBuckets, ForKeys, And, Or, Not
│ ├── cache.go # LRU scope cache for resolved pipelines
│ ├── encrypt/ # AES-256-GCM encryption with KeyProvider interface
│ ├── compress/ # Zstd compression with min-size and skip-extension logic
│ ├── dedup/ # Content deduplication via CAS integration
│ ├── scan/ # Content scanning (virus, policy)
│ └── watermark/ # Metadata watermarking
├── stream/ # Managed streaming engine
│ ├── stream.go # Stream type, state machine, lifecycle hooks
│ ├── chunk.go # Chunk, ChunkAck, ChunkPool (sync.Pool-based)
│ ├── pool.go # Pool: concurrency-limited stream manager
│ ├── backpressure.go # Backpressure strategies (Block, Drop, Buffer, Adaptive)
│ └── metrics.go # Per-stream transfer metrics
├── cas/ # Content-addressable storage
│ ├── cas.go # CAS type: Store, Retrieve, Exists, Pin, Unpin, GC
│ ├── index.go # Index interface + MemoryIndex implementation
│ ├── hash.go # Hash algorithms (SHA-256, BLAKE3)
│ └── gc.go # Garbage collection cycle
├── vfs/ # Virtual filesystem over object storage
│ ├── vfs.go # FS type: Stat, ReadDir, Walk, Open, Create, Mkdir, Remove, Rename
│ ├── file.go # File type (read/write with io.Pipe)
│ └── compat.go # io/fs.FS compatibility adapter
├── id/ # TypeID-based identifiers
│ └── id.go # ID type, prefixes (obj_, bkt_, str_, chk_, ...), Parse, New
├── trovetest/ # Conformance test suite for driver implementations
├── internal/ # Internal utilities (router, checksum)
│ ├── router.go # Multi-backend router (pattern + function routing)
│ └── checksum.go # SHA-256, BLAKE3, XXHash integrity verification
└── extension/ # Forge extension for DI and lifecycle integration
Storage Pipeline
Every Trove operation follows a layered pipeline. Here is the flow for a Put (write) operation:
User Code
│
t.Put(ctx, bucket, key, reader, opts...)
│
├── 1. Validate
│ Reject empty bucket or key.
│ Apply default bucket if configured.
│
├── 2. Middleware Pipeline (Write)
│ Resolver evaluates all registrations against
│ (ctx, bucket, key). Matching WriteMiddleware
│ are sorted by priority and chained.
│
│ reader → [encrypt] → [compress] → ... → transformed reader
│
├── 3. Router
│ Resolve the backend for this (bucket, key):
│ a. Route functions (highest priority)
│ b. Pattern routes (filepath.Match)
│ c. Default driver (fallback)
│
└── 4. Driver
drv.Put(ctx, bucket, key, reader, opts...)
Backend-specific storage (filesystem, S3, GCS, ...)
The Get (read) path is the reverse for middleware -- data flows from the driver through the read middleware pipeline (e.g., decryption, decompression) before reaching the caller.
User Code
│
t.Get(ctx, bucket, key, opts...)
│
├── 1. Validate
├── 2. Router → resolve backend
├── 3. Driver.Get → *ObjectReader
└── 4. Middleware Pipeline (Read)
reader → [decompress] → [decrypt] → ... → caller
Multi-Backend Routing
The Router directs each operation to the correct storage backend based on the bucket and key. It supports three levels of routing, evaluated in priority order.
Resolution Priority
| Priority | Mechanism | Example |
|---|---|---|
| 1 (highest) | Route functions | Custom Go function returning backend name |
| 2 | Pattern routes | "*.log" → "archive" (filepath.Match syntax) |
| 3 (lowest) | Default driver | The driver passed to trove.Open() |
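The three-level resolution order can be sketched as a small resolver. The type and function names below (routeFunc, patternRoute, demoRouter) are illustrative, not Trove's actual internals:

```go
package main

import (
	"fmt"
	"path/filepath"
)

// routeFunc returns a backend name, or "" to fall through.
type routeFunc func(bucket, key string) string

// patternRoute maps a filepath.Match pattern on the key to a backend.
type patternRoute struct{ pattern, backend string }

// router sketches the three-level resolution order: route functions,
// then pattern routes, then the default backend.
type router struct {
	funcs    []routeFunc
	patterns []patternRoute
	def      string
}

func (r *router) resolve(bucket, key string) string {
	for _, fn := range r.funcs { // 1. route functions (highest priority)
		if name := fn(bucket, key); name != "" {
			return name
		}
	}
	for _, p := range r.patterns { // 2. pattern routes
		if ok, _ := filepath.Match(p.pattern, key); ok {
			return p.backend
		}
	}
	return r.def // 3. default driver (fallback)
}

func demoRouter() *router {
	return &router{
		funcs: []routeFunc{func(bucket, _ string) string {
			if bucket == "compliance" {
				return "archive"
			}
			return "" // empty string = fall through
		}},
		patterns: []patternRoute{{"*.log", "archive"}, {"tmp/*", "cache"}},
		def:      "primary",
	}
}

func main() {
	r := demoRouter()
	fmt.Println(r.resolve("compliance", "a.txt")) // archive (route function)
	fmt.Println(r.resolve("docs", "app.log"))     // archive (pattern)
	fmt.Println(r.resolve("docs", "notes.txt"))   // primary (default)
}
```

Note that pattern routes are kept in a slice, not a map, so evaluation order is deterministic when patterns overlap.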
Configuration
Named backends are registered with WithBackend. Routes are added with WithRoute (pattern) and WithRouteFunc (function).
t, err := trove.Open(primaryDriver,
// Named backends
trove.WithBackend("archive", archiveDriver),
trove.WithBackend("cache", cacheDriver),
// Pattern-based routes
trove.WithRoute("*.log", "archive"),
trove.WithRoute("tmp/*", "cache"),
// Function-based route
trove.WithRouteFunc(func(bucket, key string) string {
if bucket == "compliance" {
return "archive"
}
return "" // empty string = fall through
}),
)
Direct Backend Access
You can bypass routing entirely and pin operations to a specific backend:
archiveTrove, err := t.Backend("archive")
// All operations on archiveTrove go directly to the archive driver.
archiveTrove.Put(ctx, "backups", "db.sql.gz", reader)
Middleware Pipeline
The middleware system intercepts the data stream (not HTTP requests), so it works identically whether objects are accessed via the Go API, REST, or WebSocket.
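Because middleware wraps the byte stream itself, a middleware is just a reader (or writer) wrapper. A minimal illustrative read-side wrapper -- the names countingReader, wrapReader, and countBytes are made up for this sketch, not part of Trove's API:

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// countingReader wraps an object's reader and counts bytes as they flow
// to the caller -- the same shape a read-side WrapReader produces.
type countingReader struct {
	r io.ReadCloser
	n *atomic.Int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n.Add(int64(n))
	return n, err
}

func (c *countingReader) Close() error { return c.r.Close() }

// wrapReader mirrors a read-side middleware: accept a reader, return a
// wrapped reader; the pipeline chains these for each matching registration.
func wrapReader(r io.ReadCloser, n *atomic.Int64) io.ReadCloser {
	return &countingReader{r: r, n: n}
}

// countBytes pushes a string through the wrapper and reports bytes seen.
func countBytes(s string) int64 {
	var n atomic.Int64
	r := wrapReader(io.NopCloser(strings.NewReader(s)), &n)
	io.Copy(io.Discard, r)
	r.Close()
	return n.Load()
}

func main() {
	fmt.Println(countBytes("hello, trove")) // 12
}
```

Encryption, compression, and scanning middleware all follow this wrapping pattern, which is why they work identically across the Go API, REST, and WebSocket entry points.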
Direction Model
Each middleware declares which data paths it participates in:
| Direction | Constant | Description |
|---|---|---|
| Read | DirectionRead | Downloads / Gets -- wraps the reader after the driver |
| Write | DirectionWrite | Uploads / Puts -- wraps the reader before the driver |
| ReadWrite | DirectionReadWrite | Both paths (e.g., encryption, compression) |
Interfaces
A middleware implements one or both of:
// ReadMiddleware intercepts data being read (downloaded).
type ReadMiddleware interface {
WrapReader(ctx context.Context, r io.ReadCloser, info *driver.ObjectInfo) (io.ReadCloser, error)
}
// WriteMiddleware intercepts data being written (uploaded).
type WriteMiddleware interface {
WrapWriter(ctx context.Context, w io.WriteCloser, key string) (io.WriteCloser, error)
}
Scope Model
Scopes control when a middleware is active. They are evaluated at runtime against each operation's context, bucket, and key.
| Scope | Constructor | Matches |
|---|---|---|
| ScopeGlobal | (default) | Every operation |
| ScopeBucket | ForBuckets("a", "b") | Specific bucket names |
| ScopeKeyPattern | ForKeys("*.jpg", "docs/*") | Glob patterns on keys |
| ScopeContentType | ForContentTypes("image/*") | MIME type prefixes |
| ScopeFunc | When(fn) | Arbitrary predicate function |
Boolean combinators compose scopes:
// Encrypt images in the "private" bucket only.
scope := middleware.And(
middleware.ForBuckets("private"),
middleware.ForKeys("*.jpg", "*.png", "*.gif"),
)
// Compress everything except the "raw" bucket.
scope := middleware.Not(middleware.ForBuckets("raw"))
Registration and Priority
Middleware is registered with a Registration struct that binds a middleware instance to a scope, optional direction override, and priority.
t.UseMiddleware(middleware.Registration{
Middleware: encrypt.New(encrypt.WithKeyProvider(kp)),
Scope: middleware.ForBuckets("secrets"),
Priority: -10, // runs before default priority (0)
})
- Priority controls execution order. Lower values run first.
- Direction override allows registering a bidirectional middleware for only reads or writes.
- The Resolver evaluates all registrations, filters by scope match, sorts by priority, and returns the ordered pipeline.
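The filter-then-sort behavior can be sketched in a few lines. The registration struct and resolvePipeline function below are simplified stand-ins for middleware.Registration and the Resolver, not the real types:

```go
package main

import (
	"fmt"
	"sort"
)

// registration is an illustrative stand-in for middleware.Registration:
// a named middleware bound to a scope predicate and a priority.
type registration struct {
	name     string
	priority int
	match    func(bucket, key string) bool // nil = global scope
}

// resolvePipeline filters registrations by scope, then sorts by priority.
// sort.SliceStable preserves registration order among equal priorities.
func resolvePipeline(regs []registration, bucket, key string) []string {
	var matched []registration
	for _, r := range regs {
		if r.match == nil || r.match(bucket, key) {
			matched = append(matched, r)
		}
	}
	sort.SliceStable(matched, func(i, j int) bool {
		return matched[i].priority < matched[j].priority
	})
	names := make([]string, len(matched))
	for i, r := range matched {
		names[i] = r.name
	}
	return names
}

func main() {
	regs := []registration{
		{name: "compress", priority: 0},
		{name: "encrypt", priority: -10,
			match: func(b, _ string) bool { return b == "secrets" }},
		{name: "audit", priority: 0},
	}
	// encrypt (priority -10) runs first; compress and audit keep
	// registration order at equal priority.
	fmt.Println(resolvePipeline(regs, "secrets", "key.bin"))
	// encrypt's scope does not match, so it drops out.
	fmt.Println(resolvePipeline(regs, "public", "key.bin"))
}
```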
Pipeline Resolution
The Resolver assembles the effective pipeline per-operation with LRU caching:
Registration[]
│
├── Filter by direction (Read or Write)
├── Filter by scope match (ctx, bucket, key)
├── Sort by priority (ascending), then registration order
│
└── Pipeline: [mw1, mw2, mw3, ...]
Streaming Engine
The streaming engine provides managed, chunked data transfer with backpressure, resumability, and lifecycle hooks. It is used for large object transfers where memory efficiency and progress tracking are critical.
Stream Lifecycle
┌──────────┐
│ Idle │ ← NewStream()
└────┬─────┘
│ Start()
┌────▼─────┐
┌─────│ Active │─────┐
│ └────┬─────┘ │
│ Pause │ Complete() │ Fail()
│ │ │
┌───▼────┐ ┌──▼────────┐ ┌─▼───────┐
│ Paused │ │ Completing │ │ Failed │
└───┬────┘ └──┬────────┘ └─────────┘
│ Resume │
│ │
└──────────┘ ┌──────────┐
└►│ Completed│
                                 └──────────┘
Terminal states: Completed, Failed, Cancelled. Once a stream reaches a terminal state, no further transitions are possible.
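The lifecycle above can be encoded as a transition table, with terminal states simply having no outgoing edges. This is a sketch, not Stream's actual implementation; the Cancel edges (from Active and Paused) are an assumption, since the diagram does not draw them:

```go
package main

import "fmt"

type state string

const (
	idle       state = "idle"
	active     state = "active"
	paused     state = "paused"
	completing state = "completing"
	completed  state = "completed"
	failed     state = "failed"
	cancelled  state = "cancelled"
)

// transitions encodes the legal moves from the lifecycle diagram.
// Terminal states (completed, failed, cancelled) have no entries.
var transitions = map[state][]state{
	idle:       {active},
	active:     {paused, completing, failed, cancelled},
	paused:     {active, cancelled},
	completing: {completed, failed},
}

// canTransition reports whether a move from one state to another is legal.
func canTransition(from, to state) bool {
	for _, s := range transitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(active, paused))    // true
	fmt.Println(canTransition(paused, active))    // true (Resume)
	fmt.Println(canTransition(completed, active)) // false: terminal state
}
```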
ChunkPool
The ChunkPool wraps sync.Pool with fixed-size byte buffers so chunks are reused rather than reallocated. Buffers are drawn with Get() and returned with Put(), avoiding garbage-collection pressure during large transfers.
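A minimal sketch of how such a pool can be built on sync.Pool (the chunkPool type here is illustrative, not the real stream.ChunkPool):

```go
package main

import (
	"fmt"
	"sync"
)

// chunkPool hands out fixed-size buffers backed by sync.Pool, so hot
// transfer paths reuse buffers instead of allocating fresh ones.
type chunkPool struct {
	size int
	pool sync.Pool
}

func newChunkPool(size int) *chunkPool {
	p := &chunkPool{size: size}
	// New runs only when the pool is empty.
	p.pool.New = func() any { return make([]byte, size) }
	return p
}

// Get returns a buffer of exactly the pool's chunk size.
func (p *chunkPool) Get() []byte { return p.pool.Get().([]byte) }

// Put returns a buffer for reuse; buffers of the wrong size are dropped
// so the pool only ever holds uniform chunks.
func (p *chunkPool) Put(buf []byte) {
	if len(buf) == p.size {
		p.pool.Put(buf)
	}
}

func main() {
	pool := newChunkPool(1 << 20) // 1 MB chunks
	buf := pool.Get()
	fmt.Println(len(buf)) // 1048576
	pool.Put(buf)         // return for reuse
}
```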
pool := stream.NewChunkPool(8 * 1024 * 1024) // 8 MB chunks
buf := pool.Get() // allocate or reuse
defer pool.Put(buf) // return to pool
Backpressure Modes
When a consumer cannot keep up with the producer, the backpressure strategy determines behavior:
| Mode | Constant | Behavior |
|---|---|---|
| Block | BackpressureBlock | Blocks the sender until the consumer catches up (default) |
| Drop | BackpressureDrop | Drops the oldest unacknowledged chunk and retries |
| Buffer | BackpressureBuffer | Spills chunks to an overflow buffer (up to max capacity) |
| Adaptive | BackpressureAdaptive | Dynamically adjusts based on throughput vs. consumption rate |
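The default Block mode can be modeled with a bounded channel: when the in-flight buffer is full, the producer simply blocks on send until the consumer catches up. A hedged sketch (sendBlocking is an illustrative function, not stream API):

```go
package main

import "fmt"

// sendBlocking models BackpressureBlock: chunks flow through a channel
// with fixed capacity, so a full buffer blocks the producer until the
// consumer drains it. No chunk is ever dropped.
func sendBlocking(chunks [][]byte, capacity int) (consumed int) {
	ch := make(chan []byte, capacity)
	done := make(chan struct{})

	go func() { // consumer
		for range ch {
			consumed++ // a slow consumer here just delays the producer
		}
		close(done)
	}()

	for _, c := range chunks { // producer
		ch <- c // blocks when the buffer holds `capacity` chunks
	}
	close(ch)
	<-done
	return consumed
}

func main() {
	chunks := make([][]byte, 10)
	// Even with a buffer of 2, all 10 chunks arrive -- the producer
	// just stalls instead of losing data.
	fmt.Println(sendBlocking(chunks, 2)) // 10
}
```

Drop, Buffer, and Adaptive trade this guaranteed delivery for lower producer latency in different ways, as the table above describes.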
Hooks
Streams support four lifecycle hooks:
| Hook | Signature | Called |
|---|---|---|
| Progress | func(Progress) | Periodically with transfer progress (bytes, percent, speed) |
| Chunk | func(*Chunk, ChunkAck) | After each chunk is sent/acknowledged |
| Complete | func(*driver.ObjectInfo) | When the stream completes successfully |
| Error | func(error) | When the stream encounters an error |
s, err := t.Stream(ctx, "uploads", "video.mp4", stream.DirectionUpload,
stream.WithOnProgress(func(p stream.Progress) {
fmt.Printf("progress: %d%% speed: %.1f MB/s\n", p.Percent, p.Speed/1e6)
}),
stream.WithOnComplete(func(info *driver.ObjectInfo) {
fmt.Printf("upload complete: %s\n", info.ETag)
}),
stream.WithResumable(),
stream.WithBackpressure(stream.BackpressureAdaptive),
)
CAS Architecture
The content-addressable storage (CAS) engine stores objects under their content hash, enabling automatic deduplication and integrity verification.
Core Operations
| Operation | Method | Description |
|---|---|---|
| Store | cas.Store(ctx, reader) | Hash content, store if new, increment ref count if existing |
| Retrieve | cas.Retrieve(ctx, hash) | Get content by hash |
| Exists | cas.Exists(ctx, hash) | Check if hash exists |
| Pin | cas.Pin(ctx, hash) | Prevent garbage collection |
| Unpin | cas.Unpin(ctx, hash) | Allow garbage collection |
| GC | cas.GC(ctx) | Remove unreferenced, unpinned objects |
Reference Counting and Pinning
Every Store call that produces a duplicate hash increments the reference count instead of storing a copy. When references are removed (via DecrementRef), the ref count decreases. Objects with RefCount == 0 and Pinned == false are eligible for garbage collection.
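The store/decrement/collect cycle can be sketched with an in-memory index. This is a simplified illustration of the bookkeeping, not the cas package itself (memCAS and its methods are invented names):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// entry mirrors the CAS bookkeeping: a ref count, a pin flag, and the data.
type entry struct {
	refCount int
	pinned   bool
	data     []byte
}

type memCAS struct{ index map[string]*entry }

// store hashes the content; a duplicate only bumps the ref count
// instead of storing a second copy.
func (c *memCAS) store(data []byte) string {
	sum := sha256.Sum256(data)
	h := hex.EncodeToString(sum[:])
	if e, ok := c.index[h]; ok {
		e.refCount++
		return h
	}
	c.index[h] = &entry{refCount: 1, data: data}
	return h
}

func (c *memCAS) decrementRef(h string) {
	if e, ok := c.index[h]; ok && e.refCount > 0 {
		e.refCount--
	}
}

// gc deletes entries with refCount == 0 that are not pinned.
func (c *memCAS) gc() (deleted int) {
	for h, e := range c.index {
		if e.refCount == 0 && !e.pinned {
			delete(c.index, h)
			deleted++
		}
	}
	return deleted
}

func main() {
	c := &memCAS{index: map[string]*entry{}}
	h1 := c.store([]byte("report"))
	h2 := c.store([]byte("report")) // duplicate: refCount becomes 2
	fmt.Println(h1 == h2)           // true
	c.decrementRef(h1)
	c.decrementRef(h1)
	fmt.Println(c.gc()) // 1: unreferenced, unpinned entry collected
}
```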
Garbage Collection Cycle
GC(ctx)
│
├── 1. Index.ListUnpinned()
│ Returns all entries with RefCount=0 AND Pinned=false
│
├── 2. For each entry:
│ ├── driver.Delete(bucket, key) -- remove from storage
│ └── index.Delete(hash) -- remove from index
│
└── 3. Return GCResult { Scanned, Deleted, FreedBytes, Errors }
Index Interface
The Index interface abstracts the CAS metadata store. The default MemoryIndex is suitable for testing. Production deployments should use a database-backed implementation.
cas := cas.New(driver,
cas.WithAlgorithm(cas.AlgSHA256),
cas.WithBucket("cas-store"),
cas.WithIndex(myDatabaseIndex),
)
hash, info, err := cas.Store(ctx, reader)
VFS Architecture
The virtual filesystem (VFS) adapts Trove's flat key/value object storage into a hierarchical directory structure. It provides familiar filesystem operations and compatibility with Go's standard io/fs.FS interface.
Flat-to-Hierarchical Mapping
Object keys are treated as file paths with "/" as the directory separator. Directories are inferred from key prefixes and can optionally be represented by zero-length marker objects with a trailing "/".
| Object Key | VFS Path | Type |
|---|---|---|
| docs/readme.md | /docs/readme.md | File |
| docs/ | /docs | Directory (marker) |
| photos/2024/img.jpg | /photos/2024/img.jpg | File |
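Directory inference boils down to splitting each key at the first separator past the prefix. A sketch of how a ReadDir can be derived from a flat key listing (listDir is an illustrative helper, not the vfs API):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// listDir infers the immediate children of dir from flat object keys:
// a key with a deeper "/" implies a subdirectory; otherwise it is a file.
func listDir(keys []string, dir string) []string {
	prefix := strings.TrimPrefix(dir, "/")
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	seen := map[string]bool{}
	for _, k := range keys {
		if !strings.HasPrefix(k, prefix) || k == prefix {
			continue // outside this directory, or its own marker object
		}
		rest := strings.TrimPrefix(k, prefix)
		if i := strings.Index(rest, "/"); i >= 0 {
			seen[rest[:i]+"/"] = true // nested key implies a subdirectory
		} else {
			seen[rest] = true // direct child file
		}
	}
	out := make([]string, 0, len(seen))
	for name := range seen {
		out = append(out, name)
	}
	sort.Strings(out)
	return out
}

func main() {
	keys := []string{"docs/readme.md", "docs/", "photos/2024/img.jpg"}
	fmt.Println(listDir(keys, "/"))      // [docs/ photos/]
	fmt.Println(listDir(keys, "photos")) // [2024/]
}
```

Real listings deduplicate via the backend's prefix/delimiter support where available, but the mapping is the same.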
Operations
| Operation | Method | Description |
|---|---|---|
| Stat | fs.Stat(ctx, path) | File info (checks file, then directory children) |
| ReadDir | fs.ReadDir(ctx, path) | List directory entries |
| Walk | fs.Walk(ctx, root, fn) | Recursive traversal |
| Open | fs.Open(ctx, path) | Open file for reading |
| Create | fs.Create(ctx, path) | Create/overwrite file (pipe-based writer) |
| Mkdir | fs.Mkdir(ctx, path) | Create directory marker |
| Remove | fs.Remove(ctx, path) | Delete a file |
| RemoveAll | fs.RemoveAll(ctx, path) | Delete path and all children |
| Rename | fs.Rename(ctx, old, new) | Move/rename via copy + delete |
Usage
vfs := t.VFS("documents")
// Create a file.
f, _ := vfs.Create(ctx, "reports/q4.txt")
f.Write([]byte("Q4 earnings report"))
f.Close()
// Read it back.
f, _ = vfs.Open(ctx, "reports/q4.txt")
data, _ := io.ReadAll(f)
f.Close()
// List a directory.
entries, _ := vfs.ReadDir(ctx, "reports")
for _, e := range entries {
fmt.Println(e.Name(), e.IsDir())
}
Capability Interface Model
The core driver.Driver interface is intentionally minimal. Advanced features are exposed via optional capability interfaces. Consumers use type assertions to check for and access these capabilities at runtime.
| Capability | Interface | Methods |
|---|---|---|
| Multipart uploads | MultipartDriver | InitiateMultipart, UploadPart, CompleteMultipart, AbortMultipart |
| Pre-signed URLs | PresignDriver | PresignGet, PresignPut |
| Object versioning | VersioningDriver | ListVersions, GetVersion, DeleteVersion, RestoreVersion |
| Change notifications | NotificationDriver | Watch (returns <-chan ObjectEvent) |
| Lifecycle rules | LifecycleDriver | SetLifecycle, GetLifecycle |
| Byte-range reads | RangeDriver | GetRange (offset + length) |
| Server-side copy | ServerCopyDriver | ServerCopy (avoids download + re-upload) |
Type Assertion Pattern
drv := t.Driver()
// Check if the driver supports pre-signed URLs.
if presigner, ok := drv.(driver.PresignDriver); ok {
url, err := presigner.PresignGet(ctx, "uploads", "report.pdf", 15*time.Minute)
fmt.Println("download URL:", url)
}
// Check for multipart upload support.
if mp, ok := drv.(driver.MultipartDriver); ok {
uploadID, _ := mp.InitiateMultipart(ctx, "uploads", "large-file.bin")
// ... upload parts ...
mp.CompleteMultipart(ctx, "uploads", "large-file.bin", uploadID, parts)
}
This pattern avoids bloating the base Driver interface. Drivers only implement the capabilities they support, and callers gracefully degrade when a capability is absent.
TypeID System
Every entity in Trove uses an id.ID type based on TypeID. IDs are:
- Prefixed -- the prefix identifies the entity type (e.g., obj_, str_)
- K-sortable -- UUIDv7-based suffix, so lexicographic order matches creation order
- Globally unique -- collision-resistant across distributed systems
- URL-safe -- no special characters, safe for use in paths and query strings
Prefixes
| Prefix | Entity | Constructor |
|---|---|---|
| obj_ | Object | id.NewObjectID() |
| bkt_ | Bucket | id.NewBucketID() |
| upl_ | Upload session | id.NewUploadSessionID() |
| dwn_ | Download session | id.NewDownloadSessionID() |
| str_ | Stream | id.NewStreamID() |
| pol_ | Pool | id.NewPoolID() |
| ver_ | Version | id.NewVersionID() |
| chk_ | Chunk | id.NewChunkID() |
Usage
import "github.com/xraph/trove/id"
// Generate a new object ID.
objID := id.NewObjectID()
fmt.Println(objID) // "obj_01h455vb4pex5vsknk084sn02q"
// Parse and validate.
parsed, err := id.ParseObjectID("obj_01h455vb4pex5vsknk084sn02q")
// Check the prefix.
fmt.Println(parsed.Prefix()) // "obj"
// IDs implement encoding.TextMarshaler, sql.Scanner, and bson.ValueMarshaler
// for seamless serialization to JSON, databases, and MongoDB.
Module Dependency Graph
trove (core)
├── trove/driver (zero deps -- defines Driver, ObjectInfo, capability interfaces)
├── trove/id (external: go.jetify.com/typeid)
├── trove/internal (depends on driver)
├── trove/middleware (depends on driver)
│ ├── middleware/encrypt (depends on middleware)
│ ├── middleware/compress (depends on middleware, klauspost/compress)
│ ├── middleware/dedup (depends on middleware, cas)
│ ├── middleware/scan (depends on middleware)
│ └── middleware/watermark (depends on middleware)
├── trove/stream (depends on driver, id)
├── trove/cas (depends on driver)
├── trove/vfs (depends on driver -- uses Storage interface to avoid circular import)
├── trove/drivers/localdriver (depends on driver)
├── trove/drivers/memdriver (depends on driver)
├── trove/drivers/s3driver (depends on driver, aws-sdk-go-v2)
├── trove/drivers/gcsdriver (depends on driver, cloud.google.com/go/storage)
├── trove/drivers/azuredriver (depends on driver, azure-sdk-for-go)
├── trove/drivers/sftpdriver (depends on driver, pkg/sftp)
├── trove/trovetest (depends on driver)
└── trove/extension (depends on trove, Forge framework)
The stream package depends only on driver and id -- no dependency on the core trove package. The vfs package defines a Storage interface mirroring Trove's methods to avoid circular imports. The cas package depends only on driver, keeping it composable outside of Trove.