package main import ( "bytes" "context" "fmt" gonanoid "github.com/matoous/go-nanoid/v2" "golang.org/x/sync/errgroup" "io" "mime" "os" "path/filepath" "sync" ) type chunker struct { gc *gokapiClient parallelChunks int chunkSize int maxDownloads int maxDays int } func newChunker(gc *gokapiClient, config Config) *chunker { return &chunker{ gc: gc, parallelChunks: config.ParallelChunks, chunkSize: config.ChunkSize, maxDownloads: config.MaxDownloads, maxDays: config.MaxDays, } } func (c *chunker) UploadFile(ctx context.Context, filename string, progress func(ChunkReport)) (UploadResponse, error) { f, err := os.Open(filename) if err != nil { return UploadResponse{}, err } defer f.Close() fstat, err := f.Stat() if err != nil { return UploadResponse{}, err } fname := fstat.Name() fi := uploadInfo{ chunkID: gonanoid.Must(12), filename: fname, totalSize: fstat.Size(), contentType: mime.TypeByExtension(filepath.Ext(fname)), allowedDownloads: c.maxDownloads, expiryDays: c.maxDays, password: "", } return c.upload(ctx, fi, f, progress) } func (c *chunker) upload(ctx context.Context, fi uploadInfo, r io.ReaderAt, progress func(ChunkReport)) (UploadResponse, error) { bufPool := sync.Pool{ New: func() interface{} { return make([]byte, c.chunkSize) }, } chunks := int(fi.totalSize/int64(c.chunkSize) + 1) uploaders := c.parallelChunks if chunks < uploaders { uploaders = 1 } chunkUploaded := make(chan uploadedChunk) doneChunkReport := make(chan struct{}) go func() { defer close(doneChunkReport) uploadedChunks := 0 uploadedBytes := 0 progress(ChunkReport{ UploadedChunks: 0, UploadedBytes: 0, TotalChunks: chunks, TotalSize: fi.totalSize, }) for r := range chunkUploaded { uploadedChunks += 1 uploadedBytes += r.ChunkSize progress(ChunkReport{ UploadedChunks: uploadedChunks, UploadedBytes: int64(uploadedBytes), TotalChunks: chunks, TotalSize: fi.totalSize, }) } }() errGroup, egctx := errgroup.WithContext(ctx) errGroup.SetLimit(uploaders) for i := 0; i < chunks; i++ { errGroup.Go(func() error { offset := int64(i * c.chunkSize) buf := bufPool.Get().([]byte) defer bufPool.Put(buf) thisBuf := buf if offset+int64(c.chunkSize) > fi.totalSize { thisBuf = buf[:fi.totalSize-offset] } n, err := r.ReadAt(thisBuf, offset) if err != nil { return err } else if n != len(thisBuf) { return fmt.Errorf("chunk %d: expected %d bytes but only read %d", i, len(thisBuf), n) } if err := c.gc.uploadChunk(egctx, fi, offset, bytes.NewReader(thisBuf)); err != nil { return err } chunkUploaded <- uploadedChunk{ChunkSize: len(thisBuf)} return nil }) } if err := errGroup.Wait(); err != nil { return UploadResponse{}, err } close(chunkUploaded) <-doneChunkReport return c.gc.finalizeChunk(ctx, fi) } type uploadedChunk struct { ChunkSize int }