diff options
Diffstat (limited to 'entity/dag/entity.go')
-rw-r--r-- | entity/dag/entity.go | 63 |
1 files changed, 32 insertions, 31 deletions
diff --git a/entity/dag/entity.go b/entity/dag/entity.go index ca674ad7..2028e1b4 100644 --- a/entity/dag/entity.go +++ b/entity/dag/entity.go @@ -59,32 +59,35 @@ func New(definition Definition) *Entity { } // Read will read and decode a stored local Entity from a repository -func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) { +func Read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) { if err := id.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid id") + return *new(EntityT), errors.Wrap(err, "invalid id") } ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) - return read(def, repo, resolvers, ref) + return read[EntityT](def, wrapper, repo, resolvers, ref) } // readRemote will read and decode a stored remote Entity from a repository -func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) { +func readRemote[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) { if err := id.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid id") + return *new(EntityT), errors.Wrap(err, "invalid id") } ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String()) - return read(def, repo, resolvers, ref) + return read[EntityT](def, wrapper, repo, resolvers, ref) } // read fetch from git and decode an Entity at an arbitrary git reference. -func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) { +func read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) { rootHash, err := repo.ResolveRef(ref) + if err == repository.ErrNotFound { + return *new(EntityT), entity.NewErrNotFound(def.Typename) + } if err != nil { - return nil, err + return *new(EntityT), err } // Perform a breadth-first search to get a topological order of the DAG where we discover the @@ -104,7 +107,7 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver commit, err := repo.ReadCommit(hash) if err != nil { - return nil, err + return *new(EntityT), err } BFSOrder = append(BFSOrder, commit) @@ -137,26 +140,26 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver // can have no parents. Said otherwise, the DAG need to have exactly // one leaf. if !isFirstCommit && len(commit.Parents) == 0 { - return nil, fmt.Errorf("multiple leafs in the entity DAG") + return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG") } opp, err := readOperationPack(def, repo, resolvers, commit) if err != nil { - return nil, err + return *new(EntityT), err } err = opp.Validate() if err != nil { - return nil, err + return *new(EntityT), err } if isMerge && len(opp.Operations) > 0 { - return nil, fmt.Errorf("merge commit cannot have operations") + return *new(EntityT), fmt.Errorf("merge commit cannot have operations") } // Check that the create lamport clock is set (not checked in Validate() as it's optional) if isFirstCommit && opp.CreateTime <= 0 { - return nil, fmt.Errorf("creation lamport time not set") + return *new(EntityT), fmt.Errorf("creation lamport time not set") } // make sure that the lamport clocks causality match the DAG topology @@ -167,7 +170,7 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver } if parentPack.EditTime >= opp.EditTime { - return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG") + return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG") } // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure @@ -175,7 +178,7 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver // we ignore merge commits here to allow merging after a loooong time without breaking anything, // as long as there is one valid chain of small hops, it's fine. if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 { - return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack") + return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack") } } @@ -187,11 +190,11 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver for _, opp := range oppMap { err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime) if err != nil { - return nil, err + return *new(EntityT), err } err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime) if err != nil { - return nil, err + return *new(EntityT), err } } @@ -232,13 +235,13 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver } } - return &Entity{ + return wrapper(&Entity{ Definition: def, ops: ops, lastCommit: rootHash, createTime: createTime, editTime: editTime, - }, nil + }), nil } // readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference. @@ -247,6 +250,9 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver // operation blobs can be implemented instead. func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error { rootHash, err := repo.ResolveRef(ref) + if err == repository.ErrNotFound { + return entity.NewErrNotFound(def.Typename) + } if err != nil { return err } @@ -293,14 +299,9 @@ func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) e return nil } -type StreamedEntity struct { - Entity *Entity - Err error -} - // ReadAll read and parse all local Entity -func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity { - out := make(chan StreamedEntity) +func ReadAll[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT] { + out := make(chan entity.StreamedEntity[EntityT]) go func() { defer close(out) @@ -309,19 +310,19 @@ func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resol refs, err := repo.ListRefs(refPrefix) if err != nil { - out <- StreamedEntity{Err: err} + out <- entity.StreamedEntity[EntityT]{Err: err} return } for _, ref := range refs { - e, err := read(def, repo, resolvers, ref) + e, err := read[EntityT](def, wrapper, repo, resolvers, ref) if err != nil { - out <- StreamedEntity{Err: err} + out <- entity.StreamedEntity[EntityT]{Err: err} return } - out <- StreamedEntity{Entity: e} + out <- entity.StreamedEntity[EntityT]{Entity: e} } }() |