aboutsummaryrefslogtreecommitdiffstats
path: root/entity/dag
diff options
context:
space:
mode:
Diffstat (limited to 'entity/dag')
-rw-r--r--entity/dag/clock.go38
-rw-r--r--entity/dag/common_test.go173
-rw-r--r--entity/dag/entity.go439
-rw-r--r--entity/dag/entity_actions.go260
-rw-r--r--entity/dag/entity_actions_test.go412
-rw-r--r--entity/dag/entity_test.go68
-rw-r--r--entity/dag/operation.go48
-rw-r--r--entity/dag/operation_pack.go358
-rw-r--r--entity/dag/operation_pack_test.go159
9 files changed, 1955 insertions, 0 deletions
diff --git a/entity/dag/clock.go b/entity/dag/clock.go
new file mode 100644
index 00000000..793fa1bf
--- /dev/null
+++ b/entity/dag/clock.go
@@ -0,0 +1,38 @@
+package dag
+
+import (
+ "fmt"
+
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+)
+
+// ClockLoader is the repository.ClockLoader for Entity
+func ClockLoader(defs ...Definition) repository.ClockLoader {
+ clocks := make([]string, 0, len(defs)*2)
+ for _, def := range defs {
+ clocks = append(clocks, fmt.Sprintf(creationClockPattern, def.Namespace))
+ clocks = append(clocks, fmt.Sprintf(editClockPattern, def.Namespace))
+ }
+
+ return repository.ClockLoader{
+ Clocks: clocks,
+ Witnesser: func(repo repository.ClockedRepo) error {
+ // we need to actually load the identities because of the commit signature check when reading,
+ // which require the full identities with crypto keys
+ resolver := identity.NewCachedResolver(identity.NewSimpleResolver(repo))
+
+ for _, def := range defs {
+ // we actually just need to read all entities,
+ // as that will create and update the clocks
+ // TODO: concurrent loading to be faster?
+ for b := range ReadAll(def, repo, resolver) {
+ if b.Err != nil {
+ return b.Err
+ }
+ }
+ }
+ return nil
+ },
+ }
+}
diff --git a/entity/dag/common_test.go b/entity/dag/common_test.go
new file mode 100644
index 00000000..25289b76
--- /dev/null
+++ b/entity/dag/common_test.go
@@ -0,0 +1,173 @@
+package dag
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/MichaelMure/git-bug/entity"
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+)
+
+// This file contains an example dummy entity to be used in the tests
+
+/*
+ Operations
+*/
+
+type op1 struct {
+ author identity.Interface
+
+ OperationType int `json:"type"`
+ Field1 string `json:"field_1"`
+ Files []repository.Hash `json:"files"`
+}
+
+func newOp1(author identity.Interface, field1 string, files ...repository.Hash) *op1 {
+ return &op1{author: author, OperationType: 1, Field1: field1, Files: files}
+}
+
+func (o *op1) Id() entity.Id {
+ data, _ := json.Marshal(o)
+ return entity.DeriveId(data)
+}
+
+func (o *op1) Validate() error { return nil }
+
+func (o *op1) Author() identity.Interface {
+ return o.author
+}
+
+func (o *op1) GetFiles() []repository.Hash {
+ return o.Files
+}
+
+type op2 struct {
+ author identity.Interface
+
+ OperationType int `json:"type"`
+ Field2 string `json:"field_2"`
+}
+
+func newOp2(author identity.Interface, field2 string) *op2 {
+ return &op2{author: author, OperationType: 2, Field2: field2}
+}
+
+func (o *op2) Id() entity.Id {
+ data, _ := json.Marshal(o)
+ return entity.DeriveId(data)
+}
+
+func (o *op2) Validate() error { return nil }
+
+func (o *op2) Author() identity.Interface {
+ return o.author
+}
+
+func unmarshaler(author identity.Interface, raw json.RawMessage) (Operation, error) {
+ var t struct {
+ OperationType int `json:"type"`
+ }
+
+ if err := json.Unmarshal(raw, &t); err != nil {
+ return nil, err
+ }
+
+ switch t.OperationType {
+ case 1:
+ op := &op1{}
+ err := json.Unmarshal(raw, &op)
+ op.author = author
+ return op, err
+ case 2:
+ op := &op2{}
+ err := json.Unmarshal(raw, &op)
+ op.author = author
+ return op, err
+ default:
+ return nil, fmt.Errorf("unknown operation type %v", t.OperationType)
+ }
+}
+
+/*
+ Identities + repo + definition
+*/
+
+func makeTestContext() (repository.ClockedRepo, identity.Interface, identity.Interface, identity.Resolver, Definition) {
+ repo := repository.NewMockRepo()
+ id1, id2, resolver, def := makeTestContextInternal(repo)
+ return repo, id1, id2, resolver, def
+}
+
+func makeTestContextRemote(t *testing.T) (repository.ClockedRepo, repository.ClockedRepo, repository.ClockedRepo, identity.Interface, identity.Interface, identity.Resolver, Definition) {
+ repoA := repository.CreateGoGitTestRepo(false)
+ repoB := repository.CreateGoGitTestRepo(false)
+ remote := repository.CreateGoGitTestRepo(true)
+
+ err := repoA.AddRemote("remote", remote.GetLocalRemote())
+ require.NoError(t, err)
+ err = repoA.AddRemote("repoB", repoB.GetLocalRemote())
+ require.NoError(t, err)
+ err = repoB.AddRemote("remote", remote.GetLocalRemote())
+ require.NoError(t, err)
+ err = repoB.AddRemote("repoA", repoA.GetLocalRemote())
+ require.NoError(t, err)
+
+ id1, id2, resolver, def := makeTestContextInternal(repoA)
+
+ // distribute the identities
+ _, err = identity.Push(repoA, "remote")
+ require.NoError(t, err)
+ err = identity.Pull(repoB, "remote")
+ require.NoError(t, err)
+
+ return repoA, repoB, remote, id1, id2, resolver, def
+}
+
+func makeTestContextInternal(repo repository.ClockedRepo) (identity.Interface, identity.Interface, identity.Resolver, Definition) {
+ id1, err := identity.NewIdentity(repo, "name1", "email1")
+ if err != nil {
+ panic(err)
+ }
+ err = id1.Commit(repo)
+ if err != nil {
+ panic(err)
+ }
+ id2, err := identity.NewIdentity(repo, "name2", "email2")
+ if err != nil {
+ panic(err)
+ }
+ err = id2.Commit(repo)
+ if err != nil {
+ panic(err)
+ }
+
+ resolver := identityResolverFunc(func(id entity.Id) (identity.Interface, error) {
+ switch id {
+ case id1.Id():
+ return id1, nil
+ case id2.Id():
+ return id2, nil
+ default:
+ return nil, identity.ErrIdentityNotExist
+ }
+ })
+
+ def := Definition{
+ Typename: "foo",
+ Namespace: "foos",
+ OperationUnmarshaler: unmarshaler,
+ FormatVersion: 1,
+ }
+
+ return id1, id2, resolver, def
+}
+
+type identityResolverFunc func(id entity.Id) (identity.Interface, error)
+
+func (fn identityResolverFunc) ResolveIdentity(id entity.Id) (identity.Interface, error) {
+ return fn(id)
+}
diff --git a/entity/dag/entity.go b/entity/dag/entity.go
new file mode 100644
index 00000000..c4368514
--- /dev/null
+++ b/entity/dag/entity.go
@@ -0,0 +1,439 @@
+// Package dag contains the base common code to define an entity stored
+// in a chain of git objects, supporting actions like Push, Pull and Merge.
+package dag
+
+import (
+ "encoding/json"
+ "fmt"
+ "sort"
+
+ "github.com/pkg/errors"
+
+ "github.com/MichaelMure/git-bug/entity"
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+ "github.com/MichaelMure/git-bug/util/lamport"
+)
+
+const refsPattern = "refs/%s/%s"
+const creationClockPattern = "%s-create"
+const editClockPattern = "%s-edit"
+
+// Definition hold the details defining one specialization of an Entity.
+type Definition struct {
+ // the name of the entity (bug, pull-request, ...)
+ Typename string
+ // the Namespace in git (bugs, prs, ...)
+ Namespace string
+ // a function decoding a JSON message into an Operation
+ OperationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
+ // the expected format version number, that can be used for data migration/upgrade
+ FormatVersion uint
+}
+
+// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
+type Entity struct {
+ // A Lamport clock is a logical clock that allow to order event
+ // inside a distributed system.
+ // It must be the first field in this struct due to https://github.com/golang/go/issues/36606
+ createTime lamport.Time
+ editTime lamport.Time
+
+ Definition
+
+ // operations that are already stored in the repository
+ ops []Operation
+ // operations not yet stored in the repository
+ staging []Operation
+
+ lastCommit repository.Hash
+}
+
+// New create an empty Entity
+func New(definition Definition) *Entity {
+ return &Entity{
+ Definition: definition,
+ }
+}
+
+// Read will read and decode a stored local Entity from a repository
+func Read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, id entity.Id) (*Entity, error) {
+ if err := id.Validate(); err != nil {
+ return nil, errors.Wrap(err, "invalid id")
+ }
+
+ ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
+
+ return read(def, repo, resolver, ref)
+}
+
+// readRemote will read and decode a stored remote Entity from a repository
+func readRemote(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, id entity.Id) (*Entity, error) {
+ if err := id.Validate(); err != nil {
+ return nil, errors.Wrap(err, "invalid id")
+ }
+
+ ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String())
+
+ return read(def, repo, resolver, ref)
+}
+
+// read fetch from git and decode an Entity at an arbitrary git reference.
+func read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, ref string) (*Entity, error) {
+ rootHash, err := repo.ResolveRef(ref)
+ if err != nil {
+ return nil, err
+ }
+
+ // Perform a breadth-first search to get a topological order of the DAG where we discover the
+ // parents commit and go back in time up to the chronological root
+
+ queue := make([]repository.Hash, 0, 32)
+ visited := make(map[repository.Hash]struct{})
+ BFSOrder := make([]repository.Commit, 0, 32)
+
+ queue = append(queue, rootHash)
+ visited[rootHash] = struct{}{}
+
+ for len(queue) > 0 {
+ // pop
+ hash := queue[0]
+ queue = queue[1:]
+
+ commit, err := repo.ReadCommit(hash)
+ if err != nil {
+ return nil, err
+ }
+
+ BFSOrder = append(BFSOrder, commit)
+
+ for _, parent := range commit.Parents {
+ if _, ok := visited[parent]; !ok {
+ queue = append(queue, parent)
+ // mark as visited
+ visited[parent] = struct{}{}
+ }
+ }
+ }
+
+ // Now, we can reverse this topological order and read the commits in an order where
+ // we are sure to have read all the chronological ancestors when we read a commit.
+
+ // Next step is to:
+ // 1) read the operationPacks
+ // 2) make sure that the clocks causality respect the DAG topology.
+
+ oppMap := make(map[repository.Hash]*operationPack)
+ var opsCount int
+
+ for i := len(BFSOrder) - 1; i >= 0; i-- {
+ commit := BFSOrder[i]
+ isFirstCommit := i == len(BFSOrder)-1
+ isMerge := len(commit.Parents) > 1
+
+ // Verify DAG structure: single chronological root, so only the root
+ // can have no parents. Said otherwise, the DAG need to have exactly
+ // one leaf.
+ if !isFirstCommit && len(commit.Parents) == 0 {
+ return nil, fmt.Errorf("multiple leafs in the entity DAG")
+ }
+
+ opp, err := readOperationPack(def, repo, resolver, commit)
+ if err != nil {
+ return nil, err
+ }
+
+ err = opp.Validate()
+ if err != nil {
+ return nil, err
+ }
+
+ if isMerge && len(opp.Operations) > 0 {
+ return nil, fmt.Errorf("merge commit cannot have operations")
+ }
+
+ // Check that the create lamport clock is set (not checked in Validate() as it's optional)
+ if isFirstCommit && opp.CreateTime <= 0 {
+ return nil, fmt.Errorf("creation lamport time not set")
+ }
+
+ // make sure that the lamport clocks causality match the DAG topology
+ for _, parentHash := range commit.Parents {
+ parentPack, ok := oppMap[parentHash]
+ if !ok {
+ panic("DFS failed")
+ }
+
+ if parentPack.EditTime >= opp.EditTime {
+ return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
+ }
+
+ // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
+ // that the clocks don't jump too far in the future
+ // we ignore merge commits here to allow merging after a loooong time without breaking anything,
+ // as long as there is one valid chain of small hops, it's fine.
+ if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
+ return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
+ }
+ }
+
+ oppMap[commit.Hash] = opp
+ opsCount += len(opp.Operations)
+ }
+
+ // The clocks are fine, we witness them
+ for _, opp := range oppMap {
+ err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
+ if err != nil {
+ return nil, err
+ }
+ err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // Now that we know that the topological order and clocks are fine, we order the operationPacks
+ // based on the logical clocks, entirely ignoring the DAG topology
+
+ oppSlice := make([]*operationPack, 0, len(oppMap))
+ for _, pack := range oppMap {
+ oppSlice = append(oppSlice, pack)
+ }
+ sort.Slice(oppSlice, func(i, j int) bool {
+ // Primary ordering with the EditTime.
+ if oppSlice[i].EditTime != oppSlice[j].EditTime {
+ return oppSlice[i].EditTime < oppSlice[j].EditTime
+ }
+ // We have equal EditTime, which means we have concurrent edition over different machines and we
+ // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
+ // As a secondary ordering, we can order based on a hash of the serialized Operations in the
+ // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
+ // This is a lexicographic ordering on the stringified ID.
+ return oppSlice[i].Id() < oppSlice[j].Id()
+ })
+
+ // Now that we ordered the operationPacks, we have the order of the Operations
+
+ ops := make([]Operation, 0, opsCount)
+ var createTime lamport.Time
+ var editTime lamport.Time
+ for _, pack := range oppSlice {
+ for _, operation := range pack.Operations {
+ ops = append(ops, operation)
+ }
+ if pack.CreateTime > createTime {
+ createTime = pack.CreateTime
+ }
+ if pack.EditTime > editTime {
+ editTime = pack.EditTime
+ }
+ }
+
+ return &Entity{
+ Definition: def,
+ ops: ops,
+ lastCommit: rootHash,
+ createTime: createTime,
+ editTime: editTime,
+ }, nil
+}
+
+type StreamedEntity struct {
+ Entity *Entity
+ Err error
+}
+
+// ReadAll read and parse all local Entity
+func ReadAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver) <-chan StreamedEntity {
+ out := make(chan StreamedEntity)
+
+ go func() {
+ defer close(out)
+
+ refPrefix := fmt.Sprintf("refs/%s/", def.Namespace)
+
+ refs, err := repo.ListRefs(refPrefix)
+ if err != nil {
+ out <- StreamedEntity{Err: err}
+ return
+ }
+
+ for _, ref := range refs {
+ e, err := read(def, repo, resolver, ref)
+
+ if err != nil {
+ out <- StreamedEntity{Err: err}
+ return
+ }
+
+ out <- StreamedEntity{Entity: e}
+ }
+ }()
+
+ return out
+}
+
+// Id return the Entity identifier
+func (e *Entity) Id() entity.Id {
+ // id is the id of the first operation
+ return e.FirstOp().Id()
+}
+
+// Validate check if the Entity data is valid
+func (e *Entity) Validate() error {
+ // non-empty
+ if len(e.ops) == 0 && len(e.staging) == 0 {
+ return fmt.Errorf("entity has no operations")
+ }
+
+ // check if each operations are valid
+ for _, op := range e.ops {
+ if err := op.Validate(); err != nil {
+ return err
+ }
+ }
+
+ // check if staging is valid if needed
+ for _, op := range e.staging {
+ if err := op.Validate(); err != nil {
+ return err
+ }
+ }
+
+ // Check that there is no colliding operation's ID
+ ids := make(map[entity.Id]struct{})
+ for _, op := range e.Operations() {
+ if _, ok := ids[op.Id()]; ok {
+ return fmt.Errorf("id collision: %s", op.Id())
+ }
+ ids[op.Id()] = struct{}{}
+ }
+
+ return nil
+}
+
+// Operations return the ordered operations
+func (e *Entity) Operations() []Operation {
+ return append(e.ops, e.staging...)
+}
+
+// FirstOp lookup for the very first operation of the Entity
+func (e *Entity) FirstOp() Operation {
+ for _, op := range e.ops {
+ return op
+ }
+ for _, op := range e.staging {
+ return op
+ }
+ return nil
+}
+
+// LastOp lookup for the very last operation of the Entity
+func (e *Entity) LastOp() Operation {
+ if len(e.staging) > 0 {
+ return e.staging[len(e.staging)-1]
+ }
+ if len(e.ops) > 0 {
+ return e.ops[len(e.ops)-1]
+ }
+ return nil
+}
+
+// Append add a new Operation to the Entity
+func (e *Entity) Append(op Operation) {
+ e.staging = append(e.staging, op)
+}
+
+// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
+func (e *Entity) NeedCommit() bool {
+ return len(e.staging) > 0
+}
+
+// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
+// is already in sync with the repository.
+func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
+ if e.NeedCommit() {
+ return e.Commit(repo)
+ }
+ return nil
+}
+
+// Commit write the appended operations in the repository
+func (e *Entity) Commit(repo repository.ClockedRepo) error {
+ if !e.NeedCommit() {
+ return fmt.Errorf("can't commit an entity with no pending operation")
+ }
+
+ err := e.Validate()
+ if err != nil {
+ return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
+ }
+
+ for len(e.staging) > 0 {
+ var author identity.Interface
+ var toCommit []Operation
+
+ // Split into chunks with the same author
+ for len(e.staging) > 0 {
+ op := e.staging[0]
+ if author != nil && op.Author().Id() != author.Id() {
+ break
+ }
+ author = e.staging[0].Author()
+ toCommit = append(toCommit, op)
+ e.staging = e.staging[1:]
+ }
+
+ e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
+ if err != nil {
+ return err
+ }
+
+ opp := &operationPack{
+ Author: author,
+ Operations: toCommit,
+ EditTime: e.editTime,
+ }
+
+ if e.lastCommit == "" {
+ e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
+ if err != nil {
+ return err
+ }
+ opp.CreateTime = e.createTime
+ }
+
+ var parentCommit []repository.Hash
+ if e.lastCommit != "" {
+ parentCommit = []repository.Hash{e.lastCommit}
+ }
+
+ commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
+ if err != nil {
+ return err
+ }
+
+ e.lastCommit = commitHash
+ e.ops = append(e.ops, toCommit...)
+ }
+
+ // not strictly necessary but make equality testing easier in tests
+ e.staging = nil
+
+ // Create or update the Git reference for this entity
+ // When pushing later, the remote will ensure that this ref update
+ // is fast-forward, that is no data has been overwritten.
+ ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
+ return repo.UpdateRef(ref, e.lastCommit)
+}
+
+// CreateLamportTime return the Lamport time of creation
+func (e *Entity) CreateLamportTime() lamport.Time {
+ return e.createTime
+}
+
+// EditLamportTime return the Lamport time of the last edition
+func (e *Entity) EditLamportTime() lamport.Time {
+ return e.editTime
+}
diff --git a/entity/dag/entity_actions.go b/entity/dag/entity_actions.go
new file mode 100644
index 00000000..2926e992
--- /dev/null
+++ b/entity/dag/entity_actions.go
@@ -0,0 +1,260 @@
+package dag
+
+import (
+ "fmt"
+
+ "github.com/pkg/errors"
+
+ "github.com/MichaelMure/git-bug/entity"
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+)
+
+// ListLocalIds list all the available local Entity's Id
+func ListLocalIds(def Definition, repo repository.RepoData) ([]entity.Id, error) {
+ refs, err := repo.ListRefs(fmt.Sprintf("refs/%s/", def.Namespace))
+ if err != nil {
+ return nil, err
+ }
+ return entity.RefsToIds(refs), nil
+}
+
+// Fetch retrieve updates from a remote
+// This does not change the local entity state
+func Fetch(def Definition, repo repository.Repo, remote string) (string, error) {
+ return repo.FetchRefs(remote, def.Namespace)
+}
+
+// Push update a remote with the local changes
+func Push(def Definition, repo repository.Repo, remote string) (string, error) {
+ return repo.PushRefs(remote, def.Namespace)
+}
+
+// Pull will do a Fetch + MergeAll
+// Contrary to MergeAll, this function will return an error if a merge fail.
+func Pull(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, author identity.Interface) error {
+ _, err := Fetch(def, repo, remote)
+ if err != nil {
+ return err
+ }
+
+ for merge := range MergeAll(def, repo, resolver, remote, author) {
+ if merge.Err != nil {
+ return merge.Err
+ }
+ if merge.Status == entity.MergeStatusInvalid {
+ return errors.Errorf("merge failure: %s", merge.Reason)
+ }
+ }
+
+ return nil
+}
+
+// MergeAll will merge all the available remote Entity:
+//
+// Multiple scenario exist:
+// 1. if the remote Entity doesn't exist locally, it's created
+// --> emit entity.MergeStatusNew
+// 2. if the remote and local Entity have the same state, nothing is changed
+// --> emit entity.MergeStatusNothing
+// 3. if the local Entity has new commits but the remote don't, nothing is changed
+// --> emit entity.MergeStatusNothing
+// 4. if the remote has new commit, the local bug is updated to match the same history
+// (fast-forward update)
+// --> emit entity.MergeStatusUpdated
+// 5. if both local and remote Entity have new commits (that is, we have a concurrent edition),
+// a merge commit with an empty operationPack is created to join both branch and form a DAG.
+// --> emit entity.MergeStatusUpdated
+//
+// Note: an author is necessary for the case where a merge commit is created, as this commit will
+// have an author and may be signed if a signing key is available.
+func MergeAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, author identity.Interface) <-chan entity.MergeResult {
+ out := make(chan entity.MergeResult)
+
+ go func() {
+ defer close(out)
+
+ remoteRefSpec := fmt.Sprintf("refs/remotes/%s/%s/", remote, def.Namespace)
+ remoteRefs, err := repo.ListRefs(remoteRefSpec)
+ if err != nil {
+ out <- entity.MergeResult{Err: err}
+ return
+ }
+
+ for _, remoteRef := range remoteRefs {
+ out <- merge(def, repo, resolver, remoteRef, author)
+ }
+ }()
+
+ return out
+}
+
+// merge perform a merge to make sure a local Entity is up to date.
+// See MergeAll for more details.
+func merge(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remoteRef string, author identity.Interface) entity.MergeResult {
+ id := entity.RefToId(remoteRef)
+
+ if err := id.Validate(); err != nil {
+ return entity.NewMergeInvalidStatus(id, errors.Wrap(err, "invalid ref").Error())
+ }
+
+ remoteEntity, err := read(def, repo, resolver, remoteRef)
+ if err != nil {
+ return entity.NewMergeInvalidStatus(id,
+ errors.Wrapf(err, "remote %s is not readable", def.Typename).Error())
+ }
+
+ // Check for error in remote data
+ if err := remoteEntity.Validate(); err != nil {
+ return entity.NewMergeInvalidStatus(id,
+ errors.Wrapf(err, "remote %s data is invalid", def.Typename).Error())
+ }
+
+ localRef := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
+
+ // SCENARIO 1
+ // if the remote Entity doesn't exist locally, it's created
+
+ localExist, err := repo.RefExist(localRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ if !localExist {
+ // the bug is not local yet, simply create the reference
+ err := repo.CopyRef(remoteRef, localRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ return entity.NewMergeNewStatus(id, remoteEntity)
+ }
+
+ localCommit, err := repo.ResolveRef(localRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ remoteCommit, err := repo.ResolveRef(remoteRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ // SCENARIO 2
+ // if the remote and local Entity have the same state, nothing is changed
+
+ if localCommit == remoteCommit {
+ // nothing to merge
+ return entity.NewMergeNothingStatus(id)
+ }
+
+ // SCENARIO 3
+ // if the local Entity has new commits but the remote don't, nothing is changed
+
+ localCommits, err := repo.ListCommits(localRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ for _, hash := range localCommits {
+ if hash == remoteCommit {
+ return entity.NewMergeNothingStatus(id)
+ }
+ }
+
+ // SCENARIO 4
+ // if the remote has new commit, the local bug is updated to match the same history
+ // (fast-forward update)
+
+ remoteCommits, err := repo.ListCommits(remoteRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ // fast-forward is possible if otherRef include ref
+ fastForwardPossible := false
+ for _, hash := range remoteCommits {
+ if hash == localCommit {
+ fastForwardPossible = true
+ break
+ }
+ }
+
+ if fastForwardPossible {
+ err = repo.UpdateRef(localRef, remoteCommit)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+ return entity.NewMergeUpdatedStatus(id, remoteEntity)
+ }
+
+ // SCENARIO 5
+ // if both local and remote Entity have new commits (that is, we have a concurrent edition),
+ // a merge commit with an empty operationPack is created to join both branch and form a DAG.
+
+ // fast-forward is not possible, we need to create a merge commit
+ // For simplicity when reading and to have clocks that record this change, we store
+ // an empty operationPack.
+ // First step is to collect those clocks.
+
+ localEntity, err := read(def, repo, resolver, localRef)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, def.Namespace))
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ opp := &operationPack{
+ Author: author,
+ Operations: nil,
+ CreateTime: 0,
+ EditTime: editTime,
+ }
+
+ commitHash, err := opp.Write(def, repo, localCommit, remoteCommit)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ // finally update the ref
+ err = repo.UpdateRef(localRef, commitHash)
+ if err != nil {
+ return entity.NewMergeError(err, id)
+ }
+
+ // Note: we don't need to update localEntity state (lastCommit, operations...) as we
+ // discard it entirely anyway.
+
+ return entity.NewMergeUpdatedStatus(id, localEntity)
+}
+
+// Remove delete an Entity.
+// Remove is idempotent.
+func Remove(def Definition, repo repository.ClockedRepo, id entity.Id) error {
+ var matches []string
+
+ ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String())
+ matches = append(matches, ref)
+
+ remotes, err := repo.GetRemotes()
+ if err != nil {
+ return err
+ }
+
+ for remote := range remotes {
+ ref = fmt.Sprintf("refs/remotes/%s/%s/%s", remote, def.Namespace, id.String())
+ matches = append(matches, ref)
+ }
+
+ for _, ref = range matches {
+ err = repo.RemoveRef(ref)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/entity/dag/entity_actions_test.go b/entity/dag/entity_actions_test.go
new file mode 100644
index 00000000..45e69c7d
--- /dev/null
+++ b/entity/dag/entity_actions_test.go
@@ -0,0 +1,412 @@
+package dag
+
+import (
+ "sort"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/MichaelMure/git-bug/entity"
+ "github.com/MichaelMure/git-bug/repository"
+)
+
+func allEntities(t testing.TB, bugs <-chan StreamedEntity) []*Entity {
+ t.Helper()
+
+ var result []*Entity
+ for streamed := range bugs {
+ require.NoError(t, streamed.Err)
+
+ result = append(result, streamed.Entity)
+ }
+ return result
+}
+
+func TestEntityPushPull(t *testing.T) {
+ repoA, repoB, remote, id1, id2, resolver, def := makeTestContextRemote(t)
+ defer repository.CleanupTestRepos(repoA, repoB, remote)
+
+ // A --> remote --> B
+ e := New(def)
+ e.Append(newOp1(id1, "foo"))
+
+ err := e.Commit(repoA)
+ require.NoError(t, err)
+
+ _, err = Push(def, repoA, "remote")
+ require.NoError(t, err)
+
+ err = Pull(def, repoB, resolver, "remote", id1)
+ require.NoError(t, err)
+
+ entities := allEntities(t, ReadAll(def, repoB, resolver))
+ require.Len(t, entities, 1)
+
+ // B --> remote --> A
+ e = New(def)
+ e.Append(newOp2(id2, "bar"))
+
+ err = e.Commit(repoB)
+ require.NoError(t, err)
+
+ _, err = Push(def, repoB, "remote")
+ require.NoError(t, err)
+
+ err = Pull(def, repoA, resolver, "remote", id1)
+ require.NoError(t, err)
+
+ entities = allEntities(t, ReadAll(def, repoB, resolver))
+ require.Len(t, entities, 2)
+}
+
+func TestListLocalIds(t *testing.T) {
+ repoA, repoB, remote, id1, id2, resolver, def := makeTestContextRemote(t)
+ defer repository.CleanupTestRepos(repoA, repoB, remote)
+
+ // A --> remote --> B
+ e := New(def)
+ e.Append(newOp1(id1, "foo"))
+ err := e.Commit(repoA)
+ require.NoError(t, err)
+
+ e = New(def)
+ e.Append(newOp2(id2, "bar"))
+ err = e.Commit(repoA)
+ require.NoError(t, err)
+
+ listLocalIds(t, def, repoA, 2)
+ listLocalIds(t, def, repoB, 0)
+
+ _, err = Push(def, repoA, "remote")
+ require.NoError(t, err)
+
+ _, err = Fetch(def, repoB, "remote")
+ require.NoError(t, err)
+
+ listLocalIds(t, def, repoA, 2)
+ listLocalIds(t, def, repoB, 0)
+
+ err = Pull(def, repoB, resolver, "remote", id1)
+ require.NoError(t, err)
+
+ listLocalIds(t, def, repoA, 2)
+ listLocalIds(t, def, repoB, 2)
+}
+
+func listLocalIds(t *testing.T, def Definition, repo repository.RepoData, expectedCount int) {
+ ids, err := ListLocalIds(def, repo)
+ require.NoError(t, err)
+ require.Len(t, ids, expectedCount)
+}
+
+func assertMergeResults(t *testing.T, expected []entity.MergeResult, results <-chan entity.MergeResult) {
+ t.Helper()
+
+ var allResults []entity.MergeResult
+ for result := range results {
+ allResults = append(allResults, result)
+ }
+
+ require.Equal(t, len(expected), len(allResults))
+
+ sort.Slice(allResults, func(i, j int) bool {
+ return allResults[i].Id < allResults[j].Id
+ })
+ sort.Slice(expected, func(i, j int) bool {
+ return expected[i].Id < expected[j].Id
+ })
+
+ for i, result := range allResults {
+ require.NoError(t, result.Err)
+
+ require.Equal(t, expected[i].Id, result.Id)
+ require.Equal(t, expected[i].Status, result.Status)
+
+ switch result.Status {
+ case entity.MergeStatusNew, entity.MergeStatusUpdated:
+ require.NotNil(t, result.Entity)
+ require.Equal(t, expected[i].Id, result.Entity.Id())
+ }
+
+ i++
+ }
+}
+
+func assertEqualRefs(t *testing.T, repoA, repoB repository.RepoData, prefix string) {
+ t.Helper()
+
+ refsA, err := repoA.ListRefs("")
+ require.NoError(t, err)
+
+ var refsAFiltered []string
+ for _, ref := range refsA {
+ if strings.HasPrefix(ref, prefix) {
+ refsAFiltered = append(refsAFiltered, ref)
+ }
+ }
+
+ refsB, err := repoB.ListRefs("")
+ require.NoError(t, err)
+
+ var refsBFiltered []string
+ for _, ref := range refsB {
+ if strings.HasPrefix(ref, prefix) {
+ refsBFiltered = append(refsBFiltered, ref)
+ }
+ }
+
+ require.NotEmpty(t, refsAFiltered)
+ require.Equal(t, refsAFiltered, refsBFiltered)
+
+ for _, ref := range refsAFiltered {
+ commitA, err := repoA.ResolveRef(ref)
+ require.NoError(t, err)
+ commitB, err := repoB.ResolveRef(ref)
+ require.NoError(t, err)
+
+ require.Equal(t, commitA, commitB)
+ }
+}
+
+func assertNotEqualRefs(t *testing.T, repoA, repoB repository.RepoData, prefix string) {
+ t.Helper()
+
+ refsA, err := repoA.ListRefs("")
+ require.NoError(t, err)
+
+ var refsAFiltered []string
+ for _, ref := range refsA {
+ if strings.HasPrefix(ref, prefix) {
+ refsAFiltered = append(refsAFiltered, ref)
+ }
+ }
+
+ refsB, err := repoB.ListRefs("")
+ require.NoError(t, err)
+
+ var refsBFiltered []string
+ for _, ref := range refsB {
+ if strings.HasPrefix(ref, prefix) {
+ refsBFiltered = append(refsBFiltered, ref)
+ }
+ }
+
+ require.NotEmpty(t, refsAFiltered)
+ require.Equal(t, refsAFiltered, refsBFiltered)
+
+ for _, ref := range refsAFiltered {
+ commitA, err := repoA.ResolveRef(ref)
+ require.NoError(t, err)
+ commitB, err := repoB.ResolveRef(ref)
+ require.NoError(t, err)
+
+ require.NotEqual(t, commitA, commitB)
+ }
+}
+
+func TestMerge(t *testing.T) {
+ repoA, repoB, remote, id1, id2, resolver, def := makeTestContextRemote(t)
+ defer repository.CleanupTestRepos(repoA, repoB, remote)
+
+ // SCENARIO 1
+ // if the remote Entity doesn't exist locally, it's created
+
+ // 2 entities in repoA + push to remote
+ e1A := New(def)
+ e1A.Append(newOp1(id1, "foo"))
+ err := e1A.Commit(repoA)
+ require.NoError(t, err)
+
+ e2A := New(def)
+ e2A.Append(newOp2(id2, "bar"))
+ err = e2A.Commit(repoA)
+ require.NoError(t, err)
+
+ _, err = Push(def, repoA, "remote")
+ require.NoError(t, err)
+
+ // repoB: fetch + merge from remote
+
+ _, err = Fetch(def, repoB, "remote")
+ require.NoError(t, err)
+
+ results := MergeAll(def, repoB, resolver, "remote", id1)
+
+ assertMergeResults(t, []entity.MergeResult{
+ {
+ Id: e1A.Id(),
+ Status: entity.MergeStatusNew,
+ },
+ {
+ Id: e2A.Id(),
+ Status: entity.MergeStatusNew,
+ },
+ }, results)
+
+ assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace)
+
+ // SCENARIO 2
+ // if the remote and local Entity have the same state, nothing is changed
+
+ results = MergeAll(def, repoB, resolver, "remote", id1)
+
+ assertMergeResults(t, []entity.MergeResult{
+ {
+ Id: e1A.Id(),
+ Status: entity.MergeStatusNothing,
+ },
+ {
+ Id: e2A.Id(),
+ Status: entity.MergeStatusNothing,
+ },
+ }, results)
+
+ assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace)
+
+ // SCENARIO 3
+ // if the local Entity has new commits but the remote don't, nothing is changed
+
+ e1A.Append(newOp1(id1, "barbar"))
+ err = e1A.Commit(repoA)
+ require.NoError(t, err)
+
+ e2A.Append(newOp2(id2, "barbarbar"))
+ err = e2A.Commit(repoA)
+ require.NoError(t, err)
+
+ results = MergeAll(def, repoA, resolver, "remote", id1)
+
+ assertMergeResults(t, []entity.MergeResult{
+ {
+ Id: e1A.Id(),
+ Status: entity.MergeStatusNothing,
+ },
+ {
+ Id: e2A.Id(),
+ Status: entity.MergeStatusNothing,
+ },
+ }, results)
+
+ assertNotEqualRefs(t, repoA, repoB, "refs/"+def.Namespace)
+
+ // SCENARIO 4
+ // if the remote has new commit, the local bug is updated to match the same history
+ // (fast-forward update)
+
+ _, err = Push(def, repoA, "remote")
+ require.NoError(t, err)
+
+ _, err = Fetch(def, repoB, "remote")
+ require.NoError(t, err)
+
+ results = MergeAll(def, repoB, resolver, "remote", id1)
+
+ assertMergeResults(t, []entity.MergeResult{
+ {
+ Id: e1A.Id(),
+ Status: entity.MergeStatusUpdated,
+ },
+ {
+ Id: e2A.Id(),
+ Status: entity.MergeStatusUpdated,
+ },
+ }, results)
+
+ assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace)
+
+ // SCENARIO 5
+ // if both local and remote Entity have new commits (that is, we have a concurrent edition),
+ // a merge commit with an empty operationPack is created to join both branch and form a DAG.
+
+ e1A.Append(newOp1(id1, "barbarfoo"))
+ err = e1A.Commit(repoA)
+ require.NoError(t, err)
+
+ e2A.Append(newOp2(id2, "barbarbarfoo"))
+ err = e2A.Commit(repoA)
+ require.NoError(t, err)
+
+ e1B, err := Read(def, repoB, resolver, e1A.Id())
+ require.NoError(t, err)
+
+ e2B, err := Read(def, repoB, resolver, e2A.Id())
+ require.NoError(t, err)
+
+ e1B.Append(newOp1(id1, "barbarfoofoo"))
+ err = e1B.Commit(repoB)
+ require.NoError(t, err)
+
+ e2B.Append(newOp2(id2, "barbarbarfoofoo"))
+ err = e2B.Commit(repoB)
+ require.NoError(t, err)
+
+ _, err = Push(def, repoA, "remote")
+ require.NoError(t, err)
+
+ _, err = Fetch(def, repoB, "remote")
+ require.NoError(t, err)
+
+ results = MergeAll(def, repoB, resolver, "remote", id1)
+
+ assertMergeResults(t, []entity.MergeResult{
+ {
+ Id: e1A.Id(),
+ Status: entity.MergeStatusUpdated,
+ },
+ {
+ Id: e2A.Id(),
+ Status: entity.MergeStatusUpdated,
+ },
+ }, results)
+
+ assertNotEqualRefs(t, repoA, repoB, "refs/"+def.Namespace)
+
+ _, err = Push(def, repoB, "remote")
+ require.NoError(t, err)
+
+ _, err = Fetch(def, repoA, "remote")
+ require.NoError(t, err)
+
+ results = MergeAll(def, repoA, resolver, "remote", id1)
+
+ assertMergeResults(t, []entity.MergeResult{
+ {
+ Id: e1A.Id(),
+ Status: entity.MergeStatusUpdated,
+ },
+ {
+ Id: e2A.Id(),
+ Status: entity.MergeStatusUpdated,
+ },
+ }, results)
+
+ // make sure that the graphs become stable over multiple repo, due to the
+ // fast-forward
+ assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace)
+}
+
+func TestRemove(t *testing.T) {
+ repoA, repoB, remote, id1, _, resolver, def := makeTestContextRemote(t)
+ defer repository.CleanupTestRepos(repoA, repoB, remote)
+
+ e := New(def)
+ e.Append(newOp1(id1, "foo"))
+ require.NoError(t, e.Commit(repoA))
+
+ _, err := Push(def, repoA, "remote")
+ require.NoError(t, err)
+
+ err = Remove(def, repoA, e.Id())
+ require.NoError(t, err)
+
+ _, err = Read(def, repoA, resolver, e.Id())
+ require.Error(t, err)
+
+ _, err = readRemote(def, repoA, resolver, "remote", e.Id())
+ require.Error(t, err)
+
+ // Remove is idempotent
+ err = Remove(def, repoA, e.Id())
+ require.NoError(t, err)
+}
diff --git a/entity/dag/entity_test.go b/entity/dag/entity_test.go
new file mode 100644
index 00000000..6d621bbe
--- /dev/null
+++ b/entity/dag/entity_test.go
@@ -0,0 +1,68 @@
+package dag
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestWriteRead(t *testing.T) {
+ repo, id1, id2, resolver, def := makeTestContext()
+
+ entity := New(def)
+ require.False(t, entity.NeedCommit())
+
+ entity.Append(newOp1(id1, "foo"))
+ entity.Append(newOp2(id1, "bar"))
+
+ require.True(t, entity.NeedCommit())
+ require.NoError(t, entity.CommitAsNeeded(repo))
+ require.False(t, entity.NeedCommit())
+
+ entity.Append(newOp2(id2, "foobar"))
+ require.True(t, entity.NeedCommit())
+ require.NoError(t, entity.CommitAsNeeded(repo))
+ require.False(t, entity.NeedCommit())
+
+ read, err := Read(def, repo, resolver, entity.Id())
+ require.NoError(t, err)
+
+ assertEqualEntities(t, entity, read)
+}
+
+func TestWriteReadMultipleAuthor(t *testing.T) {
+ repo, id1, id2, resolver, def := makeTestContext()
+
+ entity := New(def)
+
+ entity.Append(newOp1(id1, "foo"))
+ entity.Append(newOp2(id2, "bar"))
+
+ require.NoError(t, entity.CommitAsNeeded(repo))
+
+ entity.Append(newOp2(id1, "foobar"))
+ require.NoError(t, entity.CommitAsNeeded(repo))
+
+ read, err := Read(def, repo, resolver, entity.Id())
+ require.NoError(t, err)
+
+ assertEqualEntities(t, entity, read)
+}
+
+func assertEqualEntities(t *testing.T, a, b *Entity) {
+ // testify doesn't support comparing functions and systematically fail if they are not nil
+ // so we have to set them to nil temporarily
+
+ backOpUnA := a.Definition.OperationUnmarshaler
+ backOpUnB := b.Definition.OperationUnmarshaler
+
+ a.Definition.OperationUnmarshaler = nil
+ b.Definition.OperationUnmarshaler = nil
+
+ defer func() {
+ a.Definition.OperationUnmarshaler = backOpUnA
+ b.Definition.OperationUnmarshaler = backOpUnB
+ }()
+
+ require.Equal(t, a, b)
+}
diff --git a/entity/dag/operation.go b/entity/dag/operation.go
new file mode 100644
index 00000000..a320859f
--- /dev/null
+++ b/entity/dag/operation.go
@@ -0,0 +1,48 @@
+package dag
+
+import (
+ "github.com/MichaelMure/git-bug/entity"
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+)
+
+// Operation is a piece of data defining a change to reflect on the state of an Entity.
+// What this Operation or Entity's state looks like is not of the resort of this package as it only deals with the
+// data structure and storage.
+type Operation interface {
+ // Id return the Operation identifier
+ //
+ // Some care need to be taken to define a correct Id derivation and enough entropy in the data used to avoid
+ // collisions. Notably:
+ // - the Id of the first Operation will be used as the Id of the Entity. Collision need to be avoided across entities
+ // of the same type (example: no collision within the "bug" namespace).
+ // - collisions can also happen within the set of Operations of an Entity. Simple Operation might not have enough
+ // entropy to yield unique Ids (example: two "close" operation within the same second, same author).
+ // If this is a concern, it is recommended to include a piece of random data in the operation's data, to guarantee
+ // a minimal amount of entropy and avoid collision.
+ //
+ // Author's note: I tried to find a clever way around that inelegance (stuffing random useless data into the stored
+ // structure is not exactly elegant) but I failed to find a proper way. Essentially, anything that would reuse some
+ // other data (parent operation's Id, lamport clock) or the graph structure (depth) impose that the Id would only
+ // make sense in the context of the graph and yield some deep coupling between Entity and Operation. This in turn
+ // make the whole thing even less elegant.
+ //
+ // A common way to derive an Id will be to use the entity.DeriveId() function on the serialized operation data.
+ Id() entity.Id
+ // Validate check if the Operation data is valid
+ Validate() error
+ // Author returns the author of this operation
+ Author() identity.Interface
+}
+
+// OperationWithFiles is an extended Operation that has files dependency, stored in git.
+type OperationWithFiles interface {
+ Operation
+
+ // GetFiles return the files needed by this operation
+ // This implies that the Operation maintain and store internally the references to those files. This is how
+ // this information is read later, when loading from storage.
+ // For example, an operation that has a text value referencing some files would maintain a mapping (text ref -->
+ // hash).
+ GetFiles() []repository.Hash
+}
diff --git a/entity/dag/operation_pack.go b/entity/dag/operation_pack.go
new file mode 100644
index 00000000..72063c60
--- /dev/null
+++ b/entity/dag/operation_pack.go
@@ -0,0 +1,358 @@
+package dag
+
+import (
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "strings"
+
+ "github.com/pkg/errors"
+ "golang.org/x/crypto/openpgp"
+
+ "github.com/MichaelMure/git-bug/entity"
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+ "github.com/MichaelMure/git-bug/util/lamport"
+)
+
+const opsEntryName = "ops"
+const extraEntryName = "extra"
+const versionEntryPrefix = "version-"
+const createClockEntryPrefix = "create-clock-"
+const editClockEntryPrefix = "edit-clock-"
+
+// operationPack is a wrapper structure to store multiple operations in a single git blob.
+// Additionally, it holds and store the metadata for those operations.
+type operationPack struct {
+ // An identifier, taken from a hash of the serialized Operations.
+ id entity.Id
+
+ // The author of the Operations. Must be the same author for all the Operations.
+ Author identity.Interface
+ // The list of Operation stored in the operationPack
+ Operations []Operation
+ // Encode the entity's logical time of creation across all entities of the same type.
+ // Only exist on the root operationPack
+ CreateTime lamport.Time
+ // Encode the entity's logical time of last edition across all entities of the same type.
+ // Exist on all operationPack
+ EditTime lamport.Time
+}
+
+func (opp *operationPack) Id() entity.Id {
+ if opp.id == "" || opp.id == entity.UnsetId {
+ // This means we are trying to get the opp's Id *before* it has been stored.
+ // As the Id is computed based on the actual bytes written on the disk, we are going to predict
+ // those and then get the Id. This is safe as it will be the exact same code writing on disk later.
+
+ data, err := json.Marshal(opp)
+ if err != nil {
+ panic(err)
+ }
+ opp.id = entity.DeriveId(data)
+ }
+
+ return opp.id
+}
+
+func (opp *operationPack) MarshalJSON() ([]byte, error) {
+ return json.Marshal(struct {
+ Author identity.Interface `json:"author"`
+ Operations []Operation `json:"ops"`
+ }{
+ Author: opp.Author,
+ Operations: opp.Operations,
+ })
+}
+
+func (opp *operationPack) Validate() error {
+ if opp.Author == nil {
+ return fmt.Errorf("missing author")
+ }
+ for _, op := range opp.Operations {
+ if op.Author().Id() != opp.Author.Id() {
+ return fmt.Errorf("operation has different author than the operationPack's")
+ }
+ }
+ if opp.EditTime == 0 {
+ return fmt.Errorf("lamport edit time is zero")
+ }
+ return nil
+}
+
+// Write write the OperationPack in git, with zero, one or more parent commits.
+// If the repository has a keypair able to sign (that is, with a private key), the resulting commit is signed with that key.
+// Return the hash of the created commit.
+func (opp *operationPack) Write(def Definition, repo repository.Repo, parentCommit ...repository.Hash) (repository.Hash, error) {
+ if err := opp.Validate(); err != nil {
+ return "", err
+ }
+
+ // For different reason, we store the clocks and format version directly in the git tree.
+ // Version has to be accessible before any attempt to decode to return early with a unique error.
+ // Clocks could possibly be stored in the git blob but it's nice to separate data and metadata, and
+ // we are storing something directly in the tree already so why not.
+ //
+ // To have a valid Tree, we point the "fake" entries to always the same value, the empty blob.
+ emptyBlobHash, err := repo.StoreData([]byte{})
+ if err != nil {
+ return "", err
+ }
+
+ // Write the Ops as a Git blob containing the serialized array of operations
+ data, err := json.Marshal(opp)
+ if err != nil {
+ return "", err
+ }
+
+ // compute the Id while we have the serialized data
+ opp.id = entity.DeriveId(data)
+
+ hash, err := repo.StoreData(data)
+ if err != nil {
+ return "", err
+ }
+
+ // Make a Git tree referencing this blob and encoding the other values:
+ // - format version
+ // - clocks
+ // - extra data
+ tree := []repository.TreeEntry{
+ {ObjectType: repository.Blob, Hash: emptyBlobHash,
+ Name: fmt.Sprintf(versionEntryPrefix+"%d", def.FormatVersion)},
+ {ObjectType: repository.Blob, Hash: hash,
+ Name: opsEntryName},
+ {ObjectType: repository.Blob, Hash: emptyBlobHash,
+ Name: fmt.Sprintf(editClockEntryPrefix+"%d", opp.EditTime)},
+ }
+ if opp.CreateTime > 0 {
+ tree = append(tree, repository.TreeEntry{
+ ObjectType: repository.Blob,
+ Hash: emptyBlobHash,
+ Name: fmt.Sprintf(createClockEntryPrefix+"%d", opp.CreateTime),
+ })
+ }
+ if extraTree := opp.makeExtraTree(); len(extraTree) > 0 {
+ extraTreeHash, err := repo.StoreTree(extraTree)
+ if err != nil {
+ return "", err
+ }
+ tree = append(tree, repository.TreeEntry{
+ ObjectType: repository.Tree,
+ Hash: extraTreeHash,
+ Name: extraEntryName,
+ })
+ }
+
+ // Store the tree
+ treeHash, err := repo.StoreTree(tree)
+ if err != nil {
+ return "", err
+ }
+
+ // Write a Git commit referencing the tree, with the previous commit as parent
+ // If we have keys, sign.
+ var commitHash repository.Hash
+
+ // Sign the commit if we have a key
+ signingKey, err := opp.Author.SigningKey(repo)
+ if err != nil {
+ return "", err
+ }
+
+ if signingKey != nil {
+ commitHash, err = repo.StoreSignedCommit(treeHash, signingKey.PGPEntity(), parentCommit...)
+ } else {
+ commitHash, err = repo.StoreCommit(treeHash, parentCommit...)
+ }
+
+ if err != nil {
+ return "", err
+ }
+
+ return commitHash, nil
+}
+
+func (opp *operationPack) makeExtraTree() []repository.TreeEntry {
+ var tree []repository.TreeEntry
+ counter := 0
+ added := make(map[repository.Hash]interface{})
+
+ for _, ops := range opp.Operations {
+ ops, ok := ops.(OperationWithFiles)
+ if !ok {
+ continue
+ }
+
+ for _, file := range ops.GetFiles() {
+ if _, has := added[file]; !has {
+ tree = append(tree, repository.TreeEntry{
+ ObjectType: repository.Blob,
+ Hash: file,
+ // The name is not important here, we only need to
+ // reference the blob.
+ Name: fmt.Sprintf("file%d", counter),
+ })
+ counter++
+ added[file] = struct{}{}
+ }
+ }
+ }
+
+ return tree
+}
+
+// readOperationPack read the operationPack encoded in git at the given Tree hash.
+//
+// Validity of the Lamport clocks is left for the caller to decide.
+func readOperationPack(def Definition, repo repository.RepoData, resolver identity.Resolver, commit repository.Commit) (*operationPack, error) {
+ entries, err := repo.ReadTree(commit.TreeHash)
+ if err != nil {
+ return nil, err
+ }
+
+ // check the format version first, fail early instead of trying to read something
+ var version uint
+ for _, entry := range entries {
+ if strings.HasPrefix(entry.Name, versionEntryPrefix) {
+ v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64)
+ if err != nil {
+ return nil, errors.Wrap(err, "can't read format version")
+ }
+ if v > 1<<12 {
+ return nil, fmt.Errorf("format version too big")
+ }
+ version = uint(v)
+ break
+ }
+ }
+ if version == 0 {
+ return nil, entity.NewErrUnknownFormat(def.FormatVersion)
+ }
+ if version != def.FormatVersion {
+ return nil, entity.NewErrInvalidFormat(version, def.FormatVersion)
+ }
+
+ var id entity.Id
+ var author identity.Interface
+ var ops []Operation
+ var createTime lamport.Time
+ var editTime lamport.Time
+
+ for _, entry := range entries {
+ switch {
+ case entry.Name == opsEntryName:
+ data, err := repo.ReadData(entry.Hash)
+ if err != nil {
+ return nil, errors.Wrap(err, "failed to read git blob data")
+ }
+ ops, author, err = unmarshallPack(def, resolver, data)
+ if err != nil {
+ return nil, err
+ }
+ id = entity.DeriveId(data)
+
+ case strings.HasPrefix(entry.Name, createClockEntryPrefix):
+ v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, createClockEntryPrefix), 10, 64)
+ if err != nil {
+ return nil, errors.Wrap(err, "can't read creation lamport time")
+ }
+ createTime = lamport.Time(v)
+
+ case strings.HasPrefix(entry.Name, editClockEntryPrefix):
+ v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, editClockEntryPrefix), 10, 64)
+ if err != nil {
+ return nil, errors.Wrap(err, "can't read edit lamport time")
+ }
+ editTime = lamport.Time(v)
+ }
+ }
+
+ // Verify signature if we expect one
+ keys := author.ValidKeysAtTime(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
+ if len(keys) > 0 {
+ keyring := PGPKeyring(keys)
+ _, err = openpgp.CheckDetachedSignature(keyring, commit.SignedData, commit.Signature)
+ if err != nil {
+ return nil, fmt.Errorf("signature failure: %v", err)
+ }
+ }
+
+ return &operationPack{
+ id: id,
+ Author: author,
+ Operations: ops,
+ CreateTime: createTime,
+ EditTime: editTime,
+ }, nil
+}
+
+// unmarshallPack delegate the unmarshalling of the Operation's JSON to the decoding
+// function provided by the concrete entity. This gives access to the concrete type of each
+// Operation.
+func unmarshallPack(def Definition, resolver identity.Resolver, data []byte) ([]Operation, identity.Interface, error) {
+ aux := struct {
+ Author identity.IdentityStub `json:"author"`
+ Operations []json.RawMessage `json:"ops"`
+ }{}
+
+ if err := json.Unmarshal(data, &aux); err != nil {
+ return nil, nil, err
+ }
+
+ if aux.Author.Id() == "" || aux.Author.Id() == entity.UnsetId {
+ return nil, nil, fmt.Errorf("missing author")
+ }
+
+ author, err := resolver.ResolveIdentity(aux.Author.Id())
+ if err != nil {
+ return nil, nil, err
+ }
+
+ ops := make([]Operation, 0, len(aux.Operations))
+
+ for _, raw := range aux.Operations {
+ // delegate to specialized unmarshal function
+ op, err := def.OperationUnmarshaler(author, raw)
+ if err != nil {
+ return nil, nil, err
+ }
+ ops = append(ops, op)
+ }
+
+ return ops, author, nil
+}
+
+var _ openpgp.KeyRing = &PGPKeyring{}
+
+// PGPKeyring implement a openpgp.KeyRing from an slice of Key
+type PGPKeyring []*identity.Key
+
+func (pk PGPKeyring) KeysById(id uint64) []openpgp.Key {
+ var result []openpgp.Key
+ for _, key := range pk {
+ if key.Public().KeyId == id {
+ result = append(result, openpgp.Key{
+ PublicKey: key.Public(),
+ PrivateKey: key.Private(),
+ })
+ }
+ }
+ return result
+}
+
+func (pk PGPKeyring) KeysByIdUsage(id uint64, requiredUsage byte) []openpgp.Key {
+ // the only usage we care about is the ability to sign, which all keys should already be capable of
+ return pk.KeysById(id)
+}
+
+func (pk PGPKeyring) DecryptionKeys() []openpgp.Key {
+ result := make([]openpgp.Key, len(pk))
+ for i, key := range pk {
+ result[i] = openpgp.Key{
+ PublicKey: key.Public(),
+ PrivateKey: key.Private(),
+ }
+ }
+ return result
+}
diff --git a/entity/dag/operation_pack_test.go b/entity/dag/operation_pack_test.go
new file mode 100644
index 00000000..73960800
--- /dev/null
+++ b/entity/dag/operation_pack_test.go
@@ -0,0 +1,159 @@
+package dag
+
+import (
+ "math/rand"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/MichaelMure/git-bug/identity"
+ "github.com/MichaelMure/git-bug/repository"
+)
+
+func TestOperationPackReadWrite(t *testing.T) {
+ repo, id1, _, resolver, def := makeTestContext()
+
+ opp := &operationPack{
+ Author: id1,
+ Operations: []Operation{
+ newOp1(id1, "foo"),
+ newOp2(id1, "bar"),
+ },
+ CreateTime: 123,
+ EditTime: 456,
+ }
+
+ commitHash, err := opp.Write(def, repo)
+ require.NoError(t, err)
+
+ commit, err := repo.ReadCommit(commitHash)
+ require.NoError(t, err)
+
+ opp2, err := readOperationPack(def, repo, resolver, commit)
+ require.NoError(t, err)
+
+ require.Equal(t, opp, opp2)
+
+ // make sure we get the same Id with the same data
+ opp3 := &operationPack{
+ Author: id1,
+ Operations: []Operation{
+ newOp1(id1, "foo"),
+ newOp2(id1, "bar"),
+ },
+ CreateTime: 123,
+ EditTime: 456,
+ }
+ require.Equal(t, opp.Id(), opp3.Id())
+}
+
+func TestOperationPackSignedReadWrite(t *testing.T) {
+ repo, id1, _, resolver, def := makeTestContext()
+
+ err := id1.(*identity.Identity).Mutate(repo, func(orig *identity.Mutator) {
+ orig.Keys = append(orig.Keys, identity.GenerateKey())
+ })
+ require.NoError(t, err)
+
+ opp := &operationPack{
+ Author: id1,
+ Operations: []Operation{
+ newOp1(id1, "foo"),
+ newOp2(id1, "bar"),
+ },
+ CreateTime: 123,
+ EditTime: 456,
+ }
+
+ commitHash, err := opp.Write(def, repo)
+ require.NoError(t, err)
+
+ commit, err := repo.ReadCommit(commitHash)
+ require.NoError(t, err)
+
+ opp2, err := readOperationPack(def, repo, resolver, commit)
+ require.NoError(t, err)
+
+ require.Equal(t, opp, opp2)
+
+ // make sure we get the same Id with the same data
+ opp3 := &operationPack{
+ Author: id1,
+ Operations: []Operation{
+ newOp1(id1, "foo"),
+ newOp2(id1, "bar"),
+ },
+ CreateTime: 123,
+ EditTime: 456,
+ }
+ require.Equal(t, opp.Id(), opp3.Id())
+}
+
+func TestOperationPackFiles(t *testing.T) {
+ repo, id1, _, resolver, def := makeTestContext()
+
+ blobHash1, err := repo.StoreData(randomData())
+ require.NoError(t, err)
+
+ blobHash2, err := repo.StoreData(randomData())
+ require.NoError(t, err)
+
+ opp := &operationPack{
+ Author: id1,
+ Operations: []Operation{
+ newOp1(id1, "foo", blobHash1, blobHash2),
+ newOp1(id1, "foo", blobHash2),
+ },
+ CreateTime: 123,
+ EditTime: 456,
+ }
+
+ commitHash, err := opp.Write(def, repo)
+ require.NoError(t, err)
+
+ commit, err := repo.ReadCommit(commitHash)
+ require.NoError(t, err)
+
+ opp2, err := readOperationPack(def, repo, resolver, commit)
+ require.NoError(t, err)
+
+ require.Equal(t, opp, opp2)
+
+ require.ElementsMatch(t, opp2.Operations[0].(OperationWithFiles).GetFiles(), []repository.Hash{
+ blobHash1,
+ blobHash2,
+ })
+ require.ElementsMatch(t, opp2.Operations[1].(OperationWithFiles).GetFiles(), []repository.Hash{
+ blobHash2,
+ })
+
+ tree, err := repo.ReadTree(commit.TreeHash)
+ require.NoError(t, err)
+
+ extraTreeHash, ok := repository.SearchTreeEntry(tree, extraEntryName)
+ require.True(t, ok)
+
+ extraTree, err := repo.ReadTree(extraTreeHash.Hash)
+ require.NoError(t, err)
+ require.ElementsMatch(t, extraTree, []repository.TreeEntry{
+ {
+ ObjectType: repository.Blob,
+ Hash: blobHash1,
+ Name: "file0",
+ },
+ {
+ ObjectType: repository.Blob,
+ Hash: blobHash2,
+ Name: "file1",
+ },
+ })
+}
+
+func randomData() []byte {
+ var letterRunes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ b := make([]byte, 32)
+ for i := range b {
+ b[i] = letterRunes[rand.Intn(len(letterRunes))]
+ }
+ return b
+}