diff options
Diffstat (limited to 'cache/subcache.go')
-rw-r--r-- | cache/subcache.go | 156 |
1 files changed, 123 insertions, 33 deletions
diff --git a/cache/subcache.go b/cache/subcache.go index 7b599b10..09e53c23 100644 --- a/cache/subcache.go +++ b/cache/subcache.go @@ -34,6 +34,7 @@ type Actions[EntityT entity.Interface] struct { ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT] Remove func(repo repository.ClockedRepo, id entity.Id) error + RemoveAll func(repo repository.ClockedRepo) error MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult } @@ -185,51 +186,98 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error { return f.Close() } -func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error { - sc.excerpts = make(map[entity.Id]ExcerptT) +func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent { + out := make(chan BuildEvent) - allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers()) + go func() { + defer close(out) - index, err := sc.repo.GetIndex(sc.namespace) - if err != nil { - return err - } + out <- BuildEvent{ + Typename: sc.typename, + Event: BuildEventStarted, + } - // wipe the index just to be sure - err = index.Clear() - if err != nil { - return err - } + sc.excerpts = make(map[entity.Id]ExcerptT) - indexer, indexEnd := index.IndexBatch() + allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers()) - for e := range allEntities { - if e.Err != nil { - return e.Err + index, err := sc.repo.GetIndex(sc.namespace) + if err != nil { + out <- BuildEvent{ + Typename: sc.typename, + Err: err, + } + return + } + + // wipe the index just to be sure + err = index.Clear() + if err != nil { + out <- BuildEvent{ + Typename: sc.typename, + Err: err, + } + return } - cached := sc.makeCached(e.Entity, sc.entityUpdated) - sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached) - // might as well keep them in memory - sc.cached[e.Entity.Id()] = cached + indexer, indexEnd := index.IndexBatch() + + for e := range allEntities { + if e.Err != nil { + out <- BuildEvent{ + Typename: sc.typename, + Err: e.Err, + } + return + } - indexData := sc.makeIndexData(cached) - if err := indexer(e.Entity.Id().String(), indexData); err != nil { - return err + cached := sc.makeCached(e.Entity, sc.entityUpdated) + sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached) + // might as well keep them in memory + sc.cached[e.Entity.Id()] = cached + + indexData := sc.makeIndexData(cached) + if err := indexer(e.Entity.Id().String(), indexData); err != nil { + out <- BuildEvent{ + Typename: sc.typename, + Err: err, + } + return + } + + out <- BuildEvent{ + Typename: sc.typename, + Event: BuildEventProgress, + Progress: e.CurrentEntity, + Total: e.TotalEntities, + } } - } - err = indexEnd() - if err != nil { - return err - } + err = indexEnd() + if err != nil { + out <- BuildEvent{ + Typename: sc.typename, + Err: err, + } + return + } - err = sc.write() - if err != nil { - return err - } + err = sc.write() + if err != nil { + out <- BuildEvent{ + Typename: sc.typename, + Err: err, + } + return + } - return nil + out <- BuildEvent{ + Typename: sc.typename, + Event: BuildEventFinished, + } + }() + + return out } func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) { @@ -399,7 +447,49 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error { delete(sc.excerpts, e.Id()) sc.lru.Remove(e.Id()) + index, err := sc.repo.GetIndex(sc.namespace) + if err != nil { + sc.mu.Unlock() + return err + } + + err = index.Remove(e.Id().String()) + sc.mu.Unlock() + if err != nil { + return err + } + + return sc.write() +} + +func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error { + sc.mu.Lock() + + err := sc.actions.RemoveAll(sc.repo) + if err != nil { + sc.mu.Unlock() + return err + } + + for id, _ := range sc.cached { + delete(sc.cached, id) + sc.lru.Remove(id) + } + for id, _ := range sc.excerpts { + delete(sc.excerpts, id) + } + + index, err := sc.repo.GetIndex(sc.namespace) + if err != nil { + sc.mu.Unlock() + return err + } + + err = index.Clear() sc.mu.Unlock() + if err != nil { + return err + } return sc.write() } |