From 45f5f852b71a63c142bca8b05efe53eebf142594 Mon Sep 17 00:00:00 2001 From: Michael Muré Date: Sat, 13 Aug 2022 12:08:48 +0200 Subject: core: generalized resolvers to resolve any entity time when unmarshalling an operation --- board/resolver_test.go | 28 ++++++++++++ bug/bug.go | 24 +++++++---- bug/bug_actions.go | 13 ++---- bug/operation.go | 4 +- bug/resolver.go | 21 +++++++++ cache/repo_cache.go | 14 +++--- cache/repo_cache_bug.go | 2 +- cache/repo_cache_common.go | 2 +- cache/resolvers.go | 39 +++++++---------- entity/dag/clock.go | 21 +++------ entity/dag/common_test.go | 42 +++++++++--------- entity/dag/entity.go | 90 +++++++++++++++++++++++++++++++++++---- entity/dag/entity_actions.go | 14 +++--- entity/dag/entity_actions_test.go | 38 ++++++++--------- entity/dag/example_test.go | 18 +++++--- entity/dag/operation_pack.go | 42 +++++++++++++++--- entity/resolver.go | 74 ++++++++++++++++++++++++++++++++ identity/resolver.go | 47 +++----------------- 18 files changed, 355 insertions(+), 178 deletions(-) create mode 100644 board/resolver_test.go create mode 100644 bug/resolver.go create mode 100644 entity/resolver.go diff --git a/board/resolver_test.go b/board/resolver_test.go new file mode 100644 index 00000000..77bfbe61 --- /dev/null +++ b/board/resolver_test.go @@ -0,0 +1,28 @@ +package board + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/MichaelMure/git-bug/bug" + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" +) + +func TestResolvers(t *testing.T) { + repo := repository.NewMockRepo() + + rs := entity.Resolvers{ + &identity.IdentityStub{}: identity.NewStubResolver(), + &identity.Identity{}: identity.NewSimpleResolver(repo), + &bug.Bug{}: bug.NewSimpleResolver(repo), + } + + ide, err := entity.Resolve[identity.Interface](rs, "foo") + require.NoError(t, err) + + fmt.Println(ide) +} diff --git a/bug/bug.go b/bug/bug.go index dce30f76..65fb621e 100644 --- a/bug/bug.go +++ b/bug/bug.go @@ -42,14 +42,20 @@ func NewBug() *Bug { } } +func simpleResolvers(repo repository.ClockedRepo) entity.Resolvers { + return entity.Resolvers{ + &identity.Identity{}: identity.NewSimpleResolver(repo), + } +} + // Read will read a bug from a repository func Read(repo repository.ClockedRepo, id entity.Id) (*Bug, error) { - return ReadWithResolver(repo, identity.NewSimpleResolver(repo), id) + return ReadWithResolver(repo, simpleResolvers(repo), id) } -// ReadWithResolver will read a bug from its Id, with a custom identity.Resolver -func ReadWithResolver(repo repository.ClockedRepo, identityResolver identity.Resolver, id entity.Id) (*Bug, error) { - e, err := dag.Read(def, repo, identityResolver, id) +// ReadWithResolver will read a bug from its Id, with custom resolvers +func ReadWithResolver(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Bug, error) { + e, err := dag.Read(def, repo, resolvers, id) if err != nil { return nil, err } @@ -63,22 +69,22 @@ type StreamedBug struct { // ReadAll read and parse all local bugs func ReadAll(repo repository.ClockedRepo) <-chan StreamedBug { - return readAll(repo, identity.NewSimpleResolver(repo)) + return readAll(repo, simpleResolvers(repo)) } // ReadAllWithResolver read and parse all local bugs -func ReadAllWithResolver(repo repository.ClockedRepo, identityResolver identity.Resolver) <-chan StreamedBug { - return readAll(repo, identityResolver) +func ReadAllWithResolver(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedBug { + return readAll(repo, resolvers) } // Read and parse all available bug with a given ref prefix -func readAll(repo repository.ClockedRepo, identityResolver identity.Resolver) <-chan StreamedBug { +func readAll(repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedBug { out := make(chan StreamedBug) go func() { defer close(out) - for streamedEntity := range dag.ReadAll(def, repo, identityResolver) { + for streamedEntity := range dag.ReadAll(def, repo, resolvers) { if streamedEntity.Err != nil { out <- StreamedBug{ Err: streamedEntity.Err, diff --git a/bug/bug_actions.go b/bug/bug_actions.go index c8239e41..3a8ec3f0 100644 --- a/bug/bug_actions.go +++ b/bug/bug_actions.go @@ -24,13 +24,13 @@ func Push(repo repository.Repo, remote string) (string, error) { // This function will return an error if a merge fail // Note: an author is necessary for the case where a merge commit is created, as this commit will // have an author and may be signed if a signing key is available. -func Pull(repo repository.ClockedRepo, remote string, mergeAuthor identity.Interface) error { +func Pull(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) error { _, err := Fetch(repo, remote) if err != nil { return err } - for merge := range MergeAll(repo, remote, mergeAuthor) { + for merge := range MergeAll(repo, resolvers, remote, mergeAuthor) { if merge.Err != nil { return merge.Err } @@ -45,18 +45,13 @@ func Pull(repo repository.ClockedRepo, remote string, mergeAuthor identity.Inter // MergeAll will merge all the available remote bug // Note: an author is necessary for the case where a merge commit is created, as this commit will // have an author and may be signed if a signing key is available. -func MergeAll(repo repository.ClockedRepo, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult { - // no caching for the merge, we load everything from git even if that means multiple - // copy of the same entity in memory. The cache layer will intercept the results to - // invalidate entities if necessary. - identityResolver := identity.NewSimpleResolver(repo) - +func MergeAll(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult { out := make(chan entity.MergeResult) go func() { defer close(out) - results := dag.MergeAll(def, repo, identityResolver, remote, mergeAuthor) + results := dag.MergeAll(def, repo, resolvers, remote, mergeAuthor) // wrap the dag.Entity into a complete Bug for result := range results { diff --git a/bug/operation.go b/bug/operation.go index 9c87d8f3..a02fc780 100644 --- a/bug/operation.go +++ b/bug/operation.go @@ -4,8 +4,8 @@ import ( "encoding/json" "fmt" + "github.com/MichaelMure/git-bug/entity" "github.com/MichaelMure/git-bug/entity/dag" - "github.com/MichaelMure/git-bug/identity" ) const ( @@ -32,7 +32,7 @@ type Operation interface { var _ Operation = &dag.NoOpOperation[*Snapshot]{} var _ Operation = &dag.SetMetadataOperation[*Snapshot]{} -func operationUnmarshaller(raw json.RawMessage, resolver identity.Resolver) (dag.Operation, error) { +func operationUnmarshaller(raw json.RawMessage, resolvers entity.Resolvers) (dag.Operation, error) { var t struct { OperationType dag.OperationType `json:"type"` } diff --git a/bug/resolver.go b/bug/resolver.go new file mode 100644 index 00000000..e7beb0e4 --- /dev/null +++ b/bug/resolver.go @@ -0,0 +1,21 @@ +package bug + +import ( + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/repository" +) + +var _ entity.Resolver = &SimpleResolver{} + +// SimpleResolver is a Resolver loading Bugs directly from a Repo +type SimpleResolver struct { + repo repository.ClockedRepo +} + +func NewSimpleResolver(repo repository.ClockedRepo) *SimpleResolver { + return &SimpleResolver{repo: repo} +} + +func (r *SimpleResolver) Resolve(id entity.Id) (entity.Interface, error) { + return Read(r.repo, id) +} diff --git a/cache/repo_cache.go b/cache/repo_cache.go index 53948331..8af221bb 100644 --- a/cache/repo_cache.go +++ b/cache/repo_cache.go @@ -49,6 +49,9 @@ type RepoCache struct { // the name of the repository, as defined in the MultiRepoCache name string + // resolvers for all known entities + resolvers entity.Resolvers + // maximum number of loaded bugs maxLoadedBugs int @@ -84,6 +87,8 @@ func NewNamedRepoCache(r repository.ClockedRepo, name string) (*RepoCache, error identities: make(map[entity.Id]*IdentityCache), } + c.resolvers = makeResolvers(c) + err := c.lock() if err != nil { return &RepoCache{}, err @@ -168,13 +173,6 @@ func (c *RepoCache) Close() error { } func (c *RepoCache) buildCache() error { - // TODO: make that parallel - - c.muBug.Lock() - defer c.muBug.Unlock() - c.muIdentity.Lock() - defer c.muIdentity.Unlock() - _, _ = fmt.Fprintf(os.Stderr, "Building identity cache... ") c.identitiesExcerpts = make(map[entity.Id]*IdentityExcerpt) @@ -195,7 +193,7 @@ func (c *RepoCache) buildCache() error { c.bugExcerpts = make(map[entity.Id]*BugExcerpt) - allBugs := bug.ReadAllWithResolver(c.repo, newIdentityCacheResolverNoLock(c)) + allBugs := bug.ReadAllWithResolver(c.repo, c.resolvers) // wipe the index just to be sure err := c.repo.ClearBleveIndex("bug") diff --git a/cache/repo_cache_bug.go b/cache/repo_cache_bug.go index 6af9fc04..a3f195ff 100644 --- a/cache/repo_cache_bug.go +++ b/cache/repo_cache_bug.go @@ -153,7 +153,7 @@ func (c *RepoCache) ResolveBug(id entity.Id) (*BugCache, error) { } c.muBug.RUnlock() - b, err := bug.ReadWithResolver(c.repo, newIdentityCacheResolver(c), id) + b, err := bug.ReadWithResolver(c.repo, c.resolvers, id) if err != nil { return nil, err } diff --git a/cache/repo_cache_common.go b/cache/repo_cache_common.go index 66797e80..49ec72d0 100644 --- a/cache/repo_cache_common.go +++ b/cache/repo_cache_common.go @@ -118,7 +118,7 @@ func (c *RepoCache) MergeAll(remote string) <-chan entity.MergeResult { } } - results = bug.MergeAll(c.repo, remote, author) + results = bug.MergeAll(c.repo, c.resolvers, remote, author) for result := range results { out <- result diff --git a/cache/resolvers.go b/cache/resolvers.go index e53c3660..9ed2fa4c 100644 --- a/cache/resolvers.go +++ b/cache/resolvers.go @@ -2,10 +2,16 @@ package cache import ( "github.com/MichaelMure/git-bug/entity" - "github.com/MichaelMure/git-bug/identity" ) -var _ identity.Resolver = &identityCacheResolver{} +func makeResolvers(cache *RepoCache) entity.Resolvers { + return entity.Resolvers{ + &IdentityCache{}: newIdentityCacheResolver(cache), + &BugCache{}: newBugCacheResolver(cache), + } +} + +var _ entity.Resolver = &identityCacheResolver{} // identityCacheResolver is an identity Resolver that retrieve identities from // the cache @@ -17,35 +23,20 @@ func newIdentityCacheResolver(cache *RepoCache) *identityCacheResolver { return &identityCacheResolver{cache: cache} } -func (i *identityCacheResolver) ResolveIdentity(id entity.Id) (identity.Interface, error) { +func (i *identityCacheResolver) Resolve(id entity.Id) (entity.Interface, error) { return i.cache.ResolveIdentity(id) } -var _ identity.Resolver = &identityCacheResolverNoLock{} +var _ entity.Resolver = &bugCacheResolver{} -// identityCacheResolverNoLock is an identity Resolver that retrieve identities from -// the cache, without locking it. -type identityCacheResolverNoLock struct { +type bugCacheResolver struct { cache *RepoCache } -func newIdentityCacheResolverNoLock(cache *RepoCache) *identityCacheResolverNoLock { - return &identityCacheResolverNoLock{cache: cache} +func newBugCacheResolver(cache *RepoCache) *bugCacheResolver { + return &bugCacheResolver{cache: cache} } -func (ir *identityCacheResolverNoLock) ResolveIdentity(id entity.Id) (identity.Interface, error) { - cached, ok := ir.cache.identities[id] - if ok { - return cached, nil - } - - i, err := identity.ReadLocal(ir.cache.repo, id) - if err != nil { - return nil, err - } - - cached = NewIdentityCache(ir.cache, i) - ir.cache.identities[id] = cached - - return cached, nil +func (b *bugCacheResolver) Resolve(id entity.Id) (entity.Interface, error) { + return b.cache.ResolveBug(id) } diff --git a/entity/dag/clock.go b/entity/dag/clock.go index 793fa1bf..74a6cd73 100644 --- a/entity/dag/clock.go +++ b/entity/dag/clock.go @@ -3,7 +3,8 @@ package dag import ( "fmt" - "github.com/MichaelMure/git-bug/identity" + "golang.org/x/sync/errgroup" + "github.com/MichaelMure/git-bug/repository" ) @@ -18,21 +19,13 @@ func ClockLoader(defs ...Definition) repository.ClockLoader { return repository.ClockLoader{ Clocks: clocks, Witnesser: func(repo repository.ClockedRepo) error { - // we need to actually load the identities because of the commit signature check when reading, - // which require the full identities with crypto keys - resolver := identity.NewCachedResolver(identity.NewSimpleResolver(repo)) - + var errG errgroup.Group for _, def := range defs { - // we actually just need to read all entities, - // as that will create and update the clocks - // TODO: concurrent loading to be faster? - for b := range ReadAll(def, repo, resolver) { - if b.Err != nil { - return b.Err - } - } + errG.Go(func() error { + return ReadAllClocksNoCheck(def, repo) + }) } - return nil + return errG.Wait() }, } } diff --git a/entity/dag/common_test.go b/entity/dag/common_test.go index 774acba8..df8622d4 100644 --- a/entity/dag/common_test.go +++ b/entity/dag/common_test.go @@ -59,7 +59,7 @@ func (op *op2) Id() entity.Id { func (op *op2) Validate() error { return nil } -func unmarshaler(raw json.RawMessage, resolver identity.Resolver) (Operation, error) { +func unmarshaler(raw json.RawMessage, resolvers entity.Resolvers) (Operation, error) { var t struct { OperationType OperationType `json:"type"` } @@ -91,13 +91,13 @@ func unmarshaler(raw json.RawMessage, resolver identity.Resolver) (Operation, er Identities + repo + definition */ -func makeTestContext() (repository.ClockedRepo, identity.Interface, identity.Interface, identity.Resolver, Definition) { +func makeTestContext() (repository.ClockedRepo, identity.Interface, identity.Interface, entity.Resolvers, Definition) { repo := repository.NewMockRepo() - id1, id2, resolver, def := makeTestContextInternal(repo) - return repo, id1, id2, resolver, def + id1, id2, resolvers, def := makeTestContextInternal(repo) + return repo, id1, id2, resolvers, def } -func makeTestContextRemote(t *testing.T) (repository.ClockedRepo, repository.ClockedRepo, repository.ClockedRepo, identity.Interface, identity.Interface, identity.Resolver, Definition) { +func makeTestContextRemote(t *testing.T) (repository.ClockedRepo, repository.ClockedRepo, repository.ClockedRepo, identity.Interface, identity.Interface, entity.Resolvers, Definition) { repoA := repository.CreateGoGitTestRepo(t, false) repoB := repository.CreateGoGitTestRepo(t, false) remote := repository.CreateGoGitTestRepo(t, true) @@ -122,7 +122,7 @@ func makeTestContextRemote(t *testing.T) (repository.ClockedRepo, repository.Clo return repoA, repoB, remote, id1, id2, resolver, def } -func makeTestContextInternal(repo repository.ClockedRepo) (identity.Interface, identity.Interface, identity.Resolver, Definition) { +func makeTestContextInternal(repo repository.ClockedRepo) (identity.Interface, identity.Interface, entity.Resolvers, Definition) { id1, err := identity.NewIdentity(repo, "name1", "email1") if err != nil { panic(err) @@ -140,16 +140,18 @@ func makeTestContextInternal(repo repository.ClockedRepo) (identity.Interface, i panic(err) } - resolver := identityResolverFunc(func(id entity.Id) (identity.Interface, error) { - switch id { - case id1.Id(): - return id1, nil - case id2.Id(): - return id2, nil - default: - return nil, identity.ErrIdentityNotExist - } - }) + resolvers := entity.Resolvers{ + &identity.Identity{}: entity.ResolverFunc(func(id entity.Id) (entity.Interface, error) { + switch id { + case id1.Id(): + return id1, nil + case id2.Id(): + return id2, nil + default: + return nil, identity.ErrIdentityNotExist + } + }), + } def := Definition{ Typename: "foo", @@ -158,11 +160,5 @@ func makeTestContextInternal(repo repository.ClockedRepo) (identity.Interface, i FormatVersion: 1, } - return id1, id2, resolver, def -} - -type identityResolverFunc func(id entity.Id) (identity.Interface, error) - -func (fn identityResolverFunc) ResolveIdentity(id entity.Id) (identity.Interface, error) { - return fn(id) + return id1, id2, resolvers, def } diff --git a/entity/dag/entity.go b/entity/dag/entity.go index 4ccf0e0e..09f37246 100644 --- a/entity/dag/entity.go +++ b/entity/dag/entity.go @@ -26,7 +26,7 @@ type Definition struct { // the Namespace in git references (bugs, prs, ...) Namespace string // a function decoding a JSON message into an Operation - OperationUnmarshaler func(raw json.RawMessage, resolver identity.Resolver) (Operation, error) + OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error) // the expected format version number, that can be used for data migration/upgrade FormatVersion uint } @@ -57,29 +57,29 @@ func New(definition Definition) *Entity { } // Read will read and decode a stored local Entity from a repository -func Read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, id entity.Id) (*Entity, error) { +func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) { if err := id.Validate(); err != nil { return nil, errors.Wrap(err, "invalid id") } ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) - return read(def, repo, resolver, ref) + return read(def, repo, resolvers, ref) } // readRemote will read and decode a stored remote Entity from a repository -func readRemote(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, id entity.Id) (*Entity, error) { +func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) { if err := id.Validate(); err != nil { return nil, errors.Wrap(err, "invalid id") } ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String()) - return read(def, repo, resolver, ref) + return read(def, repo, resolvers, ref) } // read fetch from git and decode an Entity at an arbitrary git reference. -func read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, ref string) (*Entity, error) { +func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) { rootHash, err := repo.ResolveRef(ref) if err != nil { return nil, err @@ -138,7 +138,7 @@ func read(def Definition, repo repository.ClockedRepo, resolver identity.Resolve return nil, fmt.Errorf("multiple leafs in the entity DAG") } - opp, err := readOperationPack(def, repo, resolver, commit) + opp, err := readOperationPack(def, repo, resolvers, commit) if err != nil { return nil, err } @@ -239,13 +239,65 @@ func read(def Definition, repo repository.ClockedRepo, resolver identity.Resolve }, nil } +// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference. +// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete +// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding +// operation blobs can be implemented instead. +func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error { + rootHash, err := repo.ResolveRef(ref) + if err != nil { + return err + } + + commit, err := repo.ReadCommit(rootHash) + if err != nil { + return err + } + + createTime, editTime, err := readOperationPackClock(repo, commit) + if err != nil { + return err + } + + // if we have more than one commit, we need to find the root to have the create time + if len(commit.Parents) > 0 { + for len(commit.Parents) > 0 { + // The path to the root is irrelevant. + commit, err = repo.ReadCommit(commit.Parents[0]) + if err != nil { + return err + } + } + createTime, _, err = readOperationPackClock(repo, commit) + if err != nil { + return err + } + } + + if createTime <= 0 { + return fmt.Errorf("creation lamport time not set") + } + if editTime <= 0 { + return fmt.Errorf("creation lamport time not set") + } + err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime) + if err != nil { + return err + } + err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime) + if err != nil { + return err + } + return nil +} + type StreamedEntity struct { Entity *Entity Err error } // ReadAll read and parse all local Entity -func ReadAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver) <-chan StreamedEntity { +func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity { out := make(chan StreamedEntity) go func() { @@ -260,7 +312,7 @@ func ReadAll(def Definition, repo repository.ClockedRepo, resolver identity.Reso } for _, ref := range refs { - e, err := read(def, repo, resolver, ref) + e, err := read(def, repo, resolvers, ref) if err != nil { out <- StreamedEntity{Err: err} @@ -274,6 +326,26 @@ func ReadAll(def Definition, repo repository.ClockedRepo, resolver identity.Reso return out } +// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the +// repo end up with correct clocks for the next write. +func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error { + refPrefix := fmt.Sprintf("refs/%s/", def.Namespace) + + refs, err := repo.ListRefs(refPrefix) + if err != nil { + return err + } + + for _, ref := range refs { + err = readClockNoCheck(def, repo, ref) + if err != nil { + return err + } + } + + return nil +} + // Id return the Entity identifier func (e *Entity) Id() entity.Id { // id is the id of the first operation diff --git a/entity/dag/entity_actions.go b/entity/dag/entity_actions.go index 5b6e884d..673799ec 100644 --- a/entity/dag/entity_actions.go +++ b/entity/dag/entity_actions.go @@ -32,13 +32,13 @@ func Push(def Definition, repo repository.Repo, remote string) (string, error) { // Pull will do a Fetch + MergeAll // Contrary to MergeAll, this function will return an error if a merge fail. -func Pull(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, author identity.Interface) error { +func Pull(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, author identity.Interface) error { _, err := Fetch(def, repo, remote) if err != nil { return err } - for merge := range MergeAll(def, repo, resolver, remote, author) { + for merge := range MergeAll(def, repo, resolvers, remote, author) { if merge.Err != nil { return merge.Err } @@ -68,7 +68,7 @@ func Pull(def Definition, repo repository.ClockedRepo, resolver identity.Resolve // // Note: an author is necessary for the case where a merge commit is created, as this commit will // have an author and may be signed if a signing key is available. -func MergeAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, author identity.Interface) <-chan entity.MergeResult { +func MergeAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, author identity.Interface) <-chan entity.MergeResult { out := make(chan entity.MergeResult) go func() { @@ -82,7 +82,7 @@ func MergeAll(def Definition, repo repository.ClockedRepo, resolver identity.Res } for _, remoteRef := range remoteRefs { - out <- merge(def, repo, resolver, remoteRef, author) + out <- merge(def, repo, resolvers, remoteRef, author) } }() @@ -91,14 +91,14 @@ func MergeAll(def Definition, repo repository.ClockedRepo, resolver identity.Res // merge perform a merge to make sure a local Entity is up-to-date. // See MergeAll for more details. -func merge(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remoteRef string, author identity.Interface) entity.MergeResult { +func merge(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remoteRef string, author identity.Interface) entity.MergeResult { id := entity.RefToId(remoteRef) if err := id.Validate(); err != nil { return entity.NewMergeInvalidStatus(id, errors.Wrap(err, "invalid ref").Error()) } - remoteEntity, err := read(def, repo, resolver, remoteRef) + remoteEntity, err := read(def, repo, resolvers, remoteRef) if err != nil { return entity.NewMergeInvalidStatus(id, errors.Wrapf(err, "remote %s is not readable", def.Typename).Error()) @@ -197,7 +197,7 @@ func merge(def Definition, repo repository.ClockedRepo, resolver identity.Resolv // an empty operationPack. // First step is to collect those clocks. - localEntity, err := read(def, repo, resolver, localRef) + localEntity, err := read(def, repo, resolvers, localRef) if err != nil { return entity.NewMergeError(err, id) } diff --git a/entity/dag/entity_actions_test.go b/entity/dag/entity_actions_test.go index 68aa59b8..e6888148 100644 --- a/entity/dag/entity_actions_test.go +++ b/entity/dag/entity_actions_test.go @@ -24,7 +24,7 @@ func allEntities(t testing.TB, bugs <-chan StreamedEntity) []*Entity { } func TestEntityPushPull(t *testing.T) { - repoA, repoB, _, id1, id2, resolver, def := makeTestContextRemote(t) + repoA, repoB, _, id1, id2, resolvers, def := makeTestContextRemote(t) // A --> remote --> B e := New(def) @@ -36,10 +36,10 @@ func TestEntityPushPull(t *testing.T) { _, err = Push(def, repoA, "remote") require.NoError(t, err) - err = Pull(def, repoB, resolver, "remote", id1) + err = Pull(def, repoB, resolvers, "remote", id1) require.NoError(t, err) - entities := allEntities(t, ReadAll(def, repoB, resolver)) + entities := allEntities(t, ReadAll(def, repoB, resolvers)) require.Len(t, entities, 1) // B --> remote --> A @@ -52,15 +52,15 @@ func TestEntityPushPull(t *testing.T) { _, err = Push(def, repoB, "remote") require.NoError(t, err) - err = Pull(def, repoA, resolver, "remote", id1) + err = Pull(def, repoA, resolvers, "remote", id1) require.NoError(t, err) - entities = allEntities(t, ReadAll(def, repoB, resolver)) + entities = allEntities(t, ReadAll(def, repoB, resolvers)) require.Len(t, entities, 2) } func TestListLocalIds(t *testing.T) { - repoA, repoB, _, id1, id2, resolver, def := makeTestContextRemote(t) + repoA, repoB, _, id1, id2, resolvers, def := makeTestContextRemote(t) // A --> remote --> B e := New(def) @@ -85,7 +85,7 @@ func TestListLocalIds(t *testing.T) { listLocalIds(t, def, repoA, 2) listLocalIds(t, def, repoB, 0) - err = Pull(def, repoB, resolver, "remote", id1) + err = Pull(def, repoB, resolvers, "remote", id1) require.NoError(t, err) listLocalIds(t, def, repoA, 2) @@ -204,7 +204,7 @@ func assertNotEqualRefs(t *testing.T, repoA, repoB repository.RepoData, prefix s } func TestMerge(t *testing.T) { - repoA, repoB, _, id1, id2, resolver, def := makeTestContextRemote(t) + repoA, repoB, _, id1, id2, resolvers, def := makeTestContextRemote(t) // SCENARIO 1 // if the remote Entity doesn't exist locally, it's created @@ -228,7 +228,7 @@ func TestMerge(t *testing.T) { _, err = Fetch(def, repoB, "remote") require.NoError(t, err) - results := MergeAll(def, repoB, resolver, "remote", id1) + results := MergeAll(def, repoB, resolvers, "remote", id1) assertMergeResults(t, []entity.MergeResult{ { @@ -246,7 +246,7 @@ func TestMerge(t *testing.T) { // SCENARIO 2 // if the remote and local Entity have the same state, nothing is changed - results = MergeAll(def, repoB, resolver, "remote", id1) + results = MergeAll(def, repoB, resolvers, "remote", id1) assertMergeResults(t, []entity.MergeResult{ { @@ -272,7 +272,7 @@ func TestMerge(t *testing.T) { err = e2A.Commit(repoA) require.NoError(t, err) - results = MergeAll(def, repoA, resolver, "remote", id1) + results = MergeAll(def, repoA, resolvers, "remote", id1) assertMergeResults(t, []entity.MergeResult{ { @@ -297,7 +297,7 @@ func TestMerge(t *testing.T) { _, err = Fetch(def, repoB, "remote") require.NoError(t, err) - results = MergeAll(def, repoB, resolver, "remote", id1) + results = MergeAll(def, repoB, resolvers, "remote", id1) assertMergeResults(t, []entity.MergeResult{ { @@ -324,10 +324,10 @@ func TestMerge(t *testing.T) { err = e2A.Commit(repoA) require.NoError(t, err) - e1B, err := Read(def, repoB, resolver, e1A.Id()) + e1B, err := Read(def, repoB, resolvers, e1A.Id()) require.NoError(t, err) - e2B, err := Read(def, repoB, resolver, e2A.Id()) + e2B, err := Read(def, repoB, resolvers, e2A.Id()) require.NoError(t, err) e1B.Append(newOp1(id1, "barbarfoofoo")) @@ -344,7 +344,7 @@ func TestMerge(t *testing.T) { _, err = Fetch(def, repoB, "remote") require.NoError(t, err) - results = MergeAll(def, repoB, resolver, "remote", id1) + results = MergeAll(def, repoB, resolvers, "remote", id1) assertMergeResults(t, []entity.MergeResult{ { @@ -365,7 +365,7 @@ func TestMerge(t *testing.T) { _, err = Fetch(def, repoA, "remote") require.NoError(t, err) - results = MergeAll(def, repoA, resolver, "remote", id1) + results = MergeAll(def, repoA, resolvers, "remote", id1) assertMergeResults(t, []entity.MergeResult{ { @@ -384,7 +384,7 @@ func TestMerge(t *testing.T) { } func TestRemove(t *testing.T) { - repoA, _, _, id1, _, resolver, def := makeTestContextRemote(t) + repoA, _, _, id1, _, resolvers, def := makeTestContextRemote(t) e := New(def) e.Append(newOp1(id1, "foo")) @@ -396,10 +396,10 @@ func TestRemove(t *testing.T) { err = Remove(def, repoA, e.Id()) require.NoError(t, err) - _, err = Read(def, repoA, resolver, e.Id()) + _, err = Read(def, repoA, resolvers, e.Id()) require.Error(t, err) - _, err = readRemote(def, repoA, resolver, "remote", e.Id()) + _, err = readRemote(def, repoA, resolvers, "remote", e.Id()) require.Error(t, err) // Remove is idempotent diff --git a/entity/dag/example_test.go b/entity/dag/example_test.go index 94850bd9..39d77f8d 100644 --- a/entity/dag/example_test.go +++ b/entity/dag/example_test.go @@ -214,7 +214,7 @@ var def = dag.Definition{ // operationUnmarshaller is a function doing the de-serialization of the JSON data into our own // concrete Operations. If needed, we can use the resolver to connect to other entities. -func operationUnmarshaller(raw json.RawMessage, resolver identity.Resolver) (dag.Operation, error) { +func operationUnmarshaller(raw json.RawMessage, resolvers entity.Resolvers) (dag.Operation, error) { var t struct { OperationType dag.OperationType `json:"type"` } @@ -245,7 +245,7 @@ func operationUnmarshaller(raw json.RawMessage, resolver identity.Resolver) (dag case *AddAdministrator: // We need to resolve identities for i, stub := range op.ToAdd { - iden, err := resolver.ResolveIdentity(stub.Id()) + iden, err := entity.Resolve[identity.Interface](resolvers, stub.Id()) if err != nil { return nil, err } @@ -254,7 +254,7 @@ func operationUnmarshaller(raw json.RawMessage, resolver identity.Resolver) (dag case *RemoveAdministrator: // We need to resolve identities for i, stub := range op.ToRemove { - iden, err := resolver.ResolveIdentity(stub.Id()) + iden, err := entity.Resolve[identity.Interface](resolvers, stub.Id()) if err != nil { return nil, err } @@ -282,13 +282,21 @@ func (pc ProjectConfig) Compile() *Snapshot { // Read is a helper to load a ProjectConfig from a Repository func Read(repo repository.ClockedRepo, id entity.Id) (*ProjectConfig, error) { - e, err := dag.Read(def, repo, identity.NewSimpleResolver(repo), id) + e, err := dag.Read(def, repo, simpleResolvers(repo), id) if err != nil { return nil, err } return &ProjectConfig{Entity: e}, nil } +func simpleResolvers(repo repository.ClockedRepo) entity.Resolvers { + // resolvers can look a bit complex or out of place here, but it's an important concept + // to allow caching and flexibility when constructing the final app. + return entity.Resolvers{ + &identity.Identity{}: identity.NewSimpleResolver(repo), + } +} + func Example_entity() { const gitBugNamespace = "git-bug" // Note: this example ignore errors for readability @@ -323,7 +331,7 @@ func Example_entity() { _ = confRene.Commit(repoRene) // Isaac pull and read the config - _ = dag.Pull(def, repoIsaac, identity.NewSimpleResolver(repoIsaac), "origin", isaac) + _ = dag.Pull(def, repoIsaac, simpleResolvers(repoIsaac), "origin", isaac) confIsaac, _ := Read(repoIsaac, confRene.Id()) // Compile gives the current state of the config diff --git a/entity/dag/operation_pack.go b/entity/dag/operation_pack.go index b2973343..b32a699f 100644 --- a/entity/dag/operation_pack.go +++ b/entity/dag/operation_pack.go @@ -206,7 +206,7 @@ func (opp *operationPack) makeExtraTree() []repository.TreeEntry { // readOperationPack read the operationPack encoded in git at the given Tree hash. // // Validity of the Lamport clocks is left for the caller to decide. -func readOperationPack(def Definition, repo repository.RepoData, resolver identity.Resolver, commit repository.Commit) (*operationPack, error) { +func readOperationPack(def Definition, repo repository.RepoData, resolvers entity.Resolvers, commit repository.Commit) (*operationPack, error) { entries, err := repo.ReadTree(commit.TreeHash) if err != nil { return nil, err @@ -247,7 +247,7 @@ func readOperationPack(def Definition, repo repository.RepoData, resolver identi if err != nil { return nil, errors.Wrap(err, "failed to read git blob data") } - ops, author, err = unmarshallPack(def, resolver, data) + ops, author, err = unmarshallPack(def, resolvers, data) if err != nil { return nil, err } @@ -288,10 +288,42 @@ func readOperationPack(def Definition, repo repository.RepoData, resolver identi }, nil } +// readOperationPackClock is similar to readOperationPack but only read and decode the Lamport clocks. +// Validity of those is left for the caller to decide. +func readOperationPackClock(repo repository.RepoData, commit repository.Commit) (lamport.Time, lamport.Time, error) { + entries, err := repo.ReadTree(commit.TreeHash) + if err != nil { + return 0, 0, err + } + + var createTime lamport.Time + var editTime lamport.Time + + for _, entry := range entries { + switch { + case strings.HasPrefix(entry.Name, createClockEntryPrefix): + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, createClockEntryPrefix), 10, 64) + if err != nil { + return 0, 0, errors.Wrap(err, "can't read creation lamport time") + } + createTime = lamport.Time(v) + + case strings.HasPrefix(entry.Name, editClockEntryPrefix): + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, editClockEntryPrefix), 10, 64) + if err != nil { + return 0, 0, errors.Wrap(err, "can't read edit lamport time") + } + editTime = lamport.Time(v) + } + } + + return createTime, editTime, nil +} + // unmarshallPack delegate the unmarshalling of the Operation's JSON to the decoding // function provided by the concrete entity. This gives access to the concrete type of each // Operation. -func unmarshallPack(def Definition, resolver identity.Resolver, data []byte) ([]Operation, identity.Interface, error) { +func unmarshallPack(def Definition, resolvers entity.Resolvers, data []byte) ([]Operation, identity.Interface, error) { aux := struct { Author identity.IdentityStub `json:"author"` Operations []json.RawMessage `json:"ops"` @@ -305,7 +337,7 @@ func unmarshallPack(def Definition, resolver identity.Resolver, data []byte) ([] return nil, nil, fmt.Errorf("missing author") } - author, err := resolver.ResolveIdentity(aux.Author.Id()) + author, err := entity.Resolve[identity.Interface](resolvers, aux.Author.Id()) if err != nil { return nil, nil, err } @@ -314,7 +346,7 @@ func unmarshallPack(def Definition, resolver identity.Resolver, data []byte) ([] for _, raw := range aux.Operations { // delegate to specialized unmarshal function - op, err := def.OperationUnmarshaler(raw, resolver) + op, err := def.OperationUnmarshaler(raw, resolvers) if err != nil { return nil, nil, err } diff --git a/entity/resolver.go b/entity/resolver.go new file mode 100644 index 00000000..d4fe5d3e --- /dev/null +++ b/entity/resolver.go @@ -0,0 +1,74 @@ +package entity + +import ( + "fmt" + "sync" +) + +// Resolver is an interface to find an Entity from its Id +type Resolver interface { + Resolve(id Id) (Interface, error) +} + +// Resolvers is a collection of Resolver, for different type of Entity +type Resolvers map[Interface]Resolver + +// Resolve use the appropriate sub-resolver for the given type and find the Entity matching the Id. +func Resolve[T Interface](rs Resolvers, id Id) (T, error) { + var zero T + for t, resolver := range rs { + switch t.(type) { + case T: + val, err := resolver.(Resolver).Resolve(id) + if err != nil { + return zero, err + } + return val.(T), nil + } + } + return zero, fmt.Errorf("unknown type to resolve") +} + +var _ Resolver = &CachedResolver{} + +// CachedResolver is a resolver ensuring that loading is done only once through another Resolver. +type CachedResolver struct { + resolver Resolver + mu sync.RWMutex + entities map[Id]Interface +} + +func NewCachedResolver(resolver Resolver) *CachedResolver { + return &CachedResolver{ + resolver: resolver, + entities: make(map[Id]Interface), + } +} + +func (c *CachedResolver) Resolve(id Id) (Interface, error) { + c.mu.RLock() + if i, ok := c.entities[id]; ok { + c.mu.RUnlock() + return i, nil + } + c.mu.RUnlock() + + c.mu.Lock() + defer c.mu.Unlock() + + i, err := c.resolver.Resolve(id) + if err != nil { + return nil, err + } + c.entities[id] = i + return i, nil +} + +var _ Resolver = ResolverFunc(nil) + +// ResolverFunc is a helper to morph a function resolver into a Resolver +type ResolverFunc func(id Id) (Interface, error) + +func (fn ResolverFunc) Resolve(id Id) (Interface, error) { + return fn(id) +} diff --git a/identity/resolver.go b/identity/resolver.go index 8e066e9d..5468a8f8 100644 --- a/identity/resolver.go +++ b/identity/resolver.go @@ -1,17 +1,11 @@ package identity import ( - "sync" - "github.com/MichaelMure/git-bug/entity" "github.com/MichaelMure/git-bug/repository" ) -// Resolver define the interface of an Identity resolver, able to load -// an identity from, for example, a repo or a cache. -type Resolver interface { - ResolveIdentity(id entity.Id) (Interface, error) -} +var _ entity.Resolver = &SimpleResolver{} // SimpleResolver is a Resolver loading Identities directly from a Repo type SimpleResolver struct { @@ -22,10 +16,12 @@ func NewSimpleResolver(repo repository.Repo) *SimpleResolver { return &SimpleResolver{repo: repo} } -func (r *SimpleResolver) ResolveIdentity(id entity.Id) (Interface, error) { +func (r *SimpleResolver) Resolve(id entity.Id) (entity.Interface, error) { return ReadLocal(r.repo, id) } +var _ entity.Resolver = &StubResolver{} + // StubResolver is a Resolver that doesn't load anything, only returning IdentityStub instances type StubResolver struct{} @@ -33,39 +29,6 @@ func NewStubResolver() *StubResolver { return &StubResolver{} } -func (s *StubResolver) ResolveIdentity(id entity.Id) (Interface, error) { +func (s *StubResolver) Resolve(id entity.Id) (entity.Interface, error) { return &IdentityStub{id: id}, nil } - -// CachedResolver is a resolver ensuring that loading is done only once through another Resolver. -type CachedResolver struct { - mu sync.RWMutex - resolver Resolver - identities map[entity.Id]Interface -} - -func NewCachedResolver(resolver Resolver) *CachedResolver { - return &CachedResolver{ - resolver: resolver, - identities: make(map[entity.Id]Interface), - } -} - -func (c *CachedResolver) ResolveIdentity(id entity.Id) (Interface, error) { - c.mu.RLock() - if i, ok := c.identities[id]; ok { - c.mu.RUnlock() - return i, nil - } - c.mu.RUnlock() - - c.mu.Lock() - defer c.mu.Unlock() - - i, err := c.resolver.ResolveIdentity(id) - if err != nil { - return nil, err - } - c.identities[id] = i - return i, nil -} -- cgit