# Streaming Engine
Managed streaming with backpressure, chunked transfers, buffer pooling, and concurrency control.
The streaming engine provides managed, chunked data transfer between clients and storage backends. It handles concurrency limits, backpressure, buffer pooling, and resumable transfers.
## Quick Start
```go
import (
	"context"
	"fmt"
	"log"

	"github.com/xraph/trove"
	"github.com/xraph/trove/stream"
	"github.com/xraph/trove/drivers/memdriver"
)

drv := memdriver.New()
t, _ := trove.Open(drv)
defer t.Close(context.Background())

ctx := context.Background()
t.CreateBucket(ctx, "media")

// Create a managed upload stream.
s, err := t.Stream(ctx, "media", "video.mp4", stream.DirectionUpload,
	stream.WithChunkSize(4*1024*1024),
	stream.WithResumable(),
	stream.WithOnProgress(func(p stream.Progress) {
		fmt.Printf("\r%d%% (%.0f bytes/s)", p.Percent, p.Speed)
	}),
)
if err != nil {
	log.Fatal(err)
}
defer t.Pool().Release(s)
```

## Stream Lifecycle
Every stream follows a state machine:
```text
idle → active → paused ⇄ active → completing → completed
           ↘ failed
           ↘ cancelled
```

| State | Description |
|---|---|
| `idle` | Created but not started |
| `active` | Actively transferring data |
| `paused` | Temporarily suspended (resumable) |
| `completing` | Final chunk sent, awaiting confirmation |
| `completed` | Transfer finished successfully |
| `failed` | Transfer encountered an unrecoverable error |
| `cancelled` | Transfer was explicitly cancelled |
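The state table above can be read as a transition graph. As a rough sketch (illustrative only — the `State` type, `allowed` map, and `CanTransition` helper below are assumptions, not the library's internals), legal moves can be checked against an adjacency table:

```go
package main

import "fmt"

// State mirrors the lifecycle names from the table above.
type State string

const (
	Idle       State = "idle"
	Active     State = "active"
	Paused     State = "paused"
	Completing State = "completing"
	Completed  State = "completed"
	Failed     State = "failed"
	Cancelled  State = "cancelled"
)

// allowed lists the legal outgoing edges for each non-terminal state.
// completed, failed, and cancelled are terminal: no outgoing edges.
var allowed = map[State][]State{
	Idle:       {Active, Cancelled},
	Active:     {Paused, Completing, Failed, Cancelled},
	Paused:     {Active, Failed, Cancelled},
	Completing: {Completed, Failed, Cancelled},
}

// CanTransition reports whether from → to is a legal edge.
func CanTransition(from, to State) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Idle, Active))      // true
	fmt.Println(CanTransition(Completed, Active)) // false: completed is terminal
}
```

A lookup like this is why calls such as resuming a completed stream fail fast rather than corrupting a transfer.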
### State Transitions
```go
// Start the stream (idle → active).
s.Start()

// Write chunks while active.
buf := s.ChunkPool().Get()
copy(buf, data)
s.Write(&stream.Chunk{Index: 0, Data: buf, Size: len(data)})

// Pause and resume.
s.Control(stream.ControlMsg{Type: stream.CtrlPause})
s.Control(stream.ControlMsg{Type: stream.CtrlResume})

// Complete the stream (active → completing → completed).
s.Complete(&driver.ObjectInfo{Key: "video.mp4", Size: totalSize})

// Or cancel/fail.
s.Control(stream.ControlMsg{Type: stream.CtrlCancel})
s.Fail(errors.New("upload interrupted"))
```

## Directions
Streams support three data flow directions:
| Direction | Description |
|---|---|
| `DirectionUpload` | Client → Storage |
| `DirectionDownload` | Storage → Client |
| `DirectionBiDi` | Bidirectional (sync, mirroring) |
## Stream Pool
The Pool manages concurrent streams with shared resources, concurrency limits, and bandwidth throttling.
```go
pool := stream.NewPool("uploads", stream.PoolConfig{
	MaxStreams:       8,
	ChunkSize:        16 * 1024 * 1024, // 16MB chunks
	StreamTimeout:    30 * time.Minute,
	IdleTimeout:      5 * time.Minute,
	BackpressureMode: stream.BackpressureBlock,
})
defer pool.Close()

// Acquire a stream (blocks if the pool is full).
s, err := pool.Acquire(ctx, stream.DirectionUpload, "media", "video.mp4")
if err != nil {
	log.Fatal(err)
}
defer pool.Release(s)

// Check pool status.
fmt.Println("Active:", pool.ActiveCount())
snap := pool.Metrics.Snapshot()
fmt.Printf("Total: %d bytes, %d chunks\n", snap.TotalBytes, snap.TotalChunks)
```

### Pool via Trove
Every Trove instance has a default pool configured from `PoolSize` and `ChunkSize`:
```go
t, _ := trove.Open(drv, trove.WithPoolSize(32))

// Use the integrated Stream() method.
s, _ := t.Stream(ctx, "bucket", "key", stream.DirectionUpload)
defer t.Pool().Release(s)
```

Or provide a custom pool config:
```go
t, _ := trove.Open(drv, trove.WithPoolConfig(stream.PoolConfig{
	MaxStreams: 64,
	ChunkSize:  32 * 1024 * 1024,
}))
```

## Chunk Pool
The ChunkPool manages reusable byte buffers to avoid allocation pressure during high-throughput transfers:
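The underlying technique is standard buffer recycling, which can be sketched with `sync.Pool` from the standard library. This is an illustrative stand-in only (`bufPool` is a hypothetical name; the real `ChunkPool` additionally tracks allocation stats):

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool hands out fixed-size byte buffers and recycles them,
// avoiding a fresh allocation per chunk on the hot path.
type bufPool struct {
	size int
	p    sync.Pool
}

func newBufPool(size int) *bufPool {
	bp := &bufPool{size: size}
	bp.p.New = func() any { return make([]byte, size) }
	return bp
}

// Get returns a buffer of exactly bp.size bytes.
func (bp *bufPool) Get() []byte { return bp.p.Get().([]byte) }

// Put recycles a buffer; wrongly sized buffers are discarded.
func (bp *bufPool) Put(b []byte) {
	if len(b) == bp.size {
		bp.p.Put(b)
	}
}

func main() {
	pool := newBufPool(8 * 1024 * 1024) // 8MB buffers
	buf := pool.Get()
	fmt.Println(len(buf)) // 8388608
	pool.Put(buf)
}
```

Recycling matters here because a high-throughput transfer would otherwise allocate (and garbage-collect) one multi-megabyte buffer per chunk.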
```go
pool := stream.NewChunkPool(8 * 1024 * 1024) // 8MB buffers

buf := pool.Get()   // Get a buffer from the pool.
defer pool.Put(buf) // Return it when done.

stats := pool.Stats()
fmt.Printf("Allocated: %d, Released: %d\n", stats.Allocated, stats.Released)
```

Buffers are automatically managed when using `Stream.ChunkPool()`:
```go
buf := s.ChunkPool().Get()
copy(buf, data)
s.Write(&stream.Chunk{Index: idx, Data: buf, Size: n})
// The consumer is responsible for returning the buffer via ChunkPool().Put(chunk.Data).
```

## Backpressure
When a stream's chunk channel is full (consumer can't keep up), the backpressure strategy determines what happens:
| Strategy | Behavior |
|---|---|
| `BackpressureBlock` | Blocks the sender until the consumer catches up (default) |
| `BackpressureDrop` | Drops the oldest unacknowledged chunk |
| `BackpressureBuffer` | Spills chunks to an overflow buffer |
| `BackpressureAdaptive` | Blocks and signals that the chunk size should be adjusted |
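The first two strategies map directly onto Go channel semantics. As a minimal stdlib-only sketch (the `sendBlock`/`sendDrop` helpers are illustrative, not the engine's actual send path): a plain send blocks when the channel is full, while a `select` with a `default` branch lets the sender evict the oldest queued chunk instead.

```go
package main

import "fmt"

type chunk struct{ index int }

// sendBlock models BackpressureBlock: the send waits
// until the consumer frees space in the channel.
func sendBlock(ch chan chunk, c chunk) {
	ch <- c // blocks when the channel is full
}

// sendDrop models BackpressureDrop: when the channel is full,
// the oldest queued chunk is discarded to make room.
func sendDrop(ch chan chunk, c chunk) (dropped bool) {
	select {
	case ch <- c:
		return false
	default:
		<-ch // evict the oldest unconsumed chunk
		ch <- c
		return true
	}
}

func main() {
	ch := make(chan chunk, 2)
	sendBlock(ch, chunk{0})
	sendBlock(ch, chunk{1})
	fmt.Println(sendDrop(ch, chunk{2})) // true: channel was full, chunk 0 evicted
	fmt.Println((<-ch).index)           // 1
}
```

Blocking preserves every chunk at the cost of sender latency; dropping keeps the sender fast at the cost of data loss, which only makes sense for streams where stale chunks can be retransmitted or skipped.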
```go
s, _ := t.Stream(ctx, "bucket", "key", stream.DirectionUpload,
	stream.WithBackpressure(stream.BackpressureDrop),
	stream.WithChannelSize(32),
)
```

## Hooks
Register callbacks for stream lifecycle events:
```go
s, _ := t.Stream(ctx, "bucket", "key", stream.DirectionUpload,
	stream.WithOnProgress(func(p stream.Progress) {
		fmt.Printf("%d%% complete\n", p.Percent)
	}),
	stream.WithOnChunk(func(c *stream.Chunk, ack stream.ChunkAck) {
		log.Printf("chunk %d processed", c.Index)
	}),
	stream.WithOnComplete(func(info *driver.ObjectInfo) {
		log.Printf("upload complete: %s (%d bytes)", info.Key, info.Size)
	}),
	stream.WithOnError(func(err error) {
		log.Printf("stream error: %v", err)
	}),
)
```

## Metrics
Both pools and individual streams expose real-time metrics:
### Pool Metrics
```go
snap := pool.Metrics.Snapshot()
fmt.Printf("Active: %d, Total bytes: %d, Failed: %d\n",
	snap.ActiveStreams, snap.TotalBytes, snap.FailedStreams)
```

### Stream Metrics
```go
m := s.Metrics().StreamSnapshot()
fmt.Printf("Sent: %d, Recv: %d, Throughput: %.0f bytes/s\n",
	m.BytesSent, m.BytesRecv, m.Throughput)
```

## Resumable Transfers
Enable offset tracking for resumable uploads and downloads:
```go
s, _ := t.Stream(ctx, "bucket", "large-file.zip", stream.DirectionUpload,
	stream.WithResumable(),
)

// Check the current offset.
fmt.Println("Offset:", s.Offset())

// Seek to a specific byte position.
s.Control(stream.ControlMsg{Type: stream.CtrlSeek, Offset: lastKnownOffset})
```

## RangeDriver Capability
Drivers that implement RangeDriver support byte-range reads for efficient partial downloads:
```go
type RangeDriver interface {
	driver.Driver
	GetRange(ctx context.Context, bucket, key string, offset, length int64) (*ObjectReader, error)
}
```

This enables resumable downloads starting from a specific byte offset without re-reading the entire object.
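The semantics of `GetRange` can be sketched over an in-memory object with `io.NewSectionReader` from the standard library (the `getRange` helper is hypothetical; a real driver would issue a ranged request to its backend, e.g. an HTTP `Range` header):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// getRange reads length bytes starting at offset, without
// touching any bytes before the offset.
func getRange(object []byte, offset, length int64) ([]byte, error) {
	r := io.NewSectionReader(bytes.NewReader(object), offset, length)
	return io.ReadAll(r)
}

func main() {
	object := []byte("0123456789abcdef")

	// Resume a download from byte 10 without re-reading bytes 0-9.
	part, err := getRange(object, 10, 6)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", part) // abcdef
}
```

This is exactly the operation a resumable download needs: given the stream's saved offset, the driver fetches only the remaining byte range.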
## Stream Options Reference
| Option | Description |
|---|---|
| `WithChunkSize(n)` | Set the chunk size in bytes |
| `WithResumable()` | Enable resumable transfer with offset tracking |
| `WithPool(name)` | Assign the stream to a named pool |
| `WithChannelSize(n)` | Set the chunk channel buffer size |
| `WithBackpressure(mode)` | Set the backpressure strategy |
| `WithOnProgress(fn)` | Register a progress callback |
| `WithOnChunk(fn)` | Register a per-chunk callback |
| `WithOnComplete(fn)` | Register a completion callback |
| `WithOnError(fn)` | Register an error callback |