Streaming Uploads
Use the streaming engine for chunked uploads with progress tracking, pause/resume, and backpressure.
Trove's streaming engine provides managed, chunked data transfers with concurrency control, backpressure, buffer pooling, and resumable transfers. This guide covers creating pools, acquiring streams, tracking progress, and handling pause/resume.
Pool Creation
A Pool manages concurrent streams with shared resources and concurrency limits.
import (
"context"
"fmt"
"log"
"time"
"github.com/xraph/trove"
"github.com/xraph/trove/stream"
"github.com/xraph/trove/drivers/memdriver"
)
ctx := context.Background()
// Create a standalone pool with custom configuration.
pool := stream.NewPool("uploads", stream.PoolConfig{
MaxStreams: 8, // Max concurrent streams.
MaxBandwidth: 50 * 1024 * 1024, // 50 MB/s total throughput limit.
ChunkSize: 16 * 1024 * 1024, // 16MB chunks.
BufferCount: 32, // Pre-allocate 32 buffers.
StreamTimeout: 30 * time.Minute, // Max stream lifetime.
IdleTimeout: 5 * time.Minute, // Close idle streams.
BackpressureMode: stream.BackpressureBlock, // Block when consumer is slow.
})
defer pool.Close()

Default Pool Config
Use DefaultPoolConfig() for sensible defaults:
cfg := stream.DefaultPoolConfig()
// cfg.MaxStreams = 16
// cfg.ChunkSize = 8 * 1024 * 1024 (8MB)
// cfg.BufferCount = 32
// cfg.StreamTimeout = 30 * time.Minute
// cfg.IdleTimeout = 5 * time.Minute
// cfg.BackpressureMode = BackpressureBlock
pool := stream.NewPool("main", cfg)

Using the Integrated Pool
Every Trove instance has a built-in pool. Configure it at initialization:
drv := memdriver.New()
// Set pool size and chunk size via options.
t, _ := trove.Open(drv, trove.WithPoolSize(32))
defer t.Close(ctx)
// Or provide a full pool config.
t, _ = trove.Open(drv, trove.WithPoolConfig(stream.PoolConfig{
MaxStreams: 64,
ChunkSize: 32 * 1024 * 1024,
}))

Acquiring a Stream
From a Standalone Pool
s, err := pool.Acquire(ctx, stream.DirectionUpload, "media", "video.mp4",
stream.WithChunkSize(4 * 1024 * 1024),
stream.WithResumable(),
)
if err != nil {
log.Fatal(err)
}
defer pool.Release(s)

From Trove's Integrated Pool
t.CreateBucket(ctx, "media")
s, err := t.Stream(ctx, "media", "video.mp4", stream.DirectionUpload,
stream.WithChunkSize(4 * 1024 * 1024),
stream.WithResumable(),
stream.WithOnProgress(func(p stream.Progress) {
fmt.Printf("\r%d%% (%.0f bytes/s)", p.Percent, p.Speed)
}),
)
if err != nil {
log.Fatal(err)
}
defer t.Pool().Release(s)

Stream Options
| Option | Description |
|---|---|
WithChunkSize(n) | Chunk size in bytes |
WithResumable() | Enable resumable transfer with offset tracking |
WithChannelSize(n) | Chunk channel buffer size |
WithBackpressure(mode) | Backpressure strategy |
WithOnProgress(fn) | Progress callback |
WithOnChunk(fn) | Per-chunk callback |
WithOnComplete(fn) | Completion callback |
WithOnError(fn) | Error callback |
Stream Lifecycle
Every stream follows a state machine:
idle --> active --> paused <--> active --> completing --> completed
\--> failed
\--> cancelled

