Trove

Streaming Uploads

Use the streaming engine for chunked uploads with progress tracking, pause/resume, and backpressure.

Trove's streaming engine provides managed, chunked data transfers with concurrency control, backpressure, buffer pooling, and resumable transfers. This guide covers creating pools, acquiring streams, tracking progress, and handling pause/resume.

Pool Creation

A Pool manages concurrent streams with shared resources and concurrency limits.

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/xraph/trove"
    "github.com/xraph/trove/stream"
    "github.com/xraph/trove/drivers/memdriver"
)

ctx := context.Background()

// Create a standalone pool with custom configuration.
pool := stream.NewPool("uploads", stream.PoolConfig{
    MaxStreams:        8,                        // Max concurrent streams.
    MaxBandwidth:     50 * 1024 * 1024,          // 50 MB/s total throughput limit.
    ChunkSize:        16 * 1024 * 1024,          // 16MB chunks.
    BufferCount:      32,                        // Pre-allocate 32 buffers.
    StreamTimeout:    30 * time.Minute,          // Max stream lifetime.
    IdleTimeout:      5 * time.Minute,           // Close idle streams.
    BackpressureMode: stream.BackpressureBlock,  // Block when consumer is slow.
})
defer pool.Close()

Default Pool Config

Use DefaultPoolConfig() for sensible defaults:

cfg := stream.DefaultPoolConfig()
// cfg.MaxStreams       = 16
// cfg.ChunkSize       = 8 * 1024 * 1024  (8MB)
// cfg.BufferCount     = 32
// cfg.StreamTimeout   = 30 * time.Minute
// cfg.IdleTimeout     = 5 * time.Minute
// cfg.BackpressureMode = BackpressureBlock

pool := stream.NewPool("main", cfg)

Using the Integrated Pool

Every Trove instance has a built-in pool. Configure it at initialization:

drv := memdriver.New()

// Set pool size and chunk size via options.
t, _ := trove.Open(drv, trove.WithPoolSize(32))
defer t.Close(ctx)

// Or provide a full pool config.
t, _ = trove.Open(drv, trove.WithPoolConfig(stream.PoolConfig{
    MaxStreams: 64,
    ChunkSize:  32 * 1024 * 1024,
}))

Acquiring a Stream

From a Standalone Pool

s, err := pool.Acquire(ctx, stream.DirectionUpload, "media", "video.mp4",
    stream.WithChunkSize(4 * 1024 * 1024),
    stream.WithResumable(),
)
if err != nil {
    log.Fatal(err)
}
defer pool.Release(s)

From Trove's Integrated Pool

t.CreateBucket(ctx, "media")

s, err := t.Stream(ctx, "media", "video.mp4", stream.DirectionUpload,
    stream.WithChunkSize(4 * 1024 * 1024),
    stream.WithResumable(),
    stream.WithOnProgress(func(p stream.Progress) {
        fmt.Printf("\r%d%% (%.0f bytes/s)", p.Percent, p.Speed)
    }),
)
if err != nil {
    log.Fatal(err)
}
defer t.Pool().Release(s)

Stream Options

OptionDescription
WithChunkSize(n)Chunk size in bytes
WithResumable()Enable resumable transfer with offset tracking
WithChannelSize(n)Chunk channel buffer size
WithBackpressure(mode)Backpressure strategy
WithOnProgress(fn)Progress callback
WithOnChunk(fn)Per-chunk callback
WithOnComplete(fn)Completion callback
WithOnError(fn)Error callback

Stream Lifecycle

Every stream follows a state machine:

idle --> active --> paused <--> active --> completing --> completed
                                     \--> failed
                                     \--> cancelled
StateDescription
idleCreated but not started
activeActively transferring data
pausedTemporarily suspended (resumable)
completingFinal chunk sent, awaiting confirmation
completedTransfer finished successfully
failedUnrecoverable error encountered
cancelledExplicitly cancelled

State Transitions

// Start the stream (idle -> active).
s.Start()

// Write chunks.
buf := s.ChunkPool().Get()
n := copy(buf, data)
s.Write(&stream.Chunk{Index: 0, Data: buf, Size: n})

// Complete the stream (active -> completing -> completed).
s.Complete(&driver.ObjectInfo{Key: "video.mp4", Size: totalSize})

Progress Tracking

Register a progress callback to monitor transfer status:

s, _ := t.Stream(ctx, "media", "large-file.zip", stream.DirectionUpload,
    stream.WithOnProgress(func(p stream.Progress) {
        fmt.Printf(
            "\rProgress: %d%% | Sent: %d/%d bytes | Speed: %.1f MB/s",
            p.Percent,
            p.BytesSent,
            p.TotalSize,
            p.Speed / (1024 * 1024),
        )
    }),
    stream.WithOnComplete(func(info *driver.ObjectInfo) {
        fmt.Printf("\nUpload complete: %s (%d bytes)\n", info.Key, info.Size)
    }),
    stream.WithOnError(func(err error) {
        fmt.Printf("\nUpload failed: %v\n", err)
    }),
)

The Progress struct contains:

FieldTypeDescription
BytesSentint64Bytes uploaded so far
BytesRecvint64Bytes downloaded so far
TotalSizeint64Total expected size
PercentintCompletion percentage (0--100)
Speedfloat64Current throughput in bytes/sec

Pause and Resume

Control stream execution with ControlMsg:

// Pause the stream (active -> paused).
s.Control(stream.ControlMsg{Type: stream.CtrlPause})
fmt.Println("Stream paused at offset:", s.Offset())

// Resume the stream (paused -> active).
s.Control(stream.ControlMsg{Type: stream.CtrlResume})

// Cancel the stream entirely (any state -> cancelled).
s.Control(stream.ControlMsg{Type: stream.CtrlCancel})

Resumable Transfers

Enable offset tracking with WithResumable() so transfers can resume from where they left off:

s, _ := t.Stream(ctx, "media", "large-file.zip", stream.DirectionUpload,
    stream.WithResumable(),
)

// Check current offset.
fmt.Println("Current offset:", s.Offset())

// Seek to a specific byte position (e.g., after a reconnection).
s.Control(stream.ControlMsg{Type: stream.CtrlSeek, Offset: lastKnownOffset})

ChunkPool for Memory Efficiency

The ChunkPool manages reusable byte buffers to minimize allocation pressure during high-throughput transfers.

// Get a buffer from the stream's chunk pool.
buf := s.ChunkPool().Get()

// Fill the buffer with data.
n, err := file.Read(buf)
if err != nil && err != io.EOF {
    log.Fatal(err)
}

// Write the chunk to the stream.
s.Write(&stream.Chunk{
    Index: chunkIndex,
    Data:  buf,
    Size:  n,
})

// The consumer returns the buffer via ChunkPool().Put(chunk.Data).

Standalone ChunkPool

chunkPool := stream.NewChunkPool(8 * 1024 * 1024) // 8MB buffers

buf := chunkPool.Get()
defer chunkPool.Put(buf)

stats := chunkPool.Stats()
fmt.Printf("Allocated: %d, Released: %d\n", stats.Allocated, stats.Released)

Backpressure Modes

When a stream's chunk channel fills up (consumer cannot keep up), the backpressure strategy determines behavior:

ModeBehavior
BackpressureBlockBlocks the sender until the consumer catches up (default)
BackpressureDropDrops the oldest unacknowledged chunk
BackpressureBufferSpills chunks to an overflow buffer
BackpressureAdaptiveBlocks and signals that chunk size should be adjusted
// Use drop mode for real-time streaming where data loss is acceptable.
s, _ := t.Stream(ctx, "live", "camera-feed.ts", stream.DirectionUpload,
    stream.WithBackpressure(stream.BackpressureDrop),
    stream.WithChannelSize(32),
)

// Use buffer mode for guaranteed delivery with overflow tolerance.
s, _ = t.Stream(ctx, "uploads", "backup.tar.gz", stream.DirectionUpload,
    stream.WithBackpressure(stream.BackpressureBuffer),
)

Per-Chunk Callbacks

Track individual chunk processing:

s, _ := t.Stream(ctx, "media", "video.mp4", stream.DirectionUpload,
    stream.WithOnChunk(func(c *stream.Chunk, ack stream.ChunkAck) {
        log.Printf("Chunk %d processed: %d bytes", c.Index, c.Size)
    }),
)

Pool Metrics

Monitor pool health and throughput:

// Active stream count.
fmt.Println("Active streams:", pool.ActiveCount())

// Snapshot of pool-level metrics.
snap := pool.Metrics.Snapshot()
fmt.Printf("Total bytes:    %d\n", snap.TotalBytes)
fmt.Printf("Total chunks:   %d\n", snap.TotalChunks)
fmt.Printf("Active streams: %d\n", snap.ActiveStreams)
fmt.Printf("Failed streams: %d\n", snap.FailedStreams)

// ChunkPool statistics.
cpStats := pool.ChunkPool().Stats()
fmt.Printf("Buffers allocated: %d\n", cpStats.Allocated)
fmt.Printf("Buffers released:  %d\n", cpStats.Released)

Stream-Level Metrics

m := s.Metrics().StreamSnapshot()
fmt.Printf("Sent: %d bytes, Recv: %d bytes, Throughput: %.0f bytes/s\n",
    m.BytesSent, m.BytesRecv, m.Throughput)

Complete Upload Example

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "os"

    "github.com/xraph/trove"
    "github.com/xraph/trove/driver"
    "github.com/xraph/trove/stream"
    "github.com/xraph/trove/drivers/localdriver"
)

func main() {
    ctx := context.Background()

    drv := localdriver.New()
    drv.Open(ctx, "file:///tmp/trove-stream")

    t, _ := trove.Open(drv, trove.WithPoolSize(8))
    defer t.Close(ctx)

    t.CreateBucket(ctx, "uploads")

    // Open a local file to upload.
    file, err := os.Open("/path/to/large-file.zip")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    stat, _ := file.Stat()

    // Acquire a managed upload stream.
    s, err := t.Stream(ctx, "uploads", "large-file.zip", stream.DirectionUpload,
        stream.WithChunkSize(4 * 1024 * 1024),
        stream.WithResumable(),
        stream.WithOnProgress(func(p stream.Progress) {
            fmt.Printf("\r%d%% complete (%.1f MB/s)", p.Percent, p.Speed/(1024*1024))
        }),
        stream.WithOnComplete(func(info *driver.ObjectInfo) {
            fmt.Printf("\nDone: %s\n", info.Key)
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer t.Pool().Release(s)

    // Start the stream.
    s.Start()

    // Write chunks from the file.
    idx := 0
    for {
        buf := s.ChunkPool().Get()
        n, err := file.Read(buf)
        if n > 0 {
            s.Write(&stream.Chunk{Index: idx, Data: buf[:n], Size: n})
            idx++
        }
        if err == io.EOF {
            break
        }
        if err != nil {
            s.Fail(err)
            return
        }
    }

    // Complete the upload.
    s.Complete(&driver.ObjectInfo{Key: "large-file.zip", Size: stat.Size()})
}

Cleanup

Always close the pool when done. This waits for active streams to finish and releases resources.

if err := pool.Close(); err != nil {
    log.Printf("pool close error: %v", err)
}

When using Trove's integrated pool, t.Close(ctx) handles pool cleanup automatically.

On this page