package cache

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"path/filepath"
	"sync"

	"github.com/pkg/errors"

	"github.com/MichaelMure/git-bug/entities/identity"
	"github.com/MichaelMure/git-bug/entity"
	"github.com/MichaelMure/git-bug/repository"
)

type Excerpt interface {
	Id() entity.Id
	setId(id entity.Id)
}

type CacheEntity interface {
	Id() entity.Id
	NeedCommit() bool
	Lock()
}

type getUserIdentityFunc func() (*IdentityCache, error)

// Actions exposes a number of action functions on Entities, to give upper layers (cache) a way
// to normalize interactions.
// Note: ideally this wouldn't exist; the cache layer would assume that everything is an
// entity/dag and directly use the functions from that package, but right now identities are
// not using that framework.
type Actions[EntityT entity.Interface] struct {
	ReadWithResolver    func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
	Remove              func(repo repository.ClockedRepo, id entity.Id) error
	RemoveAll           func(repo repository.ClockedRepo) error
	MergeAll            func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}

var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}

type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
	repo            repository.ClockedRepo
	resolvers       func() entity.Resolvers
	getUserIdentity getUserIdentityFunc
	makeCached      func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
	makeExcerpt     func(CacheT) ExcerptT
	makeIndexData   func(CacheT) []string
	actions         Actions[EntityT]

	typename  string
	namespace string
	version   uint
	maxLoaded int

	mu       sync.RWMutex
	excerpts map[entity.Id]ExcerptT
	cached   map[entity.Id]CacheT
	lru      lruIdCache
}

func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity](
	repo repository.ClockedRepo,
	resolvers func() entity.Resolvers,
	getUserIdentity getUserIdentityFunc,
	makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
	makeExcerpt func(CacheT) ExcerptT,
	makeIndexData func(CacheT) []string,
	actions Actions[EntityT],
	typename, namespace string,
	version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
	return &SubCache[EntityT, ExcerptT, CacheT]{
		repo:            repo,
		resolvers:       resolvers,
		getUserIdentity: getUserIdentity,
		makeCached:      makeCached,
		makeExcerpt:     makeExcerpt,
		makeIndexData:   makeIndexData,
		actions:         actions,
		typename:        typename,
		namespace:       namespace,
		version:         version,
		maxLoaded:       maxLoaded,
		excerpts:        make(map[entity.Id]ExcerptT),
		cached:          make(map[entity.Id]CacheT),
		lru:             newLRUIdCache(),
	}
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) Typename() string {
	return sc.typename
}
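
// What follows is an illustrative sketch, not part of this file: it shows how a concrete
// sub-cache could plausibly be wired through NewSubCache and the Actions struct above.
// The foo package, the FooCache/FooExcerpt types and their constructors are hypothetical
// placeholders; the real call sites live in the repo-level cache and use the actual
// bug/identity packages.
//
//	func newFooSubCache(repo repository.ClockedRepo,
//		resolvers func() entity.Resolvers,
//		getUserIdentity getUserIdentityFunc) *SubCache[*foo.Foo, *FooExcerpt, *FooCache] {
//		return NewSubCache[*foo.Foo, *FooExcerpt, *FooCache](
//			repo, resolvers, getUserIdentity,
//			func(e *foo.Foo, entityUpdated func(id entity.Id) error) *FooCache {
//				return newFooCache(e, entityUpdated) // hypothetical constructor
//			},
//			func(c *FooCache) *FooExcerpt { return newFooExcerpt(c) }, // hypothetical
//			func(c *FooCache) []string { return c.indexData() },       // hypothetical
//			Actions[*foo.Foo]{
//				ReadWithResolver:    foo.ReadWithResolver,
//				ReadAllWithResolver: foo.ReadAllWithResolver,
//				Remove:              foo.Remove,
//				RemoveAll:           foo.RemoveAll,
//				MergeAll:            foo.MergeAll,
//			},
//			"foo", "foos", 1, 100,
//		)
//	}
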
// Load will try to read the entity cache file from disk
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	f, err := sc.repo.LocalStorage().Open(filepath.Join("cache", sc.namespace))
	if err != nil {
		return err
	}

	aux := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{}

	decoder := gob.NewDecoder(f)
	err = decoder.Decode(&aux)
	if err != nil {
		_ = f.Close()
		return err
	}

	err = f.Close()
	if err != nil {
		return err
	}

	if aux.Version != sc.version {
		return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
	}

	// The id is not serialized in the excerpt itself (non-exported field in Go, long story ...),
	// so we fix it here, which doubles as enforcing coherency.
	for id, excerpt := range aux.Excerpts {
		excerpt.setId(id)
	}

	sc.excerpts = aux.Excerpts

	index, err := sc.repo.GetIndex(sc.namespace)
	if err != nil {
		return err
	}

	// simple heuristic to detect a mismatch between the index and the entities
	count, err := index.DocCount()
	if err != nil {
		return err
	}
	if count != uint64(len(sc.excerpts)) {
		return fmt.Errorf("count mismatch between bleve and %s excerpts", sc.namespace)
	}

	// TODO: find a way to check lamport clocks

	return nil
}

// write will serialize the entity cache file on disk
func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	var data bytes.Buffer

	aux := struct {
		Version  uint
		Excerpts map[entity.Id]ExcerptT
	}{
		Version:  sc.version,
		Excerpts: sc.excerpts,
	}

	encoder := gob.NewEncoder(&data)
	err := encoder.Encode(aux)
	if err != nil {
		return err
	}

	f, err := sc.repo.LocalStorage().Create(filepath.Join("cache", sc.namespace))
	if err != nil {
		return err
	}

	_, err = f.Write(data.Bytes())
	if err != nil {
		_ = f.Close()
		return err
	}

	return f.Close()
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() <-chan BuildEvent {
	// Value chosen experimentally as giving the fastest indexing, while
	// not driving the cache size on disk too high.
	//
	// | batchCount | bugIndex (MB) | idIndex (kB) | time (s) |
	// |:----------:|:-------------:|:------------:|:--------:|
	// |     10     |      24       |      84      |  1.59    |
	// |     30     |      26       |      84      |  1.388   |
	// |     50     |      26       |      84      |  1.44    |
	// |     60     |      26       |      80      |  1.377   |
	// |     68     |      27       |      80      |  1.385   |
	// |     75     |      26       |      84      |  1.32    |
	// |     80     |      26       |      80      |  1.37    |
	// |     85     |      27       |      80      |  1.317   |
	// |    100     |      26       |      80      |  1.455   |
	// |    150     |      26       |      80      |  2.066   |
	// |    200     |      28       |      80      |  2.885   |
	// |    250     |      30       |      72      |  3.555   |
	// |    300     |      31       |      72      |  4.787   |
	// |    500     |      23       |      72      |  5.4     |
	const maxBatchCount = 75

	out := make(chan BuildEvent)

	go func() {
		defer close(out)

		out <- BuildEvent{
			Typename: sc.typename,
			Event:    BuildEventStarted,
		}

		sc.excerpts = make(map[entity.Id]ExcerptT)

		allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())

		index, err := sc.repo.GetIndex(sc.namespace)
		if err != nil {
			out <- BuildEvent{
				Typename: sc.typename,
				Err:      err,
			}
			return
		}

		// wipe the index just to be sure
		err = index.Clear()
		if err != nil {
			out <- BuildEvent{
				Typename: sc.typename,
				Err:      err,
			}
			return
		}

		indexer, indexEnd := index.IndexBatch()
		var batchCount int

		for e := range allEntities {
			if e.Err != nil {
				out <- BuildEvent{
					Typename: sc.typename,
					Err:      e.Err,
				}
				return
			}

			cached := sc.makeCached(e.Entity, sc.entityUpdated)
			sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
			// might as well keep them in memory
			sc.cached[e.Entity.Id()] = cached

			indexData := sc.makeIndexData(cached)
			if err := indexer(e.Entity.Id().String(), indexData); err != nil {
				out <- BuildEvent{
					Typename: sc.typename,
					Err:      err,
				}
				return
			}

			batchCount++
			if batchCount >= maxBatchCount {
				err = indexEnd()
				if err != nil {
					out <- BuildEvent{
						Typename: sc.typename,
						Err:      err,
					}
					return
				}

				indexer, indexEnd = index.IndexBatch()
				batchCount = 0
			}

			out <- BuildEvent{
				Typename: sc.typename,
				Event:    BuildEventProgress,
				Progress: e.CurrentEntity,
				Total:    e.TotalEntities,
			}
		}

		if batchCount > 0 {
			err = indexEnd()
			if err != nil {
				out <- BuildEvent{
					Typename: sc.typename,
					Err:      err,
				}
				return
			}
		}

		err = sc.write()
		if err != nil {
			out <- BuildEvent{
				Typename: sc.typename,
				Err:      err,
			}
			return
		}

		out <- BuildEvent{
			Typename: sc.typename,
			Event:    BuildEventFinished,
		}
	}()

	return out
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
	sc.maxLoaded = size
	sc.evictIfNeeded()
}
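
// Illustrative only: a caller draining the Build() channel could look like the sketch
// below. The BuildEvent fields match this package; the subcache variable and the
// progress formatting are hypothetical.
//
//	for event := range subcache.Build() {
//		if event.Err != nil {
//			return event.Err
//		}
//		switch event.Event {
//		case BuildEventStarted:
//			fmt.Printf("[%s] building cache...\n", event.Typename)
//		case BuildEventProgress:
//			fmt.Printf("[%s] %d/%d\n", event.Typename, event.Progress, event.Total)
//		case BuildEventFinished:
//			fmt.Printf("[%s] done\n", event.Typename)
//		}
//	}
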
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	sc.excerpts = nil
	sc.cached = make(map[entity.Id]CacheT)
	return nil
}

// AllIds returns all known entity ids
func (sc *SubCache[EntityT, ExcerptT, CacheT]) AllIds() []entity.Id {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	result := make([]entity.Id, len(sc.excerpts))

	i := 0
	for _, excerpt := range sc.excerpts {
		result[i] = excerpt.Id()
		i++
	}

	return result
}

// Resolve retrieves an entity matching the exact given id
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, error) {
	sc.mu.RLock()
	cached, ok := sc.cached[id]
	if ok {
		sc.lru.Get(id)
		sc.mu.RUnlock()
		return cached, nil
	}
	sc.mu.RUnlock()

	e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
	if err != nil {
		return *new(CacheT), err
	}

	cached = sc.makeCached(e, sc.entityUpdated)

	sc.mu.Lock()
	sc.cached[id] = cached
	sc.lru.Add(id)
	sc.mu.Unlock()

	sc.evictIfNeeded()

	return cached, nil
}

// ResolvePrefix retrieves an entity matching an id prefix. It fails if multiple
// entities match.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolvePrefix(prefix string) (CacheT, error) {
	return sc.ResolveMatcher(func(excerpt ExcerptT) bool {
		return excerpt.Id().HasPrefix(prefix)
	})
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
	id, err := sc.resolveMatcher(f)
	if err != nil {
		return *new(CacheT), err
	}
	return sc.Resolve(id)
}

// ResolveExcerpt retrieves an Excerpt matching the exact given id
func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerpt(id entity.Id) (ExcerptT, error) {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	excerpt, ok := sc.excerpts[id]
	if !ok {
		return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
	}

	return excerpt, nil
}
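
// Illustrative only: callers typically resolve either a full id or a human-typed prefix.
// The subcache variable and the prefix value below are hypothetical; the errors come from
// entity.NewErrNotFound and entity.NewErrMultipleMatch as used in resolveMatcher further down.
//
//	cached, err := subcache.ResolvePrefix("2f9e8a")
//	if err != nil {
//		return err // ambiguous prefix or unknown id
//	}
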
// ResolveExcerptPrefix retrieves an Excerpt matching an id prefix. It fails if multiple
// entities match.
func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptPrefix(prefix string) (ExcerptT, error) {
	return sc.ResolveExcerptMatcher(func(excerpt ExcerptT) bool {
		return excerpt.Id().HasPrefix(prefix)
	})
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
	id, err := sc.resolveMatcher(f)
	if err != nil {
		return *new(ExcerptT), err
	}
	return sc.ResolveExcerpt(id)
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) bool) (entity.Id, error) {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	// preallocate but empty
	matching := make([]entity.Id, 0, 5)

	for _, excerpt := range sc.excerpts {
		if f(excerpt) {
			matching = append(matching, excerpt.Id())
		}
	}

	if len(matching) > 1 {
		return entity.UnsetId, entity.NewErrMultipleMatch(sc.typename, matching)
	}

	if len(matching) == 0 {
		return entity.UnsetId, entity.NewErrNotFound(sc.typename)
	}

	return matching[0], nil
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
	sc.mu.Lock()
	if _, has := sc.cached[e.Id()]; has {
		sc.mu.Unlock()
		return *new(CacheT), fmt.Errorf("entity %s already exists in the cache", e.Id())
	}

	cached := sc.makeCached(e, sc.entityUpdated)
	sc.cached[e.Id()] = cached
	sc.lru.Add(e.Id())
	sc.mu.Unlock()

	sc.evictIfNeeded()

	// force the write of the excerpt
	err := sc.entityUpdated(e.Id())
	if err != nil {
		return *new(CacheT), err
	}

	return cached, nil
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
	e, err := sc.ResolvePrefix(prefix)
	if err != nil {
		return err
	}

	sc.mu.Lock()

	err = sc.actions.Remove(sc.repo, e.Id())
	if err != nil {
		sc.mu.Unlock()
		return err
	}

	delete(sc.cached, e.Id())
	delete(sc.excerpts, e.Id())
	sc.lru.Remove(e.Id())

	index, err := sc.repo.GetIndex(sc.namespace)
	if err != nil {
		sc.mu.Unlock()
		return err
	}

	err = index.Remove(e.Id().String())
	sc.mu.Unlock()
	if err != nil {
		return err
	}

	return sc.write()
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) RemoveAll() error {
	sc.mu.Lock()

	err := sc.actions.RemoveAll(sc.repo)
	if err != nil {
		sc.mu.Unlock()
		return err
	}

	for id := range sc.cached {
		delete(sc.cached, id)
		sc.lru.Remove(id)
	}
	for id := range sc.excerpts {
		delete(sc.excerpts, id)
	}

	index, err := sc.repo.GetIndex(sc.namespace)
	if err != nil {
		sc.mu.Unlock()
		return err
	}

	err = index.Clear()
	sc.mu.Unlock()
	if err != nil {
		return err
	}

	return sc.write()
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
	out := make(chan entity.MergeResult)

	// Intercept merge results to update the cache properly
	go func() {
		defer close(out)

		author, err := sc.getUserIdentity()
		if err != nil {
			out <- entity.NewMergeError(err, "")
			return
		}

		results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
		for result := range results {
			out <- result

			if result.Err != nil {
				continue
			}

			switch result.Status {
			case entity.MergeStatusNew, entity.MergeStatusUpdated:
				e := result.Entity.(EntityT)
				cached := sc.makeCached(e, sc.entityUpdated)

				sc.mu.Lock()
				sc.excerpts[result.Id] = sc.makeExcerpt(cached)
				// might as well keep them in memory
				sc.cached[result.Id] = cached
				sc.mu.Unlock()
			}
		}

		err = sc.write()
		if err != nil {
			out <- entity.NewMergeError(err, "")
			return
		}
	}()

	return out
}

func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
	return sc.namespace
}
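
// Illustrative only: after fetching a remote, a caller drains the MergeAll channel and
// reports each result. The subcache variable and the printing are hypothetical.
//
//	for result := range subcache.MergeAll("origin") {
//		if result.Err != nil {
//			return result.Err
//		}
//		fmt.Println(result)
//	}
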
// entityUpdated is a callback invoked when an entity changed, to refresh its excerpt,
// its index entry and the on-disk cache file
func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
	sc.mu.Lock()
	e, ok := sc.cached[id]
	if !ok {
		sc.mu.Unlock()

		// If the entity is not loaded at this point, it means it was loaded before
		// but got evicted, which means we potentially have multiple copies in
		// memory and thus concurrent writes.
		// Failing immediately here is the simple and safe solution to avoid
		// complicated data loss.
		return errors.New("entity missing from cache")
	}
	sc.lru.Get(id)
	sc.excerpts[id] = sc.makeExcerpt(e)
	sc.mu.Unlock()

	index, err := sc.repo.GetIndex(sc.namespace)
	if err != nil {
		return err
	}

	err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
	if err != nil {
		return err
	}

	return sc.write()
}

// evictIfNeeded will evict entities from the cache while the number of loaded
// entities exceeds maxLoaded
func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	if sc.lru.Len() <= sc.maxLoaded {
		return
	}

	for _, id := range sc.lru.GetOldestToNewest() {
		b := sc.cached[id]
		if b.NeedCommit() {
			continue
		}

		// As a form of assurance that evicted entities don't get manipulated, we lock them here.
		// If something tries to do it anyway, it will deadlock and make the problem obvious.
		b.Lock()
		sc.lru.Remove(id)
		delete(sc.cached, id)

		if sc.lru.Len() <= sc.maxLoaded {
			return
		}
	}
}
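
// Illustrative only: a caller wanting to bound memory usage lowers the in-memory budget
// and lets the eviction above do the rest; entities with uncommitted changes (NeedCommit)
// are deliberately kept loaded. The subcache variable and the value 20 are arbitrary examples.
//
//	subcache.SetCacheSize(20)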