aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Muré <batolettre@gmail.com>2022-12-08 12:30:08 +0100
committerMichael Muré <batolettre@gmail.com>2022-12-15 13:17:03 +0100
commit3b62fe0a4c7b3f1ae4f9a163b6c483444b5a8d20 (patch)
tree7cb299f4b2298e7147182b137a2988d2e20cdfff
parent4a341b5e1714a6a36ec7f5839a6a1b73571d4851 (diff)
downloadgit-bug-3b62fe0a4c7b3f1ae4f9a163b6c483444b5a8d20.tar.gz
WIP
-rw-r--r--cache/bug_subcache.go2
-rw-r--r--cache/repo_cache.go89
-rw-r--r--cache/subcache.go50
-rw-r--r--util/multierr/errwaitgroup.go115
-rw-r--r--util/multierr/join.go51
5 files changed, 277 insertions, 30 deletions
diff --git a/cache/bug_subcache.go b/cache/bug_subcache.go
index e61bbf2b..c8901754 100644
--- a/cache/bug_subcache.go
+++ b/cache/bug_subcache.go
@@ -18,7 +18,7 @@ import (
)
type RepoCacheBug struct {
- SubCache[*BugExcerpt, *BugCache, bug.Interface]
+ *SubCache[*BugExcerpt, *BugCache, bug.Interface]
}
// ResolveBugCreateMetadata retrieve a bug that has the exact given metadata on
diff --git a/cache/repo_cache.go b/cache/repo_cache.go
index 9250bb40..982804b5 100644
--- a/cache/repo_cache.go
+++ b/cache/repo_cache.go
@@ -6,13 +6,13 @@ import (
"io/ioutil"
"os"
"strconv"
-
- "golang.org/x/sync/errgroup"
+ "sync"
"github.com/MichaelMure/git-bug/entities/bug"
"github.com/MichaelMure/git-bug/entities/identity"
"github.com/MichaelMure/git-bug/entity"
"github.com/MichaelMure/git-bug/repository"
+ "github.com/MichaelMure/git-bug/util/multierr"
"github.com/MichaelMure/git-bug/util/process"
)
@@ -29,6 +29,14 @@ var _ repository.RepoCommon = &RepoCache{}
var _ repository.RepoConfig = &RepoCache{}
var _ repository.RepoKeyring = &RepoCache{}
+type cacheMgmt interface {
+ Typename() string
+ Load() error
+ Write() error
+ Build() error
+ Close() error
+}
+
// RepoCache is a cache for a Repository. This cache has multiple functions:
//
// 1. After being loaded, a Bug is kept in memory in the cache, allowing for fast
@@ -62,11 +70,11 @@ type RepoCache struct {
userIdentityId entity.Id
}
-func NewRepoCache(r repository.ClockedRepo) (*RepoCache, error) {
+func NewRepoCache(r repository.ClockedRepo) (*RepoCache, chan BuildEvent, error) {
return NewNamedRepoCache(r, "")
}
-func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error) {
+func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, chan BuildEvent, error) {
c := &RepoCache{
repo: r,
name: name,
@@ -87,12 +95,12 @@ func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error
err := c.lock()
if err != nil {
- return &RepoCache{}, err
+ return &RepoCache{}, nil, err
}
err = c.load()
if err == nil {
- return c, nil
+ return c, nil, nil
}
// Cache is either missing, broken or outdated. Rebuilding.
@@ -126,20 +134,20 @@ func (c *RepoCache) setCacheSize(size int) {
// load will try to read from the disk all the cache files
func (c *RepoCache) load() error {
- var errG errgroup.Group
+ var errWait multierr.ErrWaitGroup
for _, mgmt := range c.subcaches {
- errG.Go(mgmt.Load)
+ errWait.Go(mgmt.Load)
}
- return errG.Wait()
+ return errWait.Wait()
}
// write will serialize on disk all the cache files
func (c *RepoCache) write() error {
- var errG errgroup.Group
+ var errWait multierr.ErrWaitGroup
for _, mgmt := range c.subcaches {
- errG.Go(mgmt.Write)
+ errWait.Go(mgmt.Write)
}
- return errG.Wait()
+ return errWait.Wait()
}
func (c *RepoCache) lock() error {
@@ -163,11 +171,11 @@ func (c *RepoCache) lock() error {
}
func (c *RepoCache) Close() error {
- var errG errgroup.Group
+ var errWait multierr.ErrWaitGroup
for _, mgmt := range c.subcaches {
- errG.Go(mgmt.Close)
+ errWait.Go(mgmt.Close)
}
- err := errG.Wait()
+ err := errWait.Wait()
if err != nil {
return err
}
@@ -180,7 +188,56 @@ func (c *RepoCache) Close() error {
return c.repo.LocalStorage().Remove(lockfile)
}
-func (c *RepoCache) buildCache() error {
+type BuildEventType int
+
+const (
+ _ BuildEventType = iota
+ BuildEventStarted
+ BuildEventFinished
+)
+
+type BuildEvent struct {
+ Typename string
+ Event BuildEventType
+ Err error
+}
+
+func (c *RepoCache) buildCache() chan BuildEvent {
+ out := make(chan BuildEvent)
+
+ go func() {
+ defer close(out)
+
+ var wg sync.WaitGroup
+ for _, subcache := range c.subcaches {
+ wg.Add(1)
+ go func(subcache cacheMgmt) {
+ defer wg.Done()
+ out <- BuildEvent{
+ Typename: subcache.Typename(),
+ Event: BuildEventStarted,
+ }
+
+ err := subcache.Build()
+ if err != nil {
+ out <- BuildEvent{
+ Typename: subcache.Typename(),
+ Err: err,
+ }
+ return
+ }
+
+ out <- BuildEvent{
+ Typename: subcache.Typename(),
+ Event: BuildEventFinished,
+ }
+ }(subcache)
+ }
+ wg.Wait()
+ }()
+
+ return out
+
_, _ = fmt.Fprintf(os.Stderr, "Building identity cache... ")
c.identitiesExcerpts = make(map[entity.Id]*IdentityExcerpt)
diff --git a/cache/subcache.go b/cache/subcache.go
index 66f72767..285d4fe7 100644
--- a/cache/subcache.go
+++ b/cache/subcache.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/gob"
"fmt"
+ "os"
"sync"
"github.com/pkg/errors"
@@ -21,13 +22,6 @@ type CacheEntity interface {
NeedCommit() bool
}
-type cacheMgmt interface {
- Load() error
- Write() error
- Build() error
- Close() error
-}
-
type getUserIdentityFunc func() (*IdentityCache, error)
type SubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] struct {
@@ -38,6 +32,7 @@ type SubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface] st
readWithResolver func(repository.ClockedRepo, entity.Resolvers, entity.Id) (EntityT, error)
makeCached func(*SubCache[ExcerptT, CacheT, EntityT], getUserIdentityFunc, EntityT) CacheT
makeExcerpt func() Excerpt
+ indexingCallback func(CacheT) error
typename string
namespace string
@@ -70,6 +65,10 @@ func NewSubCache[ExcerptT Excerpt, CacheT CacheEntity, EntityT entity.Interface]
}
}
+func (sc *SubCache[ExcerptT, CacheT, EntityT]) Typename() string {
+ return sc.typename
+}
+
// Load will try to read from the disk the entity cache file
func (sc *SubCache[ExcerptT, CacheT, EntityT]) Load() error {
sc.mu.Lock()
@@ -151,7 +150,32 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) Write() error {
}
func (sc *SubCache[ExcerptT, CacheT, EntityT]) Build() error {
+ sc.excerpts = make(map[entity.Id]ExcerptT)
+
+ sc.readWithResolver
+
+ allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers)
+
+ // wipe the index just to be sure
+ err := c.repo.ClearBleveIndex("bug")
+ if err != nil {
+ return err
+ }
+
+ for b := range allBugs {
+ if b.Err != nil {
+ return b.Err
+ }
+
+ snap := b.Bug.Compile()
+ c.bugExcerpts[b.Bug.Id()] = NewBugExcerpt(b.Bug, snap)
+
+ if err := c.addBugToSearchIndex(snap); err != nil {
+ return err
+ }
+ }
+ _, _ = fmt.Fprintln(os.Stderr, "Done.")
}
func (sc *SubCache[ExcerptT, CacheT, EntityT]) Close() error {
@@ -191,7 +215,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) Resolve(id entity.Id) (CacheT, er
b, err := sc.readWithResolver(sc.repo, sc.resolvers(), id)
if err != nil {
- return nil, err
+ return *new(CacheT), err
}
cached = sc.makeCached(sc, sc.getUserIdentity, b)
@@ -217,7 +241,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolvePrefix(prefix string) (Cac
func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveMatcher(f func(ExcerptT) bool) (CacheT, error) {
id, err := sc.resolveMatcher(f)
if err != nil {
- return nil, err
+ return *new(CacheT), err
}
return sc.Resolve(id)
}
@@ -229,7 +253,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerpt(id entity.Id) (Exc
excerpt, ok := sc.excerpts[id]
if !ok {
- return nil, entity.NewErrNotFound(sc.typename)
+ return *new(ExcerptT), entity.NewErrNotFound(sc.typename)
}
return excerpt, nil
@@ -246,7 +270,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerptPrefix(prefix strin
func (sc *SubCache[ExcerptT, CacheT, EntityT]) ResolveExcerptMatcher(f func(ExcerptT) bool) (ExcerptT, error) {
id, err := sc.resolveMatcher(f)
if err != nil {
- return nil, err
+ return *new(ExcerptT), err
}
return sc.ResolveExcerpt(id)
}
@@ -281,7 +305,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) add(e EntityT) (CacheT, error) {
sc.mu.Lock()
if _, has := sc.cached[e.Id()]; has {
sc.mu.Unlock()
- return nil, fmt.Errorf("entity %s already exist in the cache", e.Id())
+ return *new(CacheT), fmt.Errorf("entity %s already exist in the cache", e.Id())
}
cached := sc.makeCached(sc, sc.getUserIdentity, e)
@@ -294,7 +318,7 @@ func (sc *SubCache[ExcerptT, CacheT, EntityT]) add(e EntityT) (CacheT, error) {
// force the write of the excerpt
err := sc.entityUpdated(e.Id())
if err != nil {
- return nil, err
+ return *new(CacheT), err
}
return cached, nil
diff --git a/util/multierr/errwaitgroup.go b/util/multierr/errwaitgroup.go
new file mode 100644
index 00000000..1c785b30
--- /dev/null
+++ b/util/multierr/errwaitgroup.go
@@ -0,0 +1,115 @@
+package multierr
+
+import (
+ "context"
+ "fmt"
+ "sync"
+)
+
+type token struct{}
+
+// A ErrWaitGroup is a collection of goroutines working on subtasks that are part of
+// the same overall task.
+//
+// A zero ErrWaitGroup is valid, has no limit on the number of active goroutines,
+// and does not cancel on error.
+type ErrWaitGroup struct {
+ cancel func()
+
+ wg sync.WaitGroup
+
+ sem chan token
+
+ mu sync.Mutex
+ err error
+}
+
+func (g *ErrWaitGroup) done() {
+ if g.sem != nil {
+ <-g.sem
+ }
+ g.wg.Done()
+}
+
+// WithContext returns a new ErrWaitGroup and an associated Context derived from ctx.
+//
+// The derived Context is canceled the first time Wait returns.
+func WithContext(ctx context.Context) (*ErrWaitGroup, context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ return &ErrWaitGroup{cancel: cancel}, ctx
+}
+
+// Wait blocks until all function calls from the Go method have returned, then
+// returns the combined non-nil errors (if any) from them.
+func (g *ErrWaitGroup) Wait() error {
+ g.wg.Wait()
+ if g.cancel != nil {
+ g.cancel()
+ }
+ return g.err
+}
+
+// Go calls the given function in a new goroutine.
+// It blocks until the new goroutine can be added without the number of
+// active goroutines in the group exceeding the configured limit.
+func (g *ErrWaitGroup) Go(f func() error) {
+ if g.sem != nil {
+ g.sem <- token{}
+ }
+
+ g.wg.Add(1)
+ go func() {
+ defer g.done()
+
+ if err := f(); err != nil {
+ g.mu.Lock()
+ err = Join(g.err, err)
+ g.mu.Unlock()
+ }
+ }()
+}
+
+// TryGo calls the given function in a new goroutine only if the number of
+// active goroutines in the group is currently below the configured limit.
+//
+// The return value reports whether the goroutine was started.
+func (g *ErrWaitGroup) TryGo(f func() error) bool {
+ if g.sem != nil {
+ select {
+ case g.sem <- token{}:
+ // Note: this allows barging iff channels in general allow barging.
+ default:
+ return false
+ }
+ }
+
+ g.wg.Add(1)
+ go func() {
+ defer g.done()
+
+ if err := f(); err != nil {
+ g.mu.Lock()
+ err = Join(g.err, err)
+ g.mu.Unlock()
+ }
+ }()
+ return true
+}
+
+// SetLimit limits the number of active goroutines in this group to at most n.
+// A negative value indicates no limit.
+//
+// Any subsequent call to the Go method will block until it can add an active
+// goroutine without exceeding the configured limit.
+//
+// The limit must not be modified while any goroutines in the group are active.
+func (g *ErrWaitGroup) SetLimit(n int) {
+ if n < 0 {
+ g.sem = nil
+ return
+ }
+ if len(g.sem) != 0 {
+ panic(fmt.Errorf("errwaitgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
+ }
+ g.sem = make(chan token, n)
+}
diff --git a/util/multierr/join.go b/util/multierr/join.go
new file mode 100644
index 00000000..880ba095
--- /dev/null
+++ b/util/multierr/join.go
@@ -0,0 +1,51 @@
+package multierr
+
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Join returns an error that wraps the given errors.
+// Any nil error values are discarded.
+// Join returns nil if errs contains no non-nil values.
+// The error formats as the concatenation of the strings obtained
+// by calling the Error method of each element of errs, with a newline
+// between each string.
+func Join(errs ...error) error {
+ n := 0
+ for _, err := range errs {
+ if err != nil {
+ n++
+ }
+ }
+ if n == 0 {
+ return nil
+ }
+ e := &joinError{
+ errs: make([]error, 0, n),
+ }
+ for _, err := range errs {
+ if err != nil {
+ e.errs = append(e.errs, err)
+ }
+ }
+ return e
+}
+
+type joinError struct {
+ errs []error
+}
+
+func (e *joinError) Error() string {
+ var b []byte
+ for i, err := range e.errs {
+ if i > 0 {
+ b = append(b, '\n')
+ }
+ b = append(b, err.Error()...)
+ }
+ return string(b)
+}
+
+func (e *joinError) Unwrap() []error {
+ return e.errs
+}