138 lines
2.8 KiB
Go
138 lines
2.8 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"fmt"
|
||
|
gonanoid "github.com/matoous/go-nanoid/v2"
|
||
|
"golang.org/x/sync/errgroup"
|
||
|
"io"
|
||
|
"mime"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
type chunker struct {
|
||
|
gc *gokapiClient
|
||
|
parallelChunks int
|
||
|
chunkSize int
|
||
|
}
|
||
|
|
||
|
func newChunker(gc *gokapiClient, parallelChunks, chunkSize int) *chunker {
|
||
|
return &chunker{
|
||
|
gc: gc,
|
||
|
parallelChunks: parallelChunks,
|
||
|
chunkSize: chunkSize,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *chunker) UploadFile(ctx context.Context, filename string, progress func(ChunkReport)) (UploadResponse, error) {
|
||
|
f, err := os.Open(filename)
|
||
|
if err != nil {
|
||
|
return UploadResponse{}, err
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
fstat, err := f.Stat()
|
||
|
if err != nil {
|
||
|
return UploadResponse{}, err
|
||
|
}
|
||
|
fname := fstat.Name()
|
||
|
|
||
|
fi := uploadInfo{
|
||
|
chunkID: gonanoid.Must(12),
|
||
|
filename: fname,
|
||
|
totalSize: fstat.Size(),
|
||
|
contentType: mime.TypeByExtension(filepath.Ext(fname)),
|
||
|
allowedDownloads: 5,
|
||
|
expiryDays: 7,
|
||
|
password: "",
|
||
|
}
|
||
|
|
||
|
return c.upload(ctx, fi, f, progress)
|
||
|
}
|
||
|
|
||
|
func (c *chunker) upload(ctx context.Context, fi uploadInfo, r io.ReaderAt, progress func(ChunkReport)) (UploadResponse, error) {
|
||
|
bufPool := sync.Pool{
|
||
|
New: func() interface{} {
|
||
|
return make([]byte, c.chunkSize)
|
||
|
},
|
||
|
}
|
||
|
|
||
|
chunks := int(fi.totalSize/int64(c.chunkSize) + 1)
|
||
|
|
||
|
chunkUploaded := make(chan uploadedChunk)
|
||
|
doneChunkReport := make(chan struct{})
|
||
|
go func() {
|
||
|
defer close(doneChunkReport)
|
||
|
|
||
|
uploadedChunks := 0
|
||
|
uploadedBytes := 0
|
||
|
|
||
|
progress(ChunkReport{
|
||
|
UploadedChunks: 0,
|
||
|
UploadedBytes: 0,
|
||
|
TotalChunks: chunks,
|
||
|
TotalSize: fi.totalSize,
|
||
|
})
|
||
|
|
||
|
for r := range chunkUploaded {
|
||
|
uploadedChunks += 1
|
||
|
uploadedBytes += r.ChunkSize
|
||
|
|
||
|
progress(ChunkReport{
|
||
|
UploadedChunks: uploadedChunks,
|
||
|
UploadedBytes: int64(uploadedBytes),
|
||
|
TotalChunks: chunks,
|
||
|
TotalSize: fi.totalSize,
|
||
|
})
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
errGroup, egctx := errgroup.WithContext(ctx)
|
||
|
errGroup.SetLimit(c.parallelChunks)
|
||
|
|
||
|
for i := 0; i < chunks; i++ {
|
||
|
errGroup.Go(func() error {
|
||
|
offset := int64(i * c.chunkSize)
|
||
|
|
||
|
buf := bufPool.Get().([]byte)
|
||
|
defer bufPool.Put(buf)
|
||
|
|
||
|
thisBuf := buf
|
||
|
if offset+int64(c.chunkSize) > fi.totalSize {
|
||
|
thisBuf = buf[:fi.totalSize-offset]
|
||
|
}
|
||
|
|
||
|
n, err := r.ReadAt(thisBuf, offset)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
} else if n != len(thisBuf) {
|
||
|
return fmt.Errorf("chunk %d: expected %d bytes but only read %d", i, len(thisBuf), n)
|
||
|
}
|
||
|
|
||
|
if err := c.gc.uploadChunk(egctx, fi, offset, bytes.NewReader(thisBuf)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
chunkUploaded <- uploadedChunk{ChunkSize: len(thisBuf)}
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
|
||
|
if err := errGroup.Wait(); err != nil {
|
||
|
return UploadResponse{}, err
|
||
|
}
|
||
|
|
||
|
close(chunkUploaded)
|
||
|
<-doneChunkReport
|
||
|
|
||
|
return c.gc.finalizeChunk(ctx, fi)
|
||
|
}
|
||
|
|
||
|
type uploadedChunk struct {
|
||
|
ChunkSize int
|
||
|
}
|