send2gokapi/chunker.go

146 lines
3 KiB
Go

package main
import (
"bytes"
"context"
"fmt"
gonanoid "github.com/matoous/go-nanoid/v2"
"golang.org/x/sync/errgroup"
"io"
"mime"
"os"
"path/filepath"
"sync"
)
type chunker struct {
gc *gokapiClient
parallelChunks int
chunkSize int
maxDownloads int
maxDays int
}
func newChunker(gc *gokapiClient, config Config) *chunker {
return &chunker{
gc: gc,
parallelChunks: config.ParallelChunks,
chunkSize: config.ChunkSize,
maxDownloads: config.MaxDownloads,
maxDays: config.MaxDays,
}
}
func (c *chunker) UploadFile(ctx context.Context, filename string, progress func(ChunkReport)) (UploadResponse, error) {
f, err := os.Open(filename)
if err != nil {
return UploadResponse{}, err
}
defer f.Close()
fstat, err := f.Stat()
if err != nil {
return UploadResponse{}, err
}
fname := fstat.Name()
fi := uploadInfo{
chunkID: gonanoid.Must(12),
filename: fname,
totalSize: fstat.Size(),
contentType: mime.TypeByExtension(filepath.Ext(fname)),
allowedDownloads: c.maxDownloads,
expiryDays: c.maxDays,
password: "",
}
return c.upload(ctx, fi, f, progress)
}
func (c *chunker) upload(ctx context.Context, fi uploadInfo, r io.ReaderAt, progress func(ChunkReport)) (UploadResponse, error) {
bufPool := sync.Pool{
New: func() interface{} {
return make([]byte, c.chunkSize)
},
}
chunks := int(fi.totalSize/int64(c.chunkSize) + 1)
uploaders := c.parallelChunks
if chunks < uploaders {
uploaders = 1
}
chunkUploaded := make(chan uploadedChunk)
doneChunkReport := make(chan struct{})
go func() {
defer close(doneChunkReport)
uploadedChunks := 0
uploadedBytes := 0
progress(ChunkReport{
UploadedChunks: 0,
UploadedBytes: 0,
TotalChunks: chunks,
TotalSize: fi.totalSize,
})
for r := range chunkUploaded {
uploadedChunks += 1
uploadedBytes += r.ChunkSize
progress(ChunkReport{
UploadedChunks: uploadedChunks,
UploadedBytes: int64(uploadedBytes),
TotalChunks: chunks,
TotalSize: fi.totalSize,
})
}
}()
errGroup, egctx := errgroup.WithContext(ctx)
errGroup.SetLimit(uploaders)
for i := 0; i < chunks; i++ {
errGroup.Go(func() error {
offset := int64(i * c.chunkSize)
buf := bufPool.Get().([]byte)
defer bufPool.Put(buf)
thisBuf := buf
if offset+int64(c.chunkSize) > fi.totalSize {
thisBuf = buf[:fi.totalSize-offset]
}
n, err := r.ReadAt(thisBuf, offset)
if err != nil {
return err
} else if n != len(thisBuf) {
return fmt.Errorf("chunk %d: expected %d bytes but only read %d", i, len(thisBuf), n)
}
if err := c.gc.uploadChunk(egctx, fi, offset, bytes.NewReader(thisBuf)); err != nil {
return err
}
chunkUploaded <- uploadedChunk{ChunkSize: len(thisBuf)}
return nil
})
}
if err := errGroup.Wait(); err != nil {
return UploadResponse{}, err
}
close(chunkUploaded)
<-doneChunkReport
return c.gc.finalizeChunk(ctx, fi)
}
type uploadedChunk struct {
ChunkSize int
}