aboutsummaryrefslogtreecommitdiffstats
path: root/cache/subcache.go
diff options
context:
space:
mode:
Diffstat (limited to 'cache/subcache.go')
-rw-r--r--cache/subcache.go160
1 files changed, 122 insertions, 38 deletions
diff --git a/cache/subcache.go b/cache/subcache.go
index 1737da43..0678988a 100644
--- a/cache/subcache.go
+++ b/cache/subcache.go
@@ -4,18 +4,18 @@ import (
"bytes"
"encoding/gob"
"fmt"
- "os"
"sync"
"github.com/pkg/errors"
- "github.com/MichaelMure/git-bug/entities/bug"
+ "github.com/MichaelMure/git-bug/entities/identity"
"github.com/MichaelMure/git-bug/entity"
"github.com/MichaelMure/git-bug/repository"
)
type Excerpt interface {
Id() entity.Id
+ setId(id entity.Id)
}
type CacheEntity interface {
@@ -25,15 +25,27 @@ type CacheEntity interface {
type getUserIdentityFunc func() (*IdentityCache, error)
+// Actions expose a number of action functions on Entities, to give upper layers (cache) a way to normalize interactions.
+// Note: ideally this wouldn't exist, the cache layer would assume that everything is an entity/dag, and directly use the
+// functions from this package, but right now identities are not using that framework.
+type Actions[EntityT entity.Interface] struct {
+ ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
+ ReadAllWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT]
+ Remove func(repo repository.ClockedRepo, id entity.Id) error
+ MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
+}
+
+var _ cacheMgmt = &SubCache[entity.Interface, Excerpt, CacheEntity]{}
+
type SubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity] struct {
repo repository.ClockedRepo
resolvers func() entity.Resolvers
- getUserIdentity getUserIdentityFunc
- readWithResolver func(repository.ClockedRepo, entity.Resolvers, entity.Id) (EntityT, error)
- makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
- makeExcerpt func(EntityT) ExcerptT
- makeIndex func(CacheT) []string
+ getUserIdentity getUserIdentityFunc
+ makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT
+ makeExcerpt func(CacheT) ExcerptT
+ makeIndexData func(CacheT) []string
+ actions Actions[EntityT]
typename string
namespace string
@@ -50,14 +62,19 @@ func NewSubCache[EntityT entity.Interface, ExcerptT Excerpt, CacheT CacheEntity]
repo repository.ClockedRepo,
resolvers func() entity.Resolvers, getUserIdentity getUserIdentityFunc,
makeCached func(entity EntityT, entityUpdated func(id entity.Id) error) CacheT,
- makeExcerpt func(EntityT) ExcerptT,
- makeIndex func(CacheT) []string,
+ makeExcerpt func(CacheT) ExcerptT,
+ makeIndexData func(CacheT) []string,
+ actions Actions[EntityT],
typename, namespace string,
version uint, maxLoaded int) *SubCache[EntityT, ExcerptT, CacheT] {
return &SubCache[EntityT, ExcerptT, CacheT]{
repo: repo,
resolvers: resolvers,
getUserIdentity: getUserIdentity,
+ makeCached: makeCached,
+ makeExcerpt: makeExcerpt,
+ makeIndexData: makeIndexData,
+ actions: actions,
typename: typename,
namespace: namespace,
version: version,
@@ -98,6 +115,12 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
return fmt.Errorf("unknown %s cache format version %v", sc.namespace, aux.Version)
}
+ // the id is not serialized in the excerpt itself (non-exported field in go, long story ...),
+ // so we fix it here, which doubles as enforcing coherency.
+ for id, excerpt := range aux.Excerpts {
+ excerpt.setId(id)
+ }
+
sc.excerpts = aux.Excerpts
index, err := sc.repo.GetIndex(sc.typename)
@@ -118,7 +141,7 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Load() error {
}
// Write will serialize on disk the entity cache file
-func (sc *SubCache[EntityT, ExcerptT, CacheT]) Write() error {
+func (sc *SubCache[EntityT, ExcerptT, CacheT]) write() error {
sc.mu.RLock()
defer sc.mu.RUnlock()
@@ -155,9 +178,7 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Write() error {
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
sc.excerpts = make(map[entity.Id]ExcerptT)
- sc.readWithResolver
-
- allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers)
+ allEntities := sc.actions.ReadAllWithResolver(sc.repo, sc.resolvers())
index, err := sc.repo.GetIndex(sc.typename)
if err != nil {
@@ -172,15 +193,17 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
indexer, indexEnd := index.IndexBatch()
- for b := range allBugs {
- if b.Err != nil {
- return b.Err
+ for e := range allEntities {
+ if e.Err != nil {
+ return e.Err
}
- snap := b.Bug.Compile()
- c.bugExcerpts[b.Bug.Id()] = NewBugExcerpt(b.Bug, snap)
+ // TODO: doesn't actually record in cache, should we?
+ cached := sc.makeCached(e.Entity, sc.entityUpdated)
+ sc.excerpts[e.Entity.Id()] = sc.makeExcerpt(cached)
- if err := indexer(snap); err != nil {
+ indexData := sc.makeIndexData(cached)
+ if err := indexer(e.Entity.Id().String(), indexData); err != nil {
return err
}
}
@@ -190,10 +213,19 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Build() error {
return err
}
- _, _ = fmt.Fprintln(os.Stderr, "Done.")
+ err = sc.write()
+ if err != nil {
+ return err
+ }
+
return nil
}
+func (sc *SubCache[EntityT, ExcerptT, CacheT]) SetCacheSize(size int) {
+ sc.maxLoaded = size
+ sc.evictIfNeeded()
+}
+
func (sc *SubCache[EntityT, ExcerptT, CacheT]) Close() error {
sc.mu.Lock()
defer sc.mu.Unlock()
@@ -229,7 +261,7 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Resolve(id entity.Id) (CacheT, er
}
sc.mu.RUnlock()
- e, err := sc.readWithResolver(sc.repo, sc.resolvers(), id)
+ e, err := sc.actions.ReadWithResolver(sc.repo, sc.resolvers(), id)
if err != nil {
return *new(CacheT), err
}
@@ -315,8 +347,6 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) resolveMatcher(f func(ExcerptT) b
return matching[0], nil
}
-var errNotInCache = errors.New("entity missing from cache")
-
func (sc *SubCache[EntityT, ExcerptT, CacheT]) add(e EntityT) (CacheT, error) {
sc.mu.Lock()
if _, has := sc.cached[e.Id()]; has {
@@ -348,26 +378,74 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) Remove(prefix string) error {
sc.mu.Lock()
- err = bug.Remove(c.repo, b.Id())
+ err = sc.actions.Remove(sc.repo, e.Id())
if err != nil {
- c.muBug.Unlock()
-
+ sc.mu.Unlock()
return err
}
- delete(c.bugs, b.Id())
- delete(c.bugExcerpts, b.Id())
- c.loadedBugs.Remove(b.Id())
+ delete(sc.cached, e.Id())
+ delete(sc.excerpts, e.Id())
+ sc.lru.Remove(e.Id())
+
+ sc.mu.Unlock()
+
+ return sc.write()
+}
+
+func (sc *SubCache[EntityT, ExcerptT, CacheT]) MergeAll(remote string) <-chan entity.MergeResult {
+ out := make(chan entity.MergeResult)
- c.muBug.Unlock()
+ // Intercept merge results to update the cache properly
+ go func() {
+ defer close(out)
+
+ author, err := sc.getUserIdentity()
+ if err != nil {
+ out <- entity.NewMergeError(err, "")
+ return
+ }
+
+ results := sc.actions.MergeAll(sc.repo, sc.resolvers(), remote, author)
+ for result := range results {
+ out <- result
+
+ if result.Err != nil {
+ continue
+ }
+
+ switch result.Status {
+ case entity.MergeStatusNew, entity.MergeStatusUpdated:
+ e := result.Entity.(EntityT)
+
+ // TODO: doesn't actually record in cache, should we?
+ cached := sc.makeCached(e, sc.entityUpdated)
+
+ sc.mu.Lock()
+ sc.excerpts[result.Id] = sc.makeExcerpt(cached)
+ sc.mu.Unlock()
+ }
+ }
+
+ err = sc.write()
+ if err != nil {
+ out <- entity.NewMergeError(err, "")
+ return
+ }
+ }()
+
+ return out
- return c.writeBugCache()
+}
+
+func (sc *SubCache[EntityT, ExcerptT, CacheT]) GetNamespace() string {
+ return sc.namespace
}
// entityUpdated is a callback to trigger when the excerpt of an entity changed
func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error {
sc.mu.Lock()
- b, ok := sc.cached[id]
+ e, ok := sc.cached[id]
if !ok {
sc.mu.Unlock()
@@ -376,19 +454,24 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) entityUpdated(id entity.Id) error
// memory and thus concurrent write.
// Failing immediately here is the simple and safe solution to avoid
// complicated data loss.
- return errNotInCache
+ return errors.New("entity missing from cache")
}
sc.lru.Get(id)
// sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
- sc.excerpts[id] = bug2.NewBugExcerpt(b.bug, b.Snapshot())
+ sc.excerpts[id] = sc.makeExcerpt(e)
sc.mu.Unlock()
- if err := sc.addBugToSearchIndex(b.Snapshot()); err != nil {
+ index, err := sc.repo.GetIndex(sc.typename)
+ if err != nil {
+ return err
+ }
+
+ err = index.IndexOne(e.Id().String(), sc.makeIndexData(e))
+ if err != nil {
return err
}
- // we only need to write the bug cache
- return sc.Write()
+ return sc.write()
}
// evictIfNeeded will evict an entity from the cache if needed
@@ -405,7 +488,8 @@ func (sc *SubCache[EntityT, ExcerptT, CacheT]) evictIfNeeded() {
continue
}
- b.Lock()
+ // TODO
+ // b.Lock()
sc.lru.Remove(id)
delete(sc.cached, id)