Trove

Streaming Engine

Managed streaming with backpressure, chunked transfers, buffer pooling, and concurrency control.

The streaming engine provides managed, chunked data transfer between clients and storage backends. It handles concurrency limits, backpressure, buffer pooling, and resumable transfers.

Quick Start

import (
    "context"
    "github.com/xraph/trove"
    "github.com/xraph/trove/stream"
    "github.com/xraph/trove/drivers/memdriver"
)

drv := memdriver.New()
t, _ := trove.Open(drv)
defer t.Close(context.Background())

ctx := context.Background()
t.CreateBucket(ctx, "media")

// Create a managed upload stream.
s, err := t.Stream(ctx, "media", "video.mp4", stream.DirectionUpload,
    stream.WithChunkSize(4 * 1024 * 1024),
    stream.WithResumable(),
    stream.WithOnProgress(func(p stream.Progress) {
        fmt.Printf("\r%d%% (%0.f bytes/s)", p.Percent, p.Speed)
    }),
)
if err != nil {
    log.Fatal(err)
}
defer t.Pool().Release(s)

Stream Lifecycle

Every stream follows a state machine:

idle → active → paused ⇄ active → completing → completed
                                ↘ failed
                                ↘ cancelled
StateDescription
idleCreated but not started
activeActively transferring data
pausedTemporarily suspended (resumable)
completingFinal chunk sent, awaiting confirmation
completedTransfer finished successfully
failedTransfer encountered an unrecoverable error
cancelledTransfer was explicitly cancelled

State Transitions

// Start the stream (idle → active).
s.Start()

// Write chunks while active.
buf := s.ChunkPool().Get()
copy(buf, data)
s.Write(&stream.Chunk{Index: 0, Data: buf, Size: len(data)})

// Pause and resume.
s.Control(stream.ControlMsg{Type: stream.CtrlPause})
s.Control(stream.ControlMsg{Type: stream.CtrlResume})

// Complete the stream (active → completing → completed).
s.Complete(&driver.ObjectInfo{Key: "video.mp4", Size: totalSize})

// Or cancel/fail.
s.Control(stream.ControlMsg{Type: stream.CtrlCancel})
s.Fail(errors.New("upload interrupted"))

Directions

Streams support three data flow directions:

DirectionDescription
DirectionUploadClient → Storage
DirectionDownloadStorage → Client
DirectionBiDiBidirectional (sync, mirroring)

Stream Pool

The Pool manages concurrent streams with shared resources, concurrency limits, and bandwidth throttling.

pool := stream.NewPool("uploads", stream.PoolConfig{
    MaxStreams:        8,
    ChunkSize:         16 * 1024 * 1024, // 16MB chunks
    StreamTimeout:     30 * time.Minute,
    IdleTimeout:       5 * time.Minute,
    BackpressureMode:  stream.BackpressureBlock,
})
defer pool.Close()

// Acquire a stream (blocks if pool is full).
s, err := pool.Acquire(ctx, stream.DirectionUpload, "media", "video.mp4")
if err != nil {
    log.Fatal(err)
}
defer pool.Release(s)

// Check pool status.
fmt.Println("Active:", pool.ActiveCount())
snap := pool.Metrics.Snapshot()
fmt.Printf("Total: %d bytes, %d chunks\n", snap.TotalBytes, snap.TotalChunks)

Pool via Trove

Every Trove instance has a default pool configured from PoolSize and ChunkSize:

t, _ := trove.Open(drv, trove.WithPoolSize(32))

// Use the integrated Stream() method.
s, _ := t.Stream(ctx, "bucket", "key", stream.DirectionUpload)
defer t.Pool().Release(s)

Or provide a custom pool config:

t, _ := trove.Open(drv, trove.WithPoolConfig(stream.PoolConfig{
    MaxStreams: 64,
    ChunkSize:  32 * 1024 * 1024,
}))

Chunk Pool

The ChunkPool manages reusable byte buffers to avoid allocation pressure during high-throughput transfers:

pool := stream.NewChunkPool(8 * 1024 * 1024) // 8MB buffers

buf := pool.Get()       // Get a buffer from the pool.
defer pool.Put(buf)     // Return it when done.

stats := pool.Stats()
fmt.Printf("Allocated: %d, Released: %d\n", stats.Allocated, stats.Released)

Buffers are automatically managed when using Stream.ChunkPool():

buf := s.ChunkPool().Get()
copy(buf, data)
s.Write(&stream.Chunk{Index: idx, Data: buf, Size: n})
// The consumer is responsible for returning the buffer via ChunkPool().Put(chunk.Data).

Backpressure

When a stream's chunk channel is full (consumer can't keep up), the backpressure strategy determines what happens:

StrategyBehavior
BackpressureBlockBlocks the sender until the consumer catches up (default)
BackpressureDropDrops the oldest unacknowledged chunk
BackpressureBufferSpills chunks to an overflow buffer
BackpressureAdaptiveBlocks and signals chunk size should be adjusted
s, _ := t.Stream(ctx, "bucket", "key", stream.DirectionUpload,
    stream.WithBackpressure(stream.BackpressureDrop),
    stream.WithChannelSize(32),
)

Hooks

Register callbacks for stream lifecycle events:

s, _ := t.Stream(ctx, "bucket", "key", stream.DirectionUpload,
    stream.WithOnProgress(func(p stream.Progress) {
        fmt.Printf("%d%% complete\n", p.Percent)
    }),
    stream.WithOnChunk(func(c *stream.Chunk, ack stream.ChunkAck) {
        log.Printf("chunk %d processed", c.Index)
    }),
    stream.WithOnComplete(func(info *driver.ObjectInfo) {
        log.Printf("upload complete: %s (%d bytes)", info.Key, info.Size)
    }),
    stream.WithOnError(func(err error) {
        log.Printf("stream error: %v", err)
    }),
)

Metrics

Both pools and individual streams expose real-time metrics:

Pool Metrics

snap := pool.Metrics.Snapshot()
fmt.Printf("Active: %d, Total bytes: %d, Failed: %d\n",
    snap.ActiveStreams, snap.TotalBytes, snap.FailedStreams)

Stream Metrics

m := s.Metrics().StreamSnapshot()
fmt.Printf("Sent: %d, Recv: %d, Throughput: %.0f bytes/s\n",
    m.BytesSent, m.BytesRecv, m.Throughput)

Resumable Transfers

Enable offset tracking for resumable uploads and downloads:

s, _ := t.Stream(ctx, "bucket", "large-file.zip", stream.DirectionUpload,
    stream.WithResumable(),
)

// Check current offset.
fmt.Println("Offset:", s.Offset())

// Seek to a specific byte position.
s.Control(stream.ControlMsg{Type: stream.CtrlSeek, Offset: lastKnownOffset})

RangeDriver Capability

Drivers that implement RangeDriver support byte-range reads for efficient partial downloads:

type RangeDriver interface {
    driver.Driver
    GetRange(ctx context.Context, bucket, key string, offset, length int64) (*ObjectReader, error)
}

This enables resumable downloads starting from a specific byte offset without re-reading the entire object.

Stream Options Reference

OptionDescription
WithChunkSize(n)Set chunk size in bytes
WithResumable()Enable resumable transfer with offset tracking
WithPool(name)Assign stream to a named pool
WithChannelSize(n)Set the chunk channel buffer size
WithBackpressure(mode)Set backpressure strategy
WithOnProgress(fn)Register progress callback
WithOnChunk(fn)Register per-chunk callback
WithOnComplete(fn)Register completion callback
WithOnError(fn)Register error callback

On this page