| State | Description |
|---|---|
idle | Created but not started |
active | Actively transferring data |
paused | Temporarily suspended (resumable) |
completing | Final chunk sent, awaiting confirmation |
completed | Transfer finished successfully |
failed | Unrecoverable error encountered |
cancelled | Explicitly cancelled |
State Transitions
// Start the stream (idle -> active).
s.Start()
// Write chunks.
buf := s.ChunkPool().Get()
n := copy(buf, data)
s.Write(&stream.Chunk{Index: 0, Data: buf, Size: n})
// Complete the stream (active -> completing -> completed).
s.Complete(&driver.ObjectInfo{Key: "video.mp4", Size: totalSize})

Progress Tracking
Register a progress callback to monitor transfer status:
s, _ := t.Stream(ctx, "media", "large-file.zip", stream.DirectionUpload,
stream.WithOnProgress(func(p stream.Progress) {
fmt.Printf(
"\rProgress: %d%% | Sent: %d/%d bytes | Speed: %.1f MB/s",
p.Percent,
p.BytesSent,
p.TotalSize,
p.Speed / (1024 * 1024),
)
}),
stream.WithOnComplete(func(info *driver.ObjectInfo) {
fmt.Printf("\nUpload complete: %s (%d bytes)\n", info.Key, info.Size)
}),
stream.WithOnError(func(err error) {
fmt.Printf("\nUpload failed: %v\n", err)
}),
)

The Progress struct contains:
| Field | Type | Description |
|---|---|---|
BytesSent | int64 | Bytes uploaded so far |
BytesRecv | int64 | Bytes downloaded so far |
TotalSize | int64 | Total expected size |
Percent | int | Completion percentage (0--100) |
Speed | float64 | Current throughput in bytes/sec |
Pause and Resume
Control stream execution with ControlMsg:
// Pause the stream (active -> paused).
s.Control(stream.ControlMsg{Type: stream.CtrlPause})
fmt.Println("Stream paused at offset:", s.Offset())
// Resume the stream (paused -> active).
s.Control(stream.ControlMsg{Type: stream.CtrlResume})
// Cancel the stream entirely (any state -> cancelled).
s.Control(stream.ControlMsg{Type: stream.CtrlCancel})

Resumable Transfers
Enable offset tracking with WithResumable() so transfers can resume from where they left off:
s, _ := t.Stream(ctx, "media", "large-file.zip", stream.DirectionUpload,
stream.WithResumable(),
)
// Check current offset.
fmt.Println("Current offset:", s.Offset())
// Seek to a specific byte position (e.g., after a reconnection).
s.Control(stream.ControlMsg{Type: stream.CtrlSeek, Offset: lastKnownOffset})

ChunkPool for Memory Efficiency
The ChunkPool manages reusable byte buffers to minimize allocation pressure during high-throughput transfers.
// Get a buffer from the stream's chunk pool.
buf := s.ChunkPool().Get()
// Fill the buffer with data.
n, err := file.Read(buf)
if err != nil && err != io.EOF {
log.Fatal(err)
}
// Write the chunk to the stream.
s.Write(&stream.Chunk{
Index: chunkIndex,
Data: buf,
Size: n,
})
// The consumer returns the buffer via ChunkPool().Put(chunk.Data).

Standalone ChunkPool
chunkPool := stream.NewChunkPool(8 * 1024 * 1024) // 8MB buffers
buf := chunkPool.Get()
defer chunkPool.Put(buf)
stats := chunkPool.Stats()
fmt.Printf("Allocated: %d, Released: %d\n", stats.Allocated, stats.Released)

Backpressure Modes
When a stream's chunk channel fills up (consumer cannot keep up), the backpressure strategy determines behavior:
| Mode | Behavior |
|---|---|
BackpressureBlock | Blocks the sender until the consumer catches up (default) |
BackpressureDrop | Drops the oldest unacknowledged chunk |
BackpressureBuffer | Spills chunks to an overflow buffer |
BackpressureAdaptive | Blocks and signals that chunk size should be adjusted |
// Use drop mode for real-time streaming where data loss is acceptable.
s, _ := t.Stream(ctx, "live", "camera-feed.ts", stream.DirectionUpload,
stream.WithBackpressure(stream.BackpressureDrop),
stream.WithChannelSize(32),
)
// Use buffer mode for guaranteed delivery with overflow tolerance.
s, _ = t.Stream(ctx, "uploads", "backup.tar.gz", stream.DirectionUpload,
stream.WithBackpressure(stream.BackpressureBuffer),
)

Per-Chunk Callbacks
Track individual chunk processing:
s, _ := t.Stream(ctx, "media", "video.mp4", stream.DirectionUpload,
stream.WithOnChunk(func(c *stream.Chunk, ack stream.ChunkAck) {
log.Printf("Chunk %d processed: %d bytes", c.Index, c.Size)
}),
)

Pool Metrics
Monitor pool health and throughput:
// Active stream count.
fmt.Println("Active streams:", pool.ActiveCount())
// Snapshot of pool-level metrics.
snap := pool.Metrics.Snapshot()
fmt.Printf("Total bytes: %d\n", snap.TotalBytes)
fmt.Printf("Total chunks: %d\n", snap.TotalChunks)
fmt.Printf("Active streams: %d\n", snap.ActiveStreams)
fmt.Printf("Failed streams: %d\n", snap.FailedStreams)
// ChunkPool statistics.
cpStats := pool.ChunkPool().Stats()
fmt.Printf("Buffers allocated: %d\n", cpStats.Allocated)
fmt.Printf("Buffers released: %d\n", cpStats.Released)

Stream-Level Metrics
m := s.Metrics().StreamSnapshot()
fmt.Printf("Sent: %d bytes, Recv: %d bytes, Throughput: %.0f bytes/s\n",
m.BytesSent, m.BytesRecv, m.Throughput)

Complete Upload Example
package main
import (
"context"
"fmt"
"io"
"log"
"os"
"github.com/xraph/trove"
"github.com/xraph/trove/driver"
"github.com/xraph/trove/stream"
"github.com/xraph/trove/drivers/localdriver"
)
func main() {
ctx := context.Background()
drv := localdriver.New()
drv.Open(ctx, "file:///tmp/trove-stream")
t, _ := trove.Open(drv, trove.WithPoolSize(8))
defer t.Close(ctx)
t.CreateBucket(ctx, "uploads")
// Open a local file to upload.
file, err := os.Open("/path/to/large-file.zip")
if err != nil {
log.Fatal(err)
}
defer file.Close()
stat, _ := file.Stat()
// Acquire a managed upload stream.
s, err := t.Stream(ctx, "uploads", "large-file.zip", stream.DirectionUpload,
stream.WithChunkSize(4 * 1024 * 1024),
stream.WithResumable(),
stream.WithOnProgress(func(p stream.Progress) {
fmt.Printf("\r%d%% complete (%.1f MB/s)", p.Percent, p.Speed/(1024*1024))
}),
stream.WithOnComplete(func(info *driver.ObjectInfo) {
fmt.Printf("\nDone: %s\n", info.Key)
}),
)
if err != nil {
log.Fatal(err)
}
defer t.Pool().Release(s)
// Start the stream.
s.Start()
// Write chunks from the file.
idx := 0
for {
buf := s.ChunkPool().Get()
n, err := file.Read(buf)
if n > 0 {
s.Write(&stream.Chunk{Index: idx, Data: buf[:n], Size: n})
idx++
}
if err == io.EOF {
break
}
if err != nil {
s.Fail(err)
return
}
}
// Complete the upload.
s.Complete(&driver.ObjectInfo{Key: "large-file.zip", Size: stat.Size()})
}Cleanup
Always close the pool when done. This waits for active streams to finish and releases resources.
if err := pool.Close(); err != nil {
log.Printf("pool close error: %v", err)
}

When using Trove's integrated pool, t.Close(ctx) handles pool cleanup automatically.