diff options
author | Michael Muré <batolettre@gmail.com> | 2022-12-08 12:30:08 +0100 |
---|---|---|
committer | Michael Muré <batolettre@gmail.com> | 2022-12-15 13:17:03 +0100 |
commit | 3b62fe0a4c7b3f1ae4f9a163b6c483444b5a8d20 (patch) | |
tree | 7cb299f4b2298e7147182b137a2988d2e20cdfff | |
parent | 4a341b5e1714a6a36ec7f5839a6a1b73571d4851 (diff) | |
download | git-bug-3b62fe0a4c7b3f1ae4f9a163b6c483444b5a8d20.tar.gz |
WIP
-rw-r--r-- | cache/bug_subcache.go | 2 | ||||
-rw-r--r-- | cache/repo_cache.go | 89 | ||||
-rw-r--r-- | cache/subcache.go | 50 | ||||
-rw-r--r-- | util/multierr/errwaitgroup.go | 115 | ||||
-rw-r--r-- | util/multierr/join.go | 51 |
5 files changed, 277 insertions, 30 deletions
diff --git a/cache/bug_subcache.go b/cache/bug_subcache.go index e61bbf2b..c8901754 100644 --- a/cache/bug_subcache.go +++ b/cache/bug_subcache.go @@ -18,7 +18,7 @@ import ( ) type RepoCacheBug struct { - SubCache[*BugExcerpt, *BugCache, bug.Interface] + *SubCache[*BugExcerpt, *BugCache, bug.Interface] } // ResolveBugCreateMetadata retrieve a bug that has the exact given metadata on diff --git a/cache/repo_cache.go b/cache/repo_cache.go index 9250bb40..982804b5 100644 --- a/cache/repo_cache.go +++ b/cache/repo_cache.go @@ -6,13 +6,13 @@ import ( "io/ioutil" "os" "strconv" - - "golang.org/x/sync/errgroup" + "sync" "github.com/MichaelMure/git-bug/entities/bug" "github.com/MichaelMure/git-bug/entities/identity" "github.com/MichaelMure/git-bug/entity" "github.com/MichaelMure/git-bug/repository" + "github.com/MichaelMure/git-bug/util/multierr" "github.com/MichaelMure/git-bug/util/process" ) @@ -29,6 +29,14 @@ var _ repository.RepoCommon = &RepoCache{} var _ repository.RepoConfig = &RepoCache{} var _ repository.RepoKeyring = &RepoCache{} +type cacheMgmt interface { + Typename() string + Load() error + Write() error + Build() error + Close() error +} + // RepoCache is a cache for a Repository. This cache has multiple functions: // // 1. After being loaded, a Bug is kept in memory in the cache, allowing for fast @@ -62,11 +70,11 @@ type RepoCache struct { userIdentityId entity.Id } -func NewRepoCache(r repository.ClockedRepo) (*RepoCache, error) { +func NewRepoCache(r repository.ClockedRepo) (*RepoCache, chan BuildEvent, error) { return NewNamedRepoCache(r, "") } -func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error) { +func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, chan BuildEvent, error) { c := &RepoCache{ repo: r, name: name, @@ -87,12 +95,12 @@ func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error err := c.lock() if err != nil { - return &RepoCache{}, err + return &RepoCache{}, nil, err } err = c.load() if err == nil { - return c, nil + return c, nil, nil } // Cache is either missing, broken or outdated. Rebuilding. @@ -126,20 +134,20 @@ func (c *RepoCache) setCacheSize(size int) { // load will try to read from the disk all the cache files func (c *RepoCache) load() error { - var errG errgroup.Group + var errWait multierr.ErrWaitGroup for _, mgmt := range c.subcaches { - errG.Go(mgmt.Load) + errWait.Go(mgmt.Load) } - return errG.Wait() + return errWait.Wait() } // write will serialize on disk all the cache files func (c *RepoCache) write() error { - var errG errgroup.Group + var errWait multierr.ErrWaitGroup for _, mgmt := range c.subcaches { - errG.Go(mgmt.Write) + errWait.Go(mgmt.Write) } - return errG.Wait() + return errWait.Wait() } func (c *RepoCache) lock() error { @@ -163,11 +171,11 @@ func (c *RepoCache) lock() error { } func (c *RepoCache) Close() error { - var errG errgroup.Group + var errWait multierr.ErrWaitGroup for _, mgmt := range c.subcaches { - errG.Go(mgmt.Close) + errWait.Go(mgmt.Close) } - err := errG.Wait() + err := errWait.Wait() if err != nil { return err } @@ -180,7 +188,56 @@ func (c *RepoCache) Close() error { return c.repo.LocalStorage().Remove(lockfile) } -func (c *RepoCache) buildCache() error { +type BuildEventType int + +const ( + _ BuildEventType = iota + BuildEventStarted + BuildEventFinished +) + +type BuildEvent struct { + Typename string + Event BuildEventType + Err error +} + +func (c *RepoCache) buildCache() chan BuildEvent { + out := make(chan BuildEvent) + + go func() { + defer close(out) + + var wg sync.WaitGroup + for _, subcache := range c.subcaches { + wg.Add(1) + go func(subcache cacheMgmt) { + defer wg.Done() + out <- BuildEvent{ + Typename: subcache.Typename(), + Event: BuildEventStarted, + } + + err := subcache.Build() + if err != nil { + out <- BuildEvent{ + Typename: subcache.Typename(), + Err: err, + } + return + } + + out <- BuildEvent{ + Typename: subcache.Typename(), + Event: BuildEventFinished, + } + }(subcache) + } + wg.Wait() + }() + + return out + _, _ = fmt.Fprintf(os.Stderr, "Building identity cache... ") c.identitiesExcerpts = make(map[entity.Id]*IdentityExcerpt) diff --git a/cache/subcache.go b/cache/subcache.go index 66f72767..285d4fe7 100644 --- a/cache/subcache.go +++ b/cache/subcache.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/gob" "fmt" + "os" "sync" "github.com/pkg/errors" @@ -21,13 +22,6 @@ type CacheEntity interface { NeedCommit() bool } -type cacheMgmt interface { - Load() error - Write() error - Build() error - Close() error -} - type getUserIdentityFunc func() (*IdentityCache, error) type SubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] struct { @@ -38,6 +32,7 @@ type SubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] st readWithResolver func(repository.ClockedRepo, entity.Resolvers, entity.Id) (EntityT, error) makeCached func(*SubCache[ExcerptT, CacheT, EntityT], getUserIdentityFunc, EntityT) CacheT makeExcerpt func() Excerpt + indexingCallback func(CacheT) error typename string namespace string @@ -70,6 +65,10 @@ func NewSubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] } } +func (sc *SubCache[ExcerptT, CacheT, EntityT]) Typename() string { + return sc.typename +} + // Load will try to read from the disk the entity cache file func (sc *SubCache[ExcerptT, CacheT, EntityT]) Load() error { sc.mu.Lock() @@ -151,7 +150,32 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) Write() error { } func (sc *SubCache[ExcerptT, CacheT, EntityT]) Build() error { + sc.excerpts = make(map[entity.Id]ExcerptT) + + sc.readWithResolver + + allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers) + + // wipe the index just to be sure + err := c.repo.ClearBleveIndex("bug") + if err != nil { + return err + } + + for b := range allBugs { + if b.Err != nil { + return b.Err + } + + snap := b.Bug.Compile() + c.bugExcerpts[b.Bug.Id()] = NewBugExcerpt(b.Bug, snap) + + if err := c.addBugToSearchIndex(snap); err != nil { + return err + } + } + _, _ = fmt.Fprintln(os.Stderr, "Done.") } func (sc *SubCache[ExcerptT, CacheT, EntityT]) Close() error { @@ -191,7 +215,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) Resolve(id entity.Id) (CacheT, er b, err := sc.readWithResolver(sc.repo, sc.resolvers(), id) if err != nil { - return nil, err + return *new(CacheT), err } cached = sc.makeCached(sc, sc.getUserIdentity, b) @@ -217,7 +241,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolvePrefix(prefix string) (Cac func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) { id, err := sc.resolveMatcher(f) if err != nil { - return nil, err + return *new(CacheT), err } return sc.Resolve(id) } @@ -229,7 +253,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerpt(id entity.Id) (Exc excerpt, ok := sc.excerpts[id] if !ok { - return nil, entity.NewErrNotFound(sc.typename) + return *new(ExcerptT), entity.NewErrNotFound(sc.typename) } return excerpt, nil @@ -246,7 +270,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerptPrefix(prefix strin func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) { id, err := sc.resolveMatcher(f) if err != nil { - return nil, err + return *new(ExcerptT), err } return sc.ResolveExcerpt(id) } @@ -281,7 +305,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) add(e EntityT) (CacheT, error) { sc.mu.Lock() if _, has := sc.cached[e.Id()]; has { sc.mu.Unlock() - return nil, fmt.Errorf("entity %s already exist in the cache", e.Id()) + return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id()) } cached := sc.makeCached(sc, sc.getUserIdentity, e) @@ -294,7 +318,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) add(e EntityT) (CacheT, error) { // force the write of the excerpt err := sc.entityUpdated(e.Id()) if err != nil { - return nil, err + return *new(CacheT), err } return cached, nil diff --git a/util/multierr/errwaitgroup.go b/util/multierr/errwaitgroup.go new file mode 100644 index 00000000..1c785b30 --- /dev/null +++ b/util/multierr/errwaitgroup.go @@ -0,0 +1,115 @@ +package multierr + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A ErrWaitGroup is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero ErrWaitGroup is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type ErrWaitGroup struct { + cancel func() + + wg sync.WaitGroup + + sem chan token + + mu sync.Mutex + err error +} + +func (g *ErrWaitGroup) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new ErrWaitGroup and an associated Context derived from ctx. +// +// The derived Context is canceled the first time Wait returns. +func WithContext(ctx context.Context) (*ErrWaitGroup, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &ErrWaitGroup{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the combined non-nil errors (if any) from them. +func (g *ErrWaitGroup) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +func (g *ErrWaitGroup) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.mu.Lock() + err = Join(g.err, err) + g.mu.Unlock() + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// The return value reports whether the goroutine was started. +func (g *ErrWaitGroup) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.mu.Lock() + err = Join(g.err, err) + g.mu.Unlock() + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *ErrWaitGroup) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errwaitgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} diff --git a/util/multierr/join.go b/util/multierr/join.go new file mode 100644 index 00000000..880ba095 --- /dev/null +++ b/util/multierr/join.go @@ -0,0 +1,51 @@ +package multierr + +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Join returns an error that wraps the given errors. +// Any nil error values are discarded. +// Join returns nil if errs contains no non-nil values. +// The error formats as the concatenation of the strings obtained +// by calling the Error method of each element of errs, with a newline +// between each string. +func Join(errs ...error) error { + n := 0 + for _, err := range errs { + if err != nil { + n++ + } + } + if n == 0 { + return nil + } + e := &joinError{ + errs: make([]error, 0, n), + } + for _, err := range errs { + if err != nil { + e.errs = append(e.errs, err) + } + } + return e +} + +type joinError struct { + errs []error +} + +func (e *joinError) Error() string { + var b []byte + for i, err := range e.errs { + if i > 0 { + b = append(b, '\n') + } + b = append(b, err.Error()...) + } + return string(b) +} + +func (e *joinError) Unwrap() []error { + return e.errs +} |