From 4f7058bf36085fa658ec1c3b69531ff973185267 Mon Sep 17 00:00:00 2001 From: Leon Mika Date: Mon, 23 Feb 2026 21:35:12 +1100 Subject: [PATCH] Have got asynchronous publishing working --- _test-site/posts/2026/02/23-be-a-comma.md | 10 +++++ main.go | 6 ++- services/posts/create.go | 5 +-- services/posts/delete.go | 45 +++++++++++++++++------ services/posts/list.go | 9 +---- services/posts/service.go | 4 +- services/publisher/pqueue.go | 44 ++++++++++++++++++++++ views/_common/nav.html | 4 +- views/posts/index.html | 4 +- 9 files changed, 100 insertions(+), 31 deletions(-) create mode 100644 _test-site/posts/2026/02/23-be-a-comma.md create mode 100644 services/publisher/pqueue.go diff --git a/_test-site/posts/2026/02/23-be-a-comma.md b/_test-site/posts/2026/02/23-be-a-comma.md new file mode 100644 index 0000000..102c87c --- /dev/null +++ b/_test-site/posts/2026/02/23-be-a-comma.md @@ -0,0 +1,10 @@ +--- +id: Uk11zptnUi3A +title: "" +date: 2026-02-23T10:28:37Z +tags: [] +slug: /2026/02/23/be-a-comma +--- +Be a comma than a full stop. + +Also, this will be deleted soon. \ No newline at end of file diff --git a/main.go b/main.go index 1870e63..491e4a4 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "html" "html/template" "log" @@ -27,8 +28,11 @@ func main() { defer dbp.Close() publisherSvc := publisher.New(dbp) + publisherQueue := publisher.NewQueue(publisherSvc) - postService := posts.New(dbp, publisherSvc) + publisherQueue.Start(context.Background()) + + postService := posts.New(dbp, publisherQueue) //user, err := dbp.SelectUserByUsername(context.Background(), "testuser") //if err != nil { diff --git a/services/posts/create.go b/services/posts/create.go index 191dbbc..6d4cd94 100644 --- a/services/posts/create.go +++ b/services/posts/create.go @@ -34,10 +34,7 @@ func (s *Service) PublishPost(ctx context.Context, params CreatePostParams) (*mo return nil, err } - // TODO: do on separate thread - if err := s.publisher.Publish(ctx, site); err != nil { - return nil, err - } + s.publisher.Queue(site) return post, nil } diff --git a/services/posts/delete.go b/services/posts/delete.go index 6d53690..df220ef 100644 --- a/services/posts/delete.go +++ b/services/posts/delete.go @@ -12,16 +12,9 @@ const ( ) func (s *Service) DeletePost(ctx context.Context, pid int64, hardDelete bool) error { - site, ok := models.GetSite(ctx) - if !ok { - return models.SiteRequiredError - } - - post, err := s.db.SelectPost(ctx, pid) + post, site, err := s.fetchPostAndSite(ctx, pid) if err != nil { return err - } else if post.SiteID != site.ID { - return models.NotFoundError } if hardDelete && post.DeletedAt.Unix() > 0 { @@ -30,17 +23,45 @@ func (s *Service) DeletePost(ctx context.Context, pid int64, hardDelete bool) er return models.DeleteDebounceError } - return s.db.HardDeletePost(ctx, post.ID) + if err := s.db.HardDeletePost(ctx, post.ID); err != nil { + return err + } + } else { + if err := s.db.SoftDeletePost(ctx, post.ID); err != nil { + return err + } } - return s.db.SoftDeletePost(ctx, post.ID) + s.publisher.Queue(site) + + return nil } func (s *Service) RestorePost(ctx context.Context, pid int64) error { - post, err := s.db.SelectPost(ctx, pid) + post, site, err := s.fetchPostAndSite(ctx, pid) if err != nil { return err } - return s.db.RestorePost(ctx, post.ID) + if err := s.db.RestorePost(ctx, post.ID); err != nil { + return err + } + + s.publisher.Queue(site) + return nil +} + +func (s *Service) fetchPostAndSite(ctx context.Context, pid int64) (*models.Post, models.Site, error) { + site, ok := models.GetSite(ctx) + if !ok { + return nil, models.Site{}, models.SiteRequiredError + } + + post, err := s.db.SelectPost(ctx, pid) + if err != nil { + return nil, models.Site{}, err + } else if post.SiteID != site.ID { + return nil, models.Site{}, models.NotFoundError + } + return post, site, nil } diff --git a/services/posts/list.go b/services/posts/list.go index 69ef8db..e94b24b 100644 --- a/services/posts/list.go +++ b/services/posts/list.go @@ -21,16 +21,9 @@ func (s *Service) ListPosts(ctx context.Context, showDeleted bool) ([]*models.Po } func (s *Service) GetPost(ctx context.Context, pid int64) (*models.Post, error) { - site, ok := models.GetSite(ctx) - if !ok { - return nil, models.SiteRequiredError - } - - post, err := s.db.SelectPost(ctx, pid) + post, _, err := s.fetchPostAndSite(ctx, pid) if err != nil { return nil, err - } else if post.SiteID != site.ID { - return nil, models.NotFoundError } return post, nil diff --git a/services/posts/service.go b/services/posts/service.go index c0b19d7..b7bb433 100644 --- a/services/posts/service.go +++ b/services/posts/service.go @@ -7,10 +7,10 @@ import ( type Service struct { db *db.Provider - publisher *publisher.Publisher + publisher *publisher.Queue } -func New(db *db.Provider, publisher *publisher.Publisher) *Service { +func New(db *db.Provider, publisher *publisher.Queue) *Service { return &Service{ db: db, publisher: publisher, diff --git a/services/publisher/pqueue.go b/services/publisher/pqueue.go new file mode 100644 index 0000000..38608f3 --- /dev/null +++ b/services/publisher/pqueue.go @@ -0,0 +1,44 @@ +package publisher + +import ( + "context" + "log" + + "lmika.dev/lmika/weiro/models" +) + +type Queue struct { + publisher *Publisher + pending chan models.Site +} + +func NewQueue(publisher *Publisher) *Queue { + return &Queue{ + publisher: publisher, + pending: make(chan models.Site, 1), + } +} + +func (q *Queue) Queue(site models.Site) bool { + select { + case q.pending <- site: + return true + default: + return false + } +} + +func (q *Queue) Start(ctx context.Context) { + go func() { + for { + select { + case site := <-q.pending: + if err := q.publisher.Publish(ctx, site); err != nil { + log.Printf("error publishing site: %v", err) + } + case <-ctx.Done(): + return + } + } + }() +} diff --git a/views/_common/nav.html b/views/_common/nav.html index 9446f20..6ae9613 100644 --- a/views/_common/nav.html +++ b/views/_common/nav.html @@ -1,6 +1,6 @@