sqs-browse: new tool
Started working on a new tool to poll and browse an SQS queue. This is built using a TUI framework
This commit is contained in:
parent
e070505490
commit
5d1f4c78f4
11 changed files with 413 additions and 0 deletions
11
internal/sqs-browse/models/message.go
Normal file
11
internal/sqs-browse/models/message.go
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
package models
|
||||
|
||||
import "time"
|
||||
|
||||
type Message struct {
|
||||
ID uint64
|
||||
ExtID string
|
||||
Queue string
|
||||
Received time.Time
|
||||
Data string
|
||||
}
|
||||
31
internal/sqs-browse/providers/memstore/memstore.go
Normal file
31
internal/sqs-browse/providers/memstore/memstore.go
Normal file
|
|
@ -0,0 +1,31 @@
|
|||
package memstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/lmika/awstools/internal/sqs-browse/models"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
messages []models.Message
|
||||
|
||||
mtx *sync.Mutex
|
||||
currSeqNo uint64
|
||||
}
|
||||
|
||||
func (s *Store) Save(ctx context.Context, msg *models.Message) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
s.currSeqNo++
|
||||
msg.ID = s.currSeqNo
|
||||
s.messages = append(s.messages, *msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewStore() *Store {
|
||||
return &Store{
|
||||
messages: make([]models.Message, 0),
|
||||
mtx: new(sync.Mutex),
|
||||
}
|
||||
}
|
||||
63
internal/sqs-browse/providers/sqs/provider.go
Normal file
63
internal/sqs-browse/providers/sqs/provider.go
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
package sqs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/sqs"
|
||||
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
|
||||
"github.com/lmika/awstools/internal/sqs-browse/models"
|
||||
"github.com/pkg/errors"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Provider struct {
|
||||
client *sqs.Client
|
||||
}
|
||||
|
||||
func NewProvider(client *sqs.Client) *Provider {
|
||||
return &Provider{client: client}
|
||||
}
|
||||
|
||||
func (p *Provider) PollForNewMessages(ctx context.Context, queue string) ([]*models.Message, error) {
|
||||
out, err := p.client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
|
||||
QueueUrl: aws.String(queue),
|
||||
MaxNumberOfMessages: 10,
|
||||
WaitTimeSeconds: 20,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "unable to receive messages from queue %v", queue)
|
||||
}
|
||||
|
||||
if len(out.Messages) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
messagesToReturn := make([]*models.Message, 0, len(out.Messages))
|
||||
messagesToDelete := make([]types.DeleteMessageBatchRequestEntry, 0, len(out.Messages))
|
||||
for _, msg := range out.Messages {
|
||||
newLocalMessage := &models.Message{
|
||||
Queue: queue,
|
||||
ExtID: aws.ToString(msg.MessageId),
|
||||
Received: time.Now(),
|
||||
Data: aws.ToString(msg.Body),
|
||||
}
|
||||
messagesToReturn = append(messagesToReturn, newLocalMessage)
|
||||
|
||||
// Pull the message from the queue
|
||||
// TODO: should this be determined by the caller?
|
||||
messagesToDelete = append(messagesToDelete, types.DeleteMessageBatchRequestEntry{
|
||||
Id: msg.MessageId,
|
||||
ReceiptHandle: msg.ReceiptHandle,
|
||||
})
|
||||
}
|
||||
|
||||
if _, err := p.client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
|
||||
QueueUrl: aws.String(queue),
|
||||
Entries: messagesToDelete,
|
||||
}); err != nil {
|
||||
log.Printf("error deleting messages from queue: %v", err)
|
||||
}
|
||||
|
||||
return messagesToReturn, nil
|
||||
}
|
||||
14
internal/sqs-browse/services/pollmessage/iface.go
Normal file
14
internal/sqs-browse/services/pollmessage/iface.go
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
package pollmessage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/lmika/awstools/internal/sqs-browse/models"
|
||||
)
|
||||
|
||||
type MessageStore interface {
|
||||
Save(ctx context.Context, msg *models.Message) error
|
||||
}
|
||||
|
||||
type MessagePoller interface {
|
||||
PollForNewMessages(ctx context.Context, queue string) ([]*models.Message, error)
|
||||
}
|
||||
45
internal/sqs-browse/services/pollmessage/service.go
Normal file
45
internal/sqs-browse/services/pollmessage/service.go
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
package pollmessage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/lmika/events"
|
||||
"github.com/pkg/errors"
|
||||
"log"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
store MessageStore
|
||||
poller MessagePoller
|
||||
queue string
|
||||
bus *events.Bus
|
||||
}
|
||||
|
||||
func NewService(store MessageStore, poller MessagePoller, queue string, bus *events.Bus) *Service {
|
||||
return &Service{
|
||||
store: store,
|
||||
poller: poller,
|
||||
queue: queue,
|
||||
bus: bus,
|
||||
}
|
||||
}
|
||||
|
||||
// Poll starts polling for new messages and adding them to the message store
|
||||
func (s *Service) Poll(ctx context.Context) error {
|
||||
for ctx.Err() == nil {
|
||||
log.Printf("polling for new messages: %v", s.queue)
|
||||
newMsgs, err := s.poller.PollForNewMessages(ctx, s.queue)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to poll for messages")
|
||||
}
|
||||
|
||||
for _, msg := range newMsgs {
|
||||
if err := s.store.Save(ctx, msg); err != nil {
|
||||
log.Println("warn: unable to save new message %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
s.bus.Fire("new-messages", newMsgs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
5
internal/sqs-browse/ui/events.go
Normal file
5
internal/sqs-browse/ui/events.go
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
package ui
|
||||
|
||||
import "github.com/lmika/awstools/internal/sqs-browse/models"
|
||||
|
||||
type NewMessagesEvent []*models.Message
|
||||
80
internal/sqs-browse/ui/model.go
Normal file
80
internal/sqs-browse/ui/model.go
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
package ui
|
||||
|
||||
import (
|
||||
table "github.com/calyptia/go-bubble-table"
|
||||
"github.com/charmbracelet/bubbles/viewport"
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
"github.com/charmbracelet/lipgloss"
|
||||
)
|
||||
|
||||
type uiModel struct {
|
||||
table table.Model
|
||||
viewport viewport.Model
|
||||
|
||||
tableRows []table.Row
|
||||
}
|
||||
|
||||
func NewModel() tea.Model {
|
||||
tbl := table.New([]string{"seq", "message"}, 100, 20)
|
||||
rows := make([]table.Row, 0)
|
||||
tbl.SetRows(rows)
|
||||
|
||||
vprt := viewport.New(100, 15)
|
||||
|
||||
model := uiModel{
|
||||
table: tbl,
|
||||
viewport: vprt,
|
||||
tableRows: rows,
|
||||
}
|
||||
|
||||
return model
|
||||
}
|
||||
|
||||
func (m uiModel) Init() tea.Cmd {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *uiModel) updateViewportToSelectedMessage() {
|
||||
if message, ok := m.table.SelectedRow().(messageTableRow); ok {
|
||||
m.viewport.SetContent(message.Data)
|
||||
} else {
|
||||
m.viewport.SetContent("(no message selected)")
|
||||
}
|
||||
}
|
||||
|
||||
func (m uiModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
|
||||
switch msg := msg.(type) {
|
||||
case NewMessagesEvent:
|
||||
for _, newMsg := range msg {
|
||||
m.tableRows = append(m.tableRows, messageTableRow(*newMsg))
|
||||
}
|
||||
m.table.SetRows(m.tableRows)
|
||||
m.updateViewportToSelectedMessage()
|
||||
|
||||
case tea.KeyMsg:
|
||||
|
||||
switch msg.String() {
|
||||
|
||||
case "ctrl+c", "q":
|
||||
return m, tea.Quit
|
||||
case "up", "i":
|
||||
m.table.GoUp()
|
||||
m.updateViewportToSelectedMessage()
|
||||
case "down", "k":
|
||||
m.table.GoDown()
|
||||
m.updateViewportToSelectedMessage()
|
||||
}
|
||||
}
|
||||
|
||||
updatedTable, tableMsgs := m.table.Update(nil)
|
||||
updatedViewport, viewportMsgs := m.viewport.Update(msg)
|
||||
|
||||
m.table = updatedTable
|
||||
m.viewport = updatedViewport
|
||||
|
||||
return m, tea.Batch(tableMsgs, viewportMsgs)
|
||||
}
|
||||
|
||||
func (m uiModel) View() string {
|
||||
return lipgloss.JoinVertical(lipgloss.Top, m.table.View(), m.viewport.View())
|
||||
}
|
||||
26
internal/sqs-browse/ui/tblmodel.go
Normal file
26
internal/sqs-browse/ui/tblmodel.go
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
package ui
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/lmika/awstools/internal/sqs-browse/models"
|
||||
table "github.com/calyptia/go-bubble-table"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type messageTableRow models.Message
|
||||
|
||||
func (mtr messageTableRow) Render(w io.Writer, model table.Model, index int) {
|
||||
firstLine := strings.SplitN(string(mtr.Data), "\n", 2)[0]
|
||||
|
||||
sb := strings.Builder{}
|
||||
sb.WriteString(fmt.Sprintf("%d", mtr.ID))
|
||||
sb.WriteString("\t")
|
||||
sb.WriteString(firstLine)
|
||||
|
||||
if index == model.Cursor() {
|
||||
fmt.Fprintln(w, model.Styles.SelectedRow.Render(sb.String()))
|
||||
} else {
|
||||
fmt.Fprintln(w, sb.String())
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue