diff --git a/cmd/sqs-drain/main.go b/cmd/sqs-drain/main.go index 47c1811..cc65159 100644 --- a/cmd/sqs-drain/main.go +++ b/cmd/sqs-drain/main.go @@ -1,18 +1,97 @@ package main import ( + "context" "flag" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/pkg/errors" + "log" + "os" + "path/filepath" + "time" "github.com/lmika/gopkgs/cli" ) func main() { - flagQueue := flag.String("q", "", "queue to drain") + flagQueue := flag.String("q", "", "URL of queue to drain") + flagDir := flag.String("dir", "", "directory to save messages") flag.Parse() if *flagQueue == "" { cli.Fatalf("-q flag needs to be specified") } - + ctx := context.Background() + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + cli.Fatalf("cannot load AWS config: %v", err) + } + + outDir := *flagDir + if outDir == "" { + outDir = "out-" + time.Now().Format("20060102150405") + } + + if err := os.MkdirAll(outDir, 0755); err != nil { + cli.Fatalf("unable to create out dir: %v", err) + } + + client := sqs.NewFromConfig(cfg) + msgCount := 0 + for { + out, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(*flagQueue), + MaxNumberOfMessages: 10, + WaitTimeSeconds: 1, + }) + if err != nil { + log.Fatalf("error receiving messages: %v", err) + break + } else if len(out.Messages) == 0 { + break + } + + messagesToDelete := make([]types.DeleteMessageBatchRequestEntry, 0, 10) + for _, msg := range out.Messages { + if err := handleMessage(ctx, outDir, msg); err == nil { + messagesToDelete = append(messagesToDelete, types.DeleteMessageBatchRequestEntry{ + Id: msg.MessageId, + ReceiptHandle: msg.ReceiptHandle, + }) + msgCount += 1 + } else { + log.Println(err) + } + } + if len(messagesToDelete) == 0 { + log.Printf("no messages handled, terminating") + break + } + + if _, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{ + QueueUrl: aws.String(*flagQueue), + Entries: messagesToDelete, + }); err != nil { + log.Printf("error deleting messages from queue: %v", err) + break + } + } + + log.Printf("Handled %v messages", msgCount) +} + +func handleMessage(ctx context.Context, outDir string, msg types.Message) error { + outFile := filepath.Join(outDir, aws.ToString(msg.MessageId) + ".json") + msgBody := aws.ToString(msg.Body) + + log.Printf("%v -> %v", aws.ToString(msg.MessageId), outFile) + if err := os.WriteFile(outFile, []byte(msgBody), 0644); err != nil { + return errors.Wrapf(err, "unable to write message %v to file %v", msg.MessageId, outFile) + } + + return nil } diff --git a/go.mod b/go.mod index 5eb606b..cc2c34a 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,18 @@ go 1.17 require ( github.com/aws/aws-sdk-go-v2 v1.13.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.13.1 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sqs v1.16.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 // indirect + github.com/aws/smithy-go v1.10.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/lmika/gopkgs v0.0.0-20211210041137-0dc91e939890 // indirect + github.com/pkg/errors v0.9.1 // indirect ) diff --git a/go.sum b/go.sum index ff1708f..70d753c 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,38 @@ github.com/aws/aws-sdk-go-v2 v1.13.0 h1:1XIXAfxsEmbhbj5ry3D3vX+6ZcUYvIqSm4CWWEuGZCA= github.com/aws/aws-sdk-go-v2 v1.13.0/go.mod h1:L6+ZpqHaLbAaxsqV0L4cvxZY7QupWJB4fhkf8LXvC7w= +github.com/aws/aws-sdk-go-v2/config v1.13.1 h1:yLv8bfNoT4r+UvUKQKqRtdnvuWGMK5a82l4ru9Jvnuo= +github.com/aws/aws-sdk-go-v2/config v1.13.1/go.mod h1:Ba5Z4yL/UGbjQUzsiaN378YobhFo0MLfueXGiOsYtEs= +github.com/aws/aws-sdk-go-v2/credentials v1.8.0 h1:8Ow0WcyDesGNL0No11jcgb1JAtE+WtubqXjgxau+S0o= +github.com/aws/aws-sdk-go-v2/credentials v1.8.0/go.mod h1:gnMo58Vwx3Mu7hj1wpcG8DI0s57c9o42UQ6wgTQT5to= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0 h1:NITDuUZO34mqtOwFWZiXo7yAHj7kf+XPE+EiKuCBNUI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0/go.mod h1:I6/fHT/fH460v09eg2gVrd8B/IqskhNdpcLH0WNO3QI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4 h1:CRiQJ4E2RhfDdqbie1ZYDo8QtIo75Mk7oTdJSfwJTMQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4/go.mod h1:XHgQ7Hz2WY2GAn//UXHofLfPXWh+s62MbMOijrg12Lw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0 h1:3ADoioDMOtF4uiK59vCpplpCwugEU+v4ZFD29jDL3RQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0/go.mod h1:BsCSJHx5DnDXIrOcqB8KN1/B+hXLG/bi4Y6Vjcx/x9E= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5 h1:ixotxbfTCFpqbuwFv/RcZwyzhkxPSYDYEMcj4niB5Uk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5/go.mod h1:R3sWUqPcfXSiF/LSFJhjyJmpg9uV6yP2yv3YZZjldVI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0 h1:4QAOB3KrvI1ApJK14sliGr3Ie2pjyvNypn/lfzDHfUw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0/go.mod h1:K/qPe6AP2TGYv4l6n7c88zh9jWBDf6nHhvg1fx/EWfU= +github.com/aws/aws-sdk-go-v2/service/sqs v1.16.0 h1:dzWS4r8E9bA0TesHM40FSAtedwpTVCuTsLI8EziSqyk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.16.0/go.mod h1:IBTQMG8mtyj37OWg7vIXcg714Ntcb/LlYou/rZpvV1k= +github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 h1:1qLJeQGBmNQW3mBNzK2CFmrQNmoXWrscPqsrAaU1aTA= +github.com/aws/aws-sdk-go-v2/service/sso v1.9.0/go.mod h1:vCV4glupK3tR7pw7ks7Y4jYRL86VvxS+g5qk04YeWrU= +github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 h1:ksiDXhvNYg0D2/UFkLejsaz3LqpW5yjNQ8Nx9Sn2c0E= +github.com/aws/aws-sdk-go-v2/service/sts v1.14.0/go.mod h1:u0xMJKDvvfocRjiozsoZglVNXRG19043xzp3r2ivLIk= +github.com/aws/smithy-go v1.10.0 h1:gsoZQMNHnX+PaghNw4ynPsyGP7aUCqx5sY2dlPQsZ0w= github.com/aws/smithy-go v1.10.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/lmika/gopkgs v0.0.0-20211210041137-0dc91e939890 h1:mwl/exYV/WkBMeShqK7q+B2w2r+b0vP1TSA7clBn9kI= github.com/lmika/gopkgs v0.0.0-20211210041137-0dc91e939890/go.mod h1:FH6OJSvYcJ9xY8CGs9yGgR89kMCK1UimuUQ6kE5YuJQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=