aboutsummaryrefslogtreecommitdiffstats
path: root/entity/dag/entity.go
diff options
context:
space:
mode:
Diffstat (limited to 'entity/dag/entity.go')
-rw-r--r--entity/dag/entity.go63
1 files changed, 32 insertions, 31 deletions
diff --git a/entity/dag/entity.go b/entity/dag/entity.go
index ca674ad7..2028e1b4 100644
--- a/entity/dag/entity.go
+++ b/entity/dag/entity.go
@@ -59,32 +59,35 @@ func New(definition Definition) *Entity {
}
// Read will read and decode a stored local Entity from a repository
-func Read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (*Entity, error) {
+func Read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
if err := id.Validate(); err != nil {
- return nil, errors.Wrap(err, "invalid id")
+ return *new(EntityT), errors.Wrap(err, "invalid id")
}
ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
- return read(def, repo, resolvers, ref)
+ return read[EntityT](def, wrapper, repo, resolvers, ref)
}
// readRemote will read and decode a stored remote Entity from a repository
-func readRemote(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (*Entity, error) {
+func readRemote[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
if err := id.Validate(); err != nil {
- return nil, errors.Wrap(err, "invalid id")
+ return *new(EntityT), errors.Wrap(err, "invalid id")
}
ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
- return read(def, repo, resolvers, ref)
+ return read[EntityT](def, wrapper, repo, resolvers, ref)
}
// read fetch from git and decode an Entity at an arbitrary git reference.
-func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (*Entity, error) {
+func read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
rootHash, err := repo.ResolveRef(ref)
+ if err == repository.ErrNotFound {
+ return *new(EntityT), entity.NewErrNotFound(def.Typename)
+ }
if err != nil {
- return nil, err
+ return *new(EntityT), err
}
// Perform a breadth-first search to get a topological order of the DAG where we discover the
@@ -104,7 +107,7 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
commit, err := repo.ReadCommit(hash)
if err != nil {
- return nil, err
+ return *new(EntityT), err
}
BFSOrder = append(BFSOrder, commit)
@@ -137,26 +140,26 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
// can have no parents. Said otherwise, the DAG need to have exactly
// one leaf.
if !isFirstCommit && len(commit.Parents) == 0 {
- return nil, fmt.Errorf("multiple leafs in the entity DAG")
+ return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
}
opp, err := readOperationPack(def, repo, resolvers, commit)
if err != nil {
- return nil, err
+ return *new(EntityT), err
}
err = opp.Validate()
if err != nil {
- return nil, err
+ return *new(EntityT), err
}
if isMerge && len(opp.Operations) > 0 {
- return nil, fmt.Errorf("merge commit cannot have operations")
+ return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
}
// Check that the create lamport clock is set (not checked in Validate() as it's optional)
if isFirstCommit && opp.CreateTime <= 0 {
- return nil, fmt.Errorf("creation lamport time not set")
+ return *new(EntityT), fmt.Errorf("creation lamport time not set")
}
// make sure that the lamport clocks causality match the DAG topology
@@ -167,7 +170,7 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
}
if parentPack.EditTime >= opp.EditTime {
- return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
+ return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
}
// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
@@ -175,7 +178,7 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
// we ignore merge commits here to allow merging after a loooong time without breaking anything,
// as long as there is one valid chain of small hops, it's fine.
if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
- return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
+ return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
}
}
@@ -187,11 +190,11 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
for _, opp := range oppMap {
err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
if err != nil {
- return nil, err
+ return *new(EntityT), err
}
err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
if err != nil {
- return nil, err
+ return *new(EntityT), err
}
}
@@ -232,13 +235,13 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
}
}
- return &Entity{
+ return wrapper(&Entity{
Definition: def,
ops: ops,
lastCommit: rootHash,
createTime: createTime,
editTime: editTime,
- }, nil
+ }), nil
}
// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
@@ -247,6 +250,9 @@ func read(def Definition, repo repository.ClockedRepo, resolvers entity.Resolver
// operation blobs can be implemented instead.
func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
rootHash, err := repo.ResolveRef(ref)
+ if err == repository.ErrNotFound {
+ return entity.NewErrNotFound(def.Typename)
+ }
if err != nil {
return err
}
@@ -293,14 +299,9 @@ func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) e
return nil
}
-type StreamedEntity struct {
- Entity *Entity
- Err error
-}
-
// ReadAll read and parse all local Entity
-func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity {
- out := make(chan StreamedEntity)
+func ReadAll[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT] {
+ out := make(chan entity.StreamedEntity[EntityT])
go func() {
defer close(out)
@@ -309,19 +310,19 @@ func ReadAll(def Definition, repo repository.ClockedRepo, resolvers entity.Resol
refs, err := repo.ListRefs(refPrefix)
if err != nil {
- out <- StreamedEntity{Err: err}
+ out <- entity.StreamedEntity[EntityT]{Err: err}
return
}
for _, ref := range refs {
- e, err := read(def, repo, resolvers, ref)
+ e, err := read[EntityT](def, wrapper, repo, resolvers, ref)
if err != nil {
- out <- StreamedEntity{Err: err}
+ out <- entity.StreamedEntity[EntityT]{Err: err}
return
}
- out <- StreamedEntity{Entity: e}
+ out <- entity.StreamedEntity[EntityT]{Entity: e}
}
}()