146 lines
3 KiB
Go
146 lines
3 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
gonanoid "github.com/matoous/go-nanoid/v2"
|
|
"golang.org/x/sync/errgroup"
|
|
"io"
|
|
"mime"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
)
|
|
|
|
type chunker struct {
|
|
gc *gokapiClient
|
|
parallelChunks int
|
|
chunkSize int
|
|
maxDownloads int
|
|
maxDays int
|
|
}
|
|
|
|
func newChunker(gc *gokapiClient, config Config) *chunker {
|
|
return &chunker{
|
|
gc: gc,
|
|
parallelChunks: config.ParallelChunks,
|
|
chunkSize: config.ChunkSize,
|
|
maxDownloads: config.MaxDownloads,
|
|
maxDays: config.MaxDays,
|
|
}
|
|
}
|
|
|
|
func (c *chunker) UploadFile(ctx context.Context, filename string, progress func(ChunkReport)) (UploadResponse, error) {
|
|
f, err := os.Open(filename)
|
|
if err != nil {
|
|
return UploadResponse{}, err
|
|
}
|
|
defer f.Close()
|
|
|
|
fstat, err := f.Stat()
|
|
if err != nil {
|
|
return UploadResponse{}, err
|
|
}
|
|
fname := fstat.Name()
|
|
|
|
fi := uploadInfo{
|
|
chunkID: gonanoid.Must(12),
|
|
filename: fname,
|
|
totalSize: fstat.Size(),
|
|
contentType: mime.TypeByExtension(filepath.Ext(fname)),
|
|
allowedDownloads: c.maxDownloads,
|
|
expiryDays: c.maxDays,
|
|
password: "",
|
|
}
|
|
|
|
return c.upload(ctx, fi, f, progress)
|
|
}
|
|
|
|
func (c *chunker) upload(ctx context.Context, fi uploadInfo, r io.ReaderAt, progress func(ChunkReport)) (UploadResponse, error) {
|
|
bufPool := sync.Pool{
|
|
New: func() interface{} {
|
|
return make([]byte, c.chunkSize)
|
|
},
|
|
}
|
|
|
|
chunks := int(fi.totalSize/int64(c.chunkSize) + 1)
|
|
uploaders := c.parallelChunks
|
|
if chunks < uploaders {
|
|
uploaders = 1
|
|
}
|
|
|
|
chunkUploaded := make(chan uploadedChunk)
|
|
doneChunkReport := make(chan struct{})
|
|
go func() {
|
|
defer close(doneChunkReport)
|
|
|
|
uploadedChunks := 0
|
|
uploadedBytes := 0
|
|
|
|
progress(ChunkReport{
|
|
UploadedChunks: 0,
|
|
UploadedBytes: 0,
|
|
TotalChunks: chunks,
|
|
TotalSize: fi.totalSize,
|
|
})
|
|
|
|
for r := range chunkUploaded {
|
|
uploadedChunks += 1
|
|
uploadedBytes += r.ChunkSize
|
|
|
|
progress(ChunkReport{
|
|
UploadedChunks: uploadedChunks,
|
|
UploadedBytes: int64(uploadedBytes),
|
|
TotalChunks: chunks,
|
|
TotalSize: fi.totalSize,
|
|
})
|
|
}
|
|
}()
|
|
|
|
errGroup, egctx := errgroup.WithContext(ctx)
|
|
errGroup.SetLimit(uploaders)
|
|
|
|
for i := 0; i < chunks; i++ {
|
|
errGroup.Go(func() error {
|
|
offset := int64(i * c.chunkSize)
|
|
|
|
buf := bufPool.Get().([]byte)
|
|
defer bufPool.Put(buf)
|
|
|
|
thisBuf := buf
|
|
if offset+int64(c.chunkSize) > fi.totalSize {
|
|
thisBuf = buf[:fi.totalSize-offset]
|
|
}
|
|
|
|
n, err := r.ReadAt(thisBuf, offset)
|
|
if err != nil {
|
|
return err
|
|
} else if n != len(thisBuf) {
|
|
return fmt.Errorf("chunk %d: expected %d bytes but only read %d", i, len(thisBuf), n)
|
|
}
|
|
|
|
if err := c.gc.uploadChunk(egctx, fi, offset, bytes.NewReader(thisBuf)); err != nil {
|
|
return err
|
|
}
|
|
|
|
chunkUploaded <- uploadedChunk{ChunkSize: len(thisBuf)}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if err := errGroup.Wait(); err != nil {
|
|
return UploadResponse{}, err
|
|
}
|
|
|
|
close(chunkUploaded)
|
|
<-doneChunkReport
|
|
|
|
return c.gc.finalizeChunk(ctx, fi)
|
|
}
|
|
|
|
type uploadedChunk struct {
|
|
ChunkSize int
|
|
}
|