diff options
Diffstat (limited to 'cache/subcache.go')
-rw-r--r-- | cache/subcache.go | 160 |
1 files changed, 122 insertions, 38 deletions
diff --git a/cache/subcache.go b/cache/subcache.go index 1737da43..0678988a 100644 --- a/cache/subcache.go +++ b/cache/subcache.go @@ -4,18 +4,18 @@ import ( "bytes" "encoding/gob" "fmt" - "os" "sync" "github.com/pkg/errors" - "github.com/MichaelMure/git-bug/entities/bug" + "github.com/MichaelMure/git-bug/entities/identity" "github.com/MichaelMure/git-bug/entity" "github.com/MichaelMure/git-bug/repository" ) type Excerpt interface { Id() entity.Id + setId(id entity.Id) } type CacheEntity interface { @@ -25,15 +25,27 @@ type CacheEntity interface { type getUserIdentityFunc func() (*IdentityCache, error) +// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions. +// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the +// functions from this package, but right now identities are not using that framework. +type Actions[EntityT entity.Interface] struct { + ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) + ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT] + Remove func(repo repository.ClockedRepo, id entity.Id) error + MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult +} + +var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{} + type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct { repo repository.ClockedRepo resolvers func() entity.Resolvers - getUserIdentity getUserIdentityFunc - readWithResolver func(repository.ClockedRepo, entity.Resolvers, entity.Id) (EntityT, error) - makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT - makeExcerpt func(EntityT) ExcerptT - makeIndex func(CacheT) []string + getUserIdentity getUserIdentityFunc + makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT + makeExcerpt func(CacheT) ExcerptT + makeIndexData func(CacheT) []string + actions Actions[EntityT] typename string namespace string @@ -50,14 +62,19 @@ func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] repo repository.ClockedRepo, resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc, makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT, - makeExcerpt func(EntityT) ExcerptT, - makeIndex func(CacheT) []string, + makeExcerpt func(CacheT) ExcerptT, + makeIndexData func(CacheT) []string, + actions Actions[EntityT], typename, namespace string, version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] { return &SubCache[EntityT, ExcerptT, CacheT]{ repo: repo, resolvers: resolvers, getUserIdentity: getUserIdentity, + makeCached: makeCached, + makeExcerpt: makeExcerpt, + makeIndexData: makeIndexData, + actions: actions, typename: typename, namespace: namespace, version: version, @@ -98,6 +115,12 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error { return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version) } + // the id is not serialized in the excerpt itself (non-exported field in go, long story ...), + // so we fix it here, which doubles as enforcing coherency. + for id, excerpt := range aux.Excerpts { + excerpt.setId(id) + } + sc.excerpts = aux.Excerpts index, err := sc.repo.GetIndex(sc.typename) @@ -118,7 +141,7 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error { } // Write will serialize on disk the entity cache file -func (sc *SubCache[EntityT, ExcerptT, CacheT]) Write() error { +func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error { sc.mu.RLock() defer sc.mu.RUnlock() @@ -155,9 +178,7 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Write() error { func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error { sc.excerpts = make(map[entity.Id]ExcerptT) - sc.readWithResolver - - allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers) + allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers()) index, err := sc.repo.GetIndex(sc.typename) if err != nil { @@ -172,15 +193,17 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error { indexer, indexEnd := index.IndexBatch() - for b := range allBugs { - if b.Err != nil { - return b.Err + for e := range allEntities { + if e.Err != nil { + return e.Err } - snap := b.Bug.Compile() - c.bugExcerpts[b.Bug.Id()] = NewBugExcerpt(b.Bug, snap) + // TODO: doesn't actually record in cache, should we? + cached := sc.makeCached(e.Entity, sc.entityUpdated) + sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached) - if err := indexer(snap); err != nil { + indexData := sc.makeIndexData(cached) + if err := indexer(e.Entity.Id().String(), indexData); err != nil { return err } } @@ -190,10 +213,19 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error { return err } - _, _ = fmt.Fprintln(os.Stderr, "Done.") + err = sc.write() + if err != nil { + return err + } + return nil } +func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) { + sc.maxLoaded = size + sc.evictIfNeeded() +} + func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error { sc.mu.Lock() defer sc.mu.Unlock() @@ -229,7 +261,7 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, er } sc.mu.RUnlock() - e, err := sc.readWithResolver(sc.repo, sc.resolvers(), id) + e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id) if err != nil { return *new(CacheT), err } @@ -315,8 +347,6 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) b return matching[0], nil } -var errNotInCache = errors.New("entity missing from cache") - func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) { sc.mu.Lock() if _, has := sc.cached[e.Id()]; has { @@ -348,26 +378,74 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error { sc.mu.Lock() - err = bug.Remove(c.repo, b.Id()) + err = sc.actions.Remove(sc.repo, e.Id()) if err != nil { - c.muBug.Unlock() - + sc.mu.Unlock() return err } - delete(c.bugs, b.Id()) - delete(c.bugExcerpts, b.Id()) - c.loadedBugs.Remove(b.Id()) + delete(sc.cached, e.Id()) + delete(sc.excerpts, e.Id()) + sc.lru.Remove(e.Id()) + + sc.mu.Unlock() + + return sc.write() +} + +func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult { + out := make(chan entity.MergeResult) - c.muBug.Unlock() + // Intercept merge results to update the cache properly + go func() { + defer close(out) + + author, err := sc.getUserIdentity() + if err != nil { + out <- entity.NewMergeError(err, "") + return + } + + results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author) + for result := range results { + out <- result + + if result.Err != nil { + continue + } + + switch result.Status { + case entity.MergeStatusNew, entity.MergeStatusUpdated: + e := result.Entity.(EntityT) + + // TODO: doesn't actually record in cache, should we? + cached := sc.makeCached(e, sc.entityUpdated) + + sc.mu.Lock() + sc.excerpts[result.Id] = sc.makeExcerpt(cached) + sc.mu.Unlock() + } + } + + err = sc.write() + if err != nil { + out <- entity.NewMergeError(err, "") + return + } + }() + + return out - return c.writeBugCache() +} + +func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string { + return sc.namespace } // entityUpdated is a callback to trigger when the excerpt of an entity changed func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error { sc.mu.Lock() - b, ok := sc.cached[id] + e, ok := sc.cached[id] if !ok { sc.mu.Unlock() @@ -376,19 +454,24 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error // memory and thus concurrent write. // Failing immediately here is the simple and safe solution to avoid // complicated data loss. - return errNotInCache + return errors.New("entity missing from cache") } sc.lru.Get(id) // sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot()) - sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot()) + sc.excerpts[id] = sc.makeExcerpt(e) sc.mu.Unlock() - if err := sc.addBugToSearchIndex(b.Snapshot()); err != nil { + index, err := sc.repo.GetIndex(sc.typename) + if err != nil { + return err + } + + err = index.IndexOne(e.Id().String(), sc.makeIndexData(e)) + if err != nil { return err } - // we only need to write the bug cache - return sc.Write() + return sc.write() } // evictIfNeeded will evict an entity from the cache if needed @@ -405,7 +488,8 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() { continue } - b.Lock() + // TODO + // b.Lock() sc.lru.Remove(id) delete(sc.cached, id) |