diff --git a/handlers/site.go b/handlers/site.go index 422b4a4..425e4a0 100644 --- a/handlers/site.go +++ b/handlers/site.go @@ -1,11 +1,13 @@ package handlers import ( + "bufio" "errors" "fmt" "github.com/gofiber/fiber/v3" "github.com/jackc/pgx/v5" "lmika.dev/lmika/hugo-cms/models" + "lmika.dev/lmika/hugo-cms/providers/bus" "lmika.dev/lmika/hugo-cms/services/sites" "net/http" "time" @@ -13,6 +15,7 @@ import ( type Site struct { Site *sites.Service + Bus *bus.Bus } func (s *Site) Create(c fiber.Ctx) error { @@ -75,6 +78,41 @@ func (s *Site) Rebuild(c fiber.Ctx) error { return c.Redirect().To(fmt.Sprintf("/sites/%v/posts", GetSite(c).ID)) } +func (s *Site) SSE(c fiber.Ctx) error { + siteOfInterest := GetSite(c) + + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") + + return c.SendStreamWriter(func(w *bufio.Writer) { + sub := s.Bus.Subscribe() + defer s.Bus.Unsubscribe(sub) + + for e := range sub.C { + switch e.Type { + case models.EventSiteBuildingStart: + eventSite := e.Data.(models.Site) + if eventSite.ID == siteOfInterest.ID { + fmt.Fprintf(w, "event: site-build-status\n") + fmt.Fprintf(w, "data: Building\n") + } + case models.EventSiteBuildingDone: + eventSite := e.Data.(models.Site) + if eventSite.ID == siteOfInterest.ID { + fmt.Fprintf(w, "event: site-build-status\n") + fmt.Fprintf(w, "data: \n") + } + } + + if err := w.Flush(); err != nil { + break + } + } + }) +} + func (s *Site) WithSite() fiber.Handler { return func(c fiber.Ctx) (err error) { id := fiber.Params[int](c, "siteId") diff --git a/main.go b/main.go index 9fff438..d267d7e 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "lmika.dev/lmika/hugo-cms/assets" "lmika.dev/lmika/hugo-cms/config" "lmika.dev/lmika/hugo-cms/handlers" + "lmika.dev/lmika/hugo-cms/providers/bus" "lmika.dev/lmika/hugo-cms/providers/db" "lmika.dev/lmika/hugo-cms/providers/git" "lmika.dev/lmika/hugo-cms/providers/hugo" @@ -77,15 +78,16 @@ func main() { gitProvider := git.New() themesProvider := themes.New() netlifyProvider := netlify.New(cfg.NetlifyAuthToken) + bus := bus.New() jobService := jobs.New() - siteBuilderService := sitebuilder.New(dbp, themesProvider, gitProvider, hugoProvider, netlifyProvider) + siteBuilderService := sitebuilder.New(dbp, themesProvider, gitProvider, hugoProvider, netlifyProvider, bus) siteService := sites.NewService(cfg, dbp, themesProvider, siteBuilderService, jobService) postService := posts.New(dbp, siteBuilderService, jobService) indexHandlers := handlers.IndexHandler{} - siteHandlers := handlers.Site{Site: siteService} + siteHandlers := handlers.Site{Site: siteService, Bus: bus} postHandlers := handlers.Post{Post: postService} authHandlers := handlers.AuthHandler{UserService: userService} @@ -103,6 +105,9 @@ func main() { cfg.EncryptedCookieKey = encryptcookie.GenerateKey(32) } + bus.Start() + defer bus.Stop() + app := fiber.New(fiber.Config{ Views: tmplEngine, PassLocalsToViews: true, @@ -134,6 +139,7 @@ func main() { sr.Get("/settings", siteHandlers.Settings) sr.Post("/settings", siteHandlers.SaveSettings) + sr.Get("/sse", siteHandlers.SSE) jobService.Start() defer jobService.Stop() diff --git a/models/events.go b/models/events.go new file mode 100644 index 0000000..c8ca882 --- /dev/null +++ b/models/events.go @@ -0,0 +1,31 @@ +package models + +import "container/list" + +type EventType int + +const ( + // EventTypeSubscribe event type for the bus indicating a new subscription. + // Data is a (chan Sub) to send the new subscription + EventTypeSubscribe EventType = iota + + // EventTypeUnsubscribe event type for the bus indicating to remove a subscription. + // Data is the Sub type + EventTypeUnsubscribe + + // EventSiteBuildingStart indicates that the site has started being built. Data = site + EventSiteBuildingStart = 2 + + // EventSiteBuildingDone indicates that the site has finish building. Data = site + EventSiteBuildingDone = 3 +) + +type Event struct { + Type EventType + Data any +} + +type Sub struct { + C chan Event + Elem *list.Element +} diff --git a/models/job.go b/models/job.go index fa6b040..c082b20 100644 --- a/models/job.go +++ b/models/job.go @@ -5,14 +5,3 @@ import "context" type Job struct { Do func(ctx context.Context) error } - -func Jobs(jobs ...Job) Job { - return Job{Do: func(ctx context.Context) error { - for _, job := range jobs { - if err := job.Do(ctx); err != nil { - return err - } - } - return nil - }} -} diff --git a/providers/bus/bus.go b/providers/bus/bus.go new file mode 100644 index 0000000..ac79b52 --- /dev/null +++ b/providers/bus/bus.go @@ -0,0 +1,64 @@ +package bus + +import ( + "container/list" + "lmika.dev/lmika/hugo-cms/models" +) + +type Bus struct { + subs *list.List + eventQueue chan models.Event +} + +func New() *Bus { + return &Bus{ + subs: list.New(), + eventQueue: make(chan models.Event, 20), + } +} + +func (b *Bus) Fire(event models.Event) { + b.eventQueue <- event +} + +func (b *Bus) Start() { + go func() { + for e := range b.eventQueue { + switch e.Type { + case models.EventTypeSubscribe: + retChan := e.Data.(chan *models.Sub) + + newSub := &models.Sub{C: make(chan models.Event, 1)} + newSub.Elem = b.subs.PushBack(newSub) + + retChan <- newSub + case models.EventTypeUnsubscribe: + sub := e.Data.(*models.Sub) + close(sub.C) + b.subs.Remove(sub.Elem) + default: + for f := b.subs.Front(); f != nil; f = f.Next() { + sub := f.Value.(*models.Sub) + select { + case sub.C <- e: + default: + } + } + } + } + }() +} + +func (b *Bus) Stop() { + close(b.eventQueue) +} + +func (b *Bus) Subscribe() *models.Sub { + resChan := make(chan *models.Sub) + b.eventQueue <- models.Event{Type: models.EventTypeSubscribe, Data: resChan} + return <-resChan +} + +func (b *Bus) Unsubscribe(sub *models.Sub) { + b.eventQueue <- models.Event{Type: models.EventTypeUnsubscribe, Data: sub} +} diff --git a/services/sitebuilder/posts.go b/services/sitebuilder/posts.go index 30e44ef..72ac747 100644 --- a/services/sitebuilder/posts.go +++ b/services/sitebuilder/posts.go @@ -15,11 +15,14 @@ import ( func (s *Service) WritePost(site models.Site, post models.Post) models.Job { return models.Job{ Do: func(ctx context.Context) error { + s.signalSiteBuildingStarted(ctx, site) + defer s.signalSiteBuildingFinished(ctx, site) + rbn, err := s.fullRebuildNecessary(ctx, site) if err != nil { return err } else if rbn { - return s.RebuildSite(site, site).Do(ctx) + return s.rebuildSite(ctx, site, site) } if err := s.writePost(site, post); err != nil { @@ -33,53 +36,64 @@ func (s *Service) WritePost(site models.Site, post models.Post) models.Job { func (s *Service) WriteAllPosts(site models.Site) models.Job { return models.Job{ Do: func(ctx context.Context) error { - var startId int64 - now := time.Now() - for { - posts, err := s.db.ListPublishablePosts(ctx, int64(startId), site.ID, now) - if err != nil { - return err - } else if len(posts) == 0 { - return nil - } + s.signalSiteBuildingStarted(ctx, site) + defer s.signalSiteBuildingFinished(ctx, site) - for _, post := range posts { - if err := s.writePost(site, post); err != nil { - return err - } - } - startId = posts[len(posts)-1].ID + if err := s.writeAllPosts(ctx, site); err != nil { + return err } + + return s.publish(ctx, site) }, } } func (s *Service) DeletePost(site models.Site, post models.Post) models.Job { - return models.Jobs( - models.Job{ - Do: func(ctx context.Context) error { - themeMeta, ok := s.themes.Lookup(site.Theme) - if !ok { - return errors.New("theme not found") - } + return models.Job{ + Do: func(ctx context.Context) error { + s.signalSiteBuildingStarted(ctx, site) + defer s.signalSiteBuildingFinished(ctx, site) - postFilename := s.postFilename(site, themeMeta, post) + themeMeta, ok := s.themes.Lookup(site.Theme) + if !ok { + return errors.New("theme not found") + } - if _, err := os.Stat(postFilename); err != nil { - if errors.Is(err, os.ErrNotExist) { - return nil - } - return err - } + postFilename := s.postFilename(site, themeMeta, post) - if os.Remove(postFilename) != nil { + if _, err := os.Stat(postFilename); err != nil { + if errors.Is(err, os.ErrNotExist) { return nil } + return err + } + + if os.Remove(postFilename) != nil { return nil - }, + } + return s.publish(ctx, site) }, - s.Publish(site), - ) + } +} + +func (s *Service) writeAllPosts(ctx context.Context, site models.Site) error { + var startId int64 + now := time.Now() + for { + posts, err := s.db.ListPublishablePosts(ctx, int64(startId), site.ID, now) + if err != nil { + return err + } else if len(posts) == 0 { + return nil + } + + for _, post := range posts { + if err := s.writePost(site, post); err != nil { + return err + } + } + startId = posts[len(posts)-1].ID + } } func (s *Service) writePost(site models.Site, post models.Post) error { diff --git a/services/sitebuilder/publish.go b/services/sitebuilder/publish.go index 550ef3d..509d86e 100644 --- a/services/sitebuilder/publish.go +++ b/services/sitebuilder/publish.go @@ -8,6 +8,9 @@ import ( func (s *Service) Publish(site models.Site) models.Job { return models.Job{ Do: func(ctx context.Context) error { + s.signalSiteBuildingStarted(ctx, site) + defer s.signalSiteBuildingFinished(ctx, site) + return s.publish(ctx, site) }, } diff --git a/services/sitebuilder/service.go b/services/sitebuilder/service.go index 79bc879..914f901 100644 --- a/services/sitebuilder/service.go +++ b/services/sitebuilder/service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "lmika.dev/lmika/hugo-cms/models" + "lmika.dev/lmika/hugo-cms/providers/bus" "lmika.dev/lmika/hugo-cms/providers/db" "lmika.dev/lmika/hugo-cms/providers/git" "lmika.dev/lmika/hugo-cms/providers/hugo" @@ -20,6 +21,7 @@ type Service struct { git *git.Provider hugo *hugo.Provider netlify *netlify.Provider + bus *bus.Bus } func New( @@ -28,6 +30,7 @@ func New( git *git.Provider, hugo *hugo.Provider, netlify *netlify.Provider, + bus *bus.Bus, ) *Service { return &Service{ db: db, @@ -35,33 +38,48 @@ func New( git: git, hugo: hugo, netlify: netlify, + bus: bus, } } func (s *Service) CreateNewSite(site models.Site) models.Job { return models.Job{ Do: func(ctx context.Context) error { + s.signalSiteBuildingStarted(ctx, site) + defer s.signalSiteBuildingFinished(ctx, site) + return s.createSite(ctx, site) }, } } func (s *Service) RebuildSite(oldSite, newSite models.Site) models.Job { - return models.Jobs( - models.Job{ - Do: func(ctx context.Context) error { - // Teardown the existing site - siteDir := s.hugo.SiteStagingDir(oldSite, hugo.BaseSiteDir) - if err := os.RemoveAll(siteDir); err != nil { - return err - } - return nil - }, + return models.Job{ + Do: func(ctx context.Context) error { + s.signalSiteBuildingStarted(ctx, newSite) + defer s.signalSiteBuildingFinished(ctx, newSite) + + return s.rebuildSite(ctx, oldSite, newSite) }, - s.CreateNewSite(newSite), - s.WriteAllPosts(newSite), - s.Publish(newSite), - ) + } +} + +func (s *Service) rebuildSite(ctx context.Context, oldSite, newSite models.Site) error { + // Teardown the existing site + siteDir := s.hugo.SiteStagingDir(oldSite, hugo.BaseSiteDir) + if err := os.RemoveAll(siteDir); err != nil { + return err + } + + if err := s.createSite(ctx, newSite); err != nil { + return err + } + + if err := s.writeAllPosts(ctx, newSite); err != nil { + return err + } + + return s.publish(ctx, newSite) } func (s *Service) fullRebuildNecessary(ctx context.Context, site models.Site) (bool, error) { @@ -122,3 +140,11 @@ func (s *Service) createSite(ctx context.Context, site models.Site) error { } return nil } + +func (s *Service) signalSiteBuildingStarted(ctx context.Context, site models.Site) { + s.bus.Fire(models.Event{Type: models.EventSiteBuildingStart, Data: site}) +} + +func (s *Service) signalSiteBuildingFinished(ctx context.Context, site models.Site) { + s.bus.Fire(models.Event{Type: models.EventSiteBuildingDone, Data: site}) +} diff --git a/templates/layouts/site.html b/templates/layouts/site.html index a946e46..45bd5dc 100644 --- a/templates/layouts/site.html +++ b/templates/layouts/site.html @@ -7,6 +7,7 @@