Started working on the event bus

This commit is contained in:
Leon Mika 2025-02-03 22:23:48 +11:00
parent fdd2ecc7fc
commit e2f159e980
9 changed files with 233 additions and 61 deletions

View file

@ -1,11 +1,13 @@
package handlers
import (
"bufio"
"errors"
"fmt"
"github.com/gofiber/fiber/v3"
"github.com/jackc/pgx/v5"
"lmika.dev/lmika/hugo-cms/models"
"lmika.dev/lmika/hugo-cms/providers/bus"
"lmika.dev/lmika/hugo-cms/services/sites"
"net/http"
"time"
@ -13,6 +15,7 @@ import (
type Site struct {
Site *sites.Service
Bus *bus.Bus
}
func (s *Site) Create(c fiber.Ctx) error {
@ -75,6 +78,41 @@ func (s *Site) Rebuild(c fiber.Ctx) error {
return c.Redirect().To(fmt.Sprintf("/sites/%v/posts", GetSite(c).ID))
}
func (s *Site) SSE(c fiber.Ctx) error {
siteOfInterest := GetSite(c)
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
return c.SendStreamWriter(func(w *bufio.Writer) {
sub := s.Bus.Subscribe()
defer s.Bus.Unsubscribe(sub)
for e := range sub.C {
switch e.Type {
case models.EventSiteBuildingStart:
eventSite := e.Data.(models.Site)
if eventSite.ID == siteOfInterest.ID {
fmt.Fprintf(w, "event: site-build-status\n")
fmt.Fprintf(w, "data: Building\n")
}
case models.EventSiteBuildingDone:
eventSite := e.Data.(models.Site)
if eventSite.ID == siteOfInterest.ID {
fmt.Fprintf(w, "event: site-build-status\n")
fmt.Fprintf(w, "data: \n")
}
}
if err := w.Flush(); err != nil {
break
}
}
})
}
func (s *Site) WithSite() fiber.Handler {
return func(c fiber.Ctx) (err error) {
id := fiber.Params[int](c, "siteId")

10
main.go
View file

@ -14,6 +14,7 @@ import (
"lmika.dev/lmika/hugo-cms/assets"
"lmika.dev/lmika/hugo-cms/config"
"lmika.dev/lmika/hugo-cms/handlers"
"lmika.dev/lmika/hugo-cms/providers/bus"
"lmika.dev/lmika/hugo-cms/providers/db"
"lmika.dev/lmika/hugo-cms/providers/git"
"lmika.dev/lmika/hugo-cms/providers/hugo"
@ -77,15 +78,16 @@ func main() {
gitProvider := git.New()
themesProvider := themes.New()
netlifyProvider := netlify.New(cfg.NetlifyAuthToken)
bus := bus.New()
jobService := jobs.New()
siteBuilderService := sitebuilder.New(dbp, themesProvider, gitProvider, hugoProvider, netlifyProvider)
siteBuilderService := sitebuilder.New(dbp, themesProvider, gitProvider, hugoProvider, netlifyProvider, bus)
siteService := sites.NewService(cfg, dbp, themesProvider, siteBuilderService, jobService)
postService := posts.New(dbp, siteBuilderService, jobService)
indexHandlers := handlers.IndexHandler{}
siteHandlers := handlers.Site{Site: siteService}
siteHandlers := handlers.Site{Site: siteService, Bus: bus}
postHandlers := handlers.Post{Post: postService}
authHandlers := handlers.AuthHandler{UserService: userService}
@ -103,6 +105,9 @@ func main() {
cfg.EncryptedCookieKey = encryptcookie.GenerateKey(32)
}
bus.Start()
defer bus.Stop()
app := fiber.New(fiber.Config{
Views: tmplEngine,
PassLocalsToViews: true,
@ -134,6 +139,7 @@ func main() {
sr.Get("/settings", siteHandlers.Settings)
sr.Post("/settings", siteHandlers.SaveSettings)
sr.Get("/sse", siteHandlers.SSE)
jobService.Start()
defer jobService.Stop()

31
models/events.go Normal file
View file

@ -0,0 +1,31 @@
package models
import "container/list"
type EventType int
const (
// EventTypeSubscribe event type for the bus indicating a new subscription.
// Data is a (chan Sub) to send the new subscription
EventTypeSubscribe EventType = iota
// EventTypeUnsubscribe event type for the bus indicating to remove a subscription.
// Data is the Sub type
EventTypeUnsubscribe
// EventSiteBuildingStart indicates that the site has started being built. Data = site
EventSiteBuildingStart = 2
// EventSiteBuildingDone indicates that the site has finish building. Data = site
EventSiteBuildingDone = 3
)
type Event struct {
Type EventType
Data any
}
type Sub struct {
C chan Event
Elem *list.Element
}

View file

@ -5,14 +5,3 @@ import "context"
type Job struct {
Do func(ctx context.Context) error
}
func Jobs(jobs ...Job) Job {
return Job{Do: func(ctx context.Context) error {
for _, job := range jobs {
if err := job.Do(ctx); err != nil {
return err
}
}
return nil
}}
}

64
providers/bus/bus.go Normal file
View file

@ -0,0 +1,64 @@
package bus
import (
"container/list"
"lmika.dev/lmika/hugo-cms/models"
)
type Bus struct {
subs *list.List
eventQueue chan models.Event
}
func New() *Bus {
return &Bus{
subs: list.New(),
eventQueue: make(chan models.Event, 20),
}
}
func (b *Bus) Fire(event models.Event) {
b.eventQueue <- event
}
func (b *Bus) Start() {
go func() {
for e := range b.eventQueue {
switch e.Type {
case models.EventTypeSubscribe:
retChan := e.Data.(chan *models.Sub)
newSub := &models.Sub{C: make(chan models.Event, 1)}
newSub.Elem = b.subs.PushBack(newSub)
retChan <- newSub
case models.EventTypeUnsubscribe:
sub := e.Data.(*models.Sub)
close(sub.C)
b.subs.Remove(sub.Elem)
default:
for f := b.subs.Front(); f != nil; f = f.Next() {
sub := f.Value.(*models.Sub)
select {
case sub.C <- e:
default:
}
}
}
}
}()
}
func (b *Bus) Stop() {
close(b.eventQueue)
}
func (b *Bus) Subscribe() *models.Sub {
resChan := make(chan *models.Sub)
b.eventQueue <- models.Event{Type: models.EventTypeSubscribe, Data: resChan}
return <-resChan
}
func (b *Bus) Unsubscribe(sub *models.Sub) {
b.eventQueue <- models.Event{Type: models.EventTypeUnsubscribe, Data: sub}
}

View file

@ -15,11 +15,14 @@ import (
func (s *Service) WritePost(site models.Site, post models.Post) models.Job {
return models.Job{
Do: func(ctx context.Context) error {
s.signalSiteBuildingStarted(ctx, site)
defer s.signalSiteBuildingFinished(ctx, site)
rbn, err := s.fullRebuildNecessary(ctx, site)
if err != nil {
return err
} else if rbn {
return s.RebuildSite(site, site).Do(ctx)
return s.rebuildSite(ctx, site, site)
}
if err := s.writePost(site, post); err != nil {
@ -33,31 +36,24 @@ func (s *Service) WritePost(site models.Site, post models.Post) models.Job {
func (s *Service) WriteAllPosts(site models.Site) models.Job {
return models.Job{
Do: func(ctx context.Context) error {
var startId int64
now := time.Now()
for {
posts, err := s.db.ListPublishablePosts(ctx, int64(startId), site.ID, now)
if err != nil {
s.signalSiteBuildingStarted(ctx, site)
defer s.signalSiteBuildingFinished(ctx, site)
if err := s.writeAllPosts(ctx, site); err != nil {
return err
} else if len(posts) == 0 {
return nil
}
for _, post := range posts {
if err := s.writePost(site, post); err != nil {
return err
}
}
startId = posts[len(posts)-1].ID
}
return s.publish(ctx, site)
},
}
}
func (s *Service) DeletePost(site models.Site, post models.Post) models.Job {
return models.Jobs(
models.Job{
return models.Job{
Do: func(ctx context.Context) error {
s.signalSiteBuildingStarted(ctx, site)
defer s.signalSiteBuildingFinished(ctx, site)
themeMeta, ok := s.themes.Lookup(site.Theme)
if !ok {
return errors.New("theme not found")
@ -75,11 +71,29 @@ func (s *Service) DeletePost(site models.Site, post models.Post) models.Job {
if os.Remove(postFilename) != nil {
return nil
}
return s.publish(ctx, site)
},
}
}
func (s *Service) writeAllPosts(ctx context.Context, site models.Site) error {
var startId int64
now := time.Now()
for {
posts, err := s.db.ListPublishablePosts(ctx, int64(startId), site.ID, now)
if err != nil {
return err
} else if len(posts) == 0 {
return nil
},
},
s.Publish(site),
)
}
for _, post := range posts {
if err := s.writePost(site, post); err != nil {
return err
}
}
startId = posts[len(posts)-1].ID
}
}
func (s *Service) writePost(site models.Site, post models.Post) error {

View file

@ -8,6 +8,9 @@ import (
func (s *Service) Publish(site models.Site) models.Job {
return models.Job{
Do: func(ctx context.Context) error {
s.signalSiteBuildingStarted(ctx, site)
defer s.signalSiteBuildingFinished(ctx, site)
return s.publish(ctx, site)
},
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"lmika.dev/lmika/hugo-cms/models"
"lmika.dev/lmika/hugo-cms/providers/bus"
"lmika.dev/lmika/hugo-cms/providers/db"
"lmika.dev/lmika/hugo-cms/providers/git"
"lmika.dev/lmika/hugo-cms/providers/hugo"
@ -20,6 +21,7 @@ type Service struct {
git *git.Provider
hugo *hugo.Provider
netlify *netlify.Provider
bus *bus.Bus
}
func New(
@ -28,6 +30,7 @@ func New(
git *git.Provider,
hugo *hugo.Provider,
netlify *netlify.Provider,
bus *bus.Bus,
) *Service {
return &Service{
db: db,
@ -35,33 +38,48 @@ func New(
git: git,
hugo: hugo,
netlify: netlify,
bus: bus,
}
}
func (s *Service) CreateNewSite(site models.Site) models.Job {
return models.Job{
Do: func(ctx context.Context) error {
s.signalSiteBuildingStarted(ctx, site)
defer s.signalSiteBuildingFinished(ctx, site)
return s.createSite(ctx, site)
},
}
}
func (s *Service) RebuildSite(oldSite, newSite models.Site) models.Job {
return models.Jobs(
models.Job{
return models.Job{
Do: func(ctx context.Context) error {
s.signalSiteBuildingStarted(ctx, newSite)
defer s.signalSiteBuildingFinished(ctx, newSite)
return s.rebuildSite(ctx, oldSite, newSite)
},
}
}
func (s *Service) rebuildSite(ctx context.Context, oldSite, newSite models.Site) error {
// Teardown the existing site
siteDir := s.hugo.SiteStagingDir(oldSite, hugo.BaseSiteDir)
if err := os.RemoveAll(siteDir); err != nil {
return err
}
return nil
},
},
s.CreateNewSite(newSite),
s.WriteAllPosts(newSite),
s.Publish(newSite),
)
if err := s.createSite(ctx, newSite); err != nil {
return err
}
if err := s.writeAllPosts(ctx, newSite); err != nil {
return err
}
return s.publish(ctx, newSite)
}
func (s *Service) fullRebuildNecessary(ctx context.Context, site models.Site) (bool, error) {
@ -122,3 +140,11 @@ func (s *Service) createSite(ctx context.Context, site models.Site) error {
}
return nil
}
func (s *Service) signalSiteBuildingStarted(ctx context.Context, site models.Site) {
s.bus.Fire(models.Event{Type: models.EventSiteBuildingStart, Data: site})
}
func (s *Service) signalSiteBuildingFinished(ctx context.Context, site models.Site) {
s.bus.Fire(models.Event{Type: models.EventSiteBuildingDone, Data: site})
}

View file

@ -7,6 +7,7 @@
<link rel="stylesheet" href="/assets/css/main.css">
<title>Hugo CMS</title>
<script src="https://unpkg.com/htmx.org@2.0.4"></script>
<script src="https://unpkg.com/htmx-ext-sse@2.2.2/sse.js"></script>
</head>
<body class="role-site">
<header>