Diffstat (limited to 'entity/dag')
-rw-r--r-- | entity/dag/clock.go | 38
-rw-r--r-- | entity/dag/common_test.go | 173
-rw-r--r-- | entity/dag/entity.go | 439
-rw-r--r-- | entity/dag/entity_actions.go | 260
-rw-r--r-- | entity/dag/entity_actions_test.go | 412
-rw-r--r-- | entity/dag/entity_test.go | 68
-rw-r--r-- | entity/dag/operation.go | 48
-rw-r--r-- | entity/dag/operation_pack.go | 358
-rw-r--r-- | entity/dag/operation_pack_test.go | 159
9 files changed, 1955 insertions, 0 deletions
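For orientation before the per-file diffs: this series introduces a generic, DAG-shaped entity store. The test-style sketch below is hedged: it reuses the fixtures from common_test.go (makeTestContextRemote, newOp1) and simply strings together the public entry points added here (New, Append, Commit, Push, Pull, Read), mirroring what TestEntityPushPull exercises.

package dag

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/MichaelMure/git-bug/repository"
)

func TestEntityRoundTripSketch(t *testing.T) {
	repoA, repoB, remote, id1, _, resolver, def := makeTestContextRemote(t)
	defer repository.CleanupTestRepos(repoA, repoB, remote)

	// create an entity and stage one operation in memory
	e := New(def)
	e.Append(newOp1(id1, "hello"))

	// write the staged operation as a git commit, updating refs/foos/<id>
	require.NoError(t, e.Commit(repoA))

	// publish to the shared remote, then fetch + merge on the other clone
	_, err := Push(def, repoA, "remote")
	require.NoError(t, err)
	require.NoError(t, Pull(def, repoB, resolver, "remote", id1))

	// the entity is addressable by the Id of its first operation
	read, err := Read(def, repoB, resolver, e.Id())
	require.NoError(t, err)
	require.Equal(t, e.Id(), read.Id())
}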
diff --git a/entity/dag/clock.go b/entity/dag/clock.go new file mode 100644 index 00000000..793fa1bf --- /dev/null +++ b/entity/dag/clock.go @@ -0,0 +1,38 @@ +package dag + +import ( + "fmt" + + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" +) + +// ClockLoader is the repository.ClockLoader for Entity +func ClockLoader(defs ...Definition) repository.ClockLoader { + clocks := make([]string, 0, len(defs)*2) + for _, def := range defs { + clocks = append(clocks, fmt.Sprintf(creationClockPattern, def.Namespace)) + clocks = append(clocks, fmt.Sprintf(editClockPattern, def.Namespace)) + } + + return repository.ClockLoader{ + Clocks: clocks, + Witnesser: func(repo repository.ClockedRepo) error { + // we need to actually load the identities because of the commit signature check when reading, + // which require the full identities with crypto keys + resolver := identity.NewCachedResolver(identity.NewSimpleResolver(repo)) + + for _, def := range defs { + // we actually just need to read all entities, + // as that will create and update the clocks + // TODO: concurrent loading to be faster? + for b := range ReadAll(def, repo, resolver) { + if b.Err != nil { + return b.Err + } + } + } + return nil + }, + } +} diff --git a/entity/dag/common_test.go b/entity/dag/common_test.go new file mode 100644 index 00000000..25289b76 --- /dev/null +++ b/entity/dag/common_test.go @@ -0,0 +1,173 @@ +package dag + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" +) + +// This file contains an example dummy entity to be used in the tests + +/* + Operations +*/ + +type op1 struct { + author identity.Interface + + OperationType int `json:"type"` + Field1 string `json:"field_1"` + Files []repository.Hash `json:"files"` +} + +func newOp1(author identity.Interface, field1 string, files ...repository.Hash) *op1 { + return &op1{author: author, OperationType: 1, Field1: field1, Files: files} +} + +func (o *op1) Id() entity.Id { + data, _ := json.Marshal(o) + return entity.DeriveId(data) +} + +func (o *op1) Validate() error { return nil } + +func (o *op1) Author() identity.Interface { + return o.author +} + +func (o *op1) GetFiles() []repository.Hash { + return o.Files +} + +type op2 struct { + author identity.Interface + + OperationType int `json:"type"` + Field2 string `json:"field_2"` +} + +func newOp2(author identity.Interface, field2 string) *op2 { + return &op2{author: author, OperationType: 2, Field2: field2} +} + +func (o *op2) Id() entity.Id { + data, _ := json.Marshal(o) + return entity.DeriveId(data) +} + +func (o *op2) Validate() error { return nil } + +func (o *op2) Author() identity.Interface { + return o.author +} + +func unmarshaler(author identity.Interface, raw json.RawMessage) (Operation, error) { + var t struct { + OperationType int `json:"type"` + } + + if err := json.Unmarshal(raw, &t); err != nil { + return nil, err + } + + switch t.OperationType { + case 1: + op := &op1{} + err := json.Unmarshal(raw, &op) + op.author = author + return op, err + case 2: + op := &op2{} + err := json.Unmarshal(raw, &op) + op.author = author + return op, err + default: + return nil, fmt.Errorf("unknown operation type %v", t.OperationType) + } +} + +/* + Identities + repo + definition +*/ + +func makeTestContext() (repository.ClockedRepo, identity.Interface, identity.Interface, identity.Resolver, Definition) { + 
repo := repository.NewMockRepo() + id1, id2, resolver, def := makeTestContextInternal(repo) + return repo, id1, id2, resolver, def +} + +func makeTestContextRemote(t *testing.T) (repository.ClockedRepo, repository.ClockedRepo, repository.ClockedRepo, identity.Interface, identity.Interface, identity.Resolver, Definition) { + repoA := repository.CreateGoGitTestRepo(false) + repoB := repository.CreateGoGitTestRepo(false) + remote := repository.CreateGoGitTestRepo(true) + + err := repoA.AddRemote("remote", remote.GetLocalRemote()) + require.NoError(t, err) + err = repoA.AddRemote("repoB", repoB.GetLocalRemote()) + require.NoError(t, err) + err = repoB.AddRemote("remote", remote.GetLocalRemote()) + require.NoError(t, err) + err = repoB.AddRemote("repoA", repoA.GetLocalRemote()) + require.NoError(t, err) + + id1, id2, resolver, def := makeTestContextInternal(repoA) + + // distribute the identities + _, err = identity.Push(repoA, "remote") + require.NoError(t, err) + err = identity.Pull(repoB, "remote") + require.NoError(t, err) + + return repoA, repoB, remote, id1, id2, resolver, def +} + +func makeTestContextInternal(repo repository.ClockedRepo) (identity.Interface, identity.Interface, identity.Resolver, Definition) { + id1, err := identity.NewIdentity(repo, "name1", "email1") + if err != nil { + panic(err) + } + err = id1.Commit(repo) + if err != nil { + panic(err) + } + id2, err := identity.NewIdentity(repo, "name2", "email2") + if err != nil { + panic(err) + } + err = id2.Commit(repo) + if err != nil { + panic(err) + } + + resolver := identityResolverFunc(func(id entity.Id) (identity.Interface, error) { + switch id { + case id1.Id(): + return id1, nil + case id2.Id(): + return id2, nil + default: + return nil, identity.ErrIdentityNotExist + } + }) + + def := Definition{ + Typename: "foo", + Namespace: "foos", + OperationUnmarshaler: unmarshaler, + FormatVersion: 1, + } + + return id1, id2, resolver, def +} + +type identityResolverFunc func(id entity.Id) (identity.Interface, error) + +func (fn identityResolverFunc) ResolveIdentity(id entity.Id) (identity.Interface, error) { + return fn(id) +} diff --git a/entity/dag/entity.go b/entity/dag/entity.go new file mode 100644 index 00000000..c4368514 --- /dev/null +++ b/entity/dag/entity.go @@ -0,0 +1,439 @@ +// Package dag contains the base common code to define an entity stored +// in a chain of git objects, supporting actions like Push, Pull and Merge. +package dag + +import ( + "encoding/json" + "fmt" + "sort" + + "github.com/pkg/errors" + + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" + "github.com/MichaelMure/git-bug/util/lamport" +) + +const refsPattern = "refs/%s/%s" +const creationClockPattern = "%s-create" +const editClockPattern = "%s-edit" + +// Definition hold the details defining one specialization of an Entity. +type Definition struct { + // the name of the entity (bug, pull-request, ...) + Typename string + // the Namespace in git (bugs, prs, ...) + Namespace string + // a function decoding a JSON message into an Operation + OperationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error) + // the expected format version number, that can be used for data migration/upgrade + FormatVersion uint +} + +// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge. 
+type Entity struct { + // A Lamport clock is a logical clock that allow to order event + // inside a distributed system. + // It must be the first field in this struct due to https://github.com/golang/go/issues/36606 + createTime lamport.Time + editTime lamport.Time + + Definition + + // operations that are already stored in the repository + ops []Operation + // operations not yet stored in the repository + staging []Operation + + lastCommit repository.Hash +} + +// New create an empty Entity +func New(definition Definition) *Entity { + return &Entity{ + Definition: definition, + } +} + +// Read will read and decode a stored local Entity from a repository +func Read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, id entity.Id) (*Entity, error) { + if err := id.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid id") + } + + ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) + + return read(def, repo, resolver, ref) +} + +// readRemote will read and decode a stored remote Entity from a repository +func readRemote(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, id entity.Id) (*Entity, error) { + if err := id.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid id") + } + + ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String()) + + return read(def, repo, resolver, ref) +} + +// read fetch from git and decode an Entity at an arbitrary git reference. +func read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, ref string) (*Entity, error) { + rootHash, err := repo.ResolveRef(ref) + if err != nil { + return nil, err + } + + // Perform a breadth-first search to get a topological order of the DAG where we discover the + // parents commit and go back in time up to the chronological root + + queue := make([]repository.Hash, 0, 32) + visited := make(map[repository.Hash]struct{}) + BFSOrder := make([]repository.Commit, 0, 32) + + queue = append(queue, rootHash) + visited[rootHash] = struct{}{} + + for len(queue) > 0 { + // pop + hash := queue[0] + queue = queue[1:] + + commit, err := repo.ReadCommit(hash) + if err != nil { + return nil, err + } + + BFSOrder = append(BFSOrder, commit) + + for _, parent := range commit.Parents { + if _, ok := visited[parent]; !ok { + queue = append(queue, parent) + // mark as visited + visited[parent] = struct{}{} + } + } + } + + // Now, we can reverse this topological order and read the commits in an order where + // we are sure to have read all the chronological ancestors when we read a commit. + + // Next step is to: + // 1) read the operationPacks + // 2) make sure that the clocks causality respect the DAG topology. + + oppMap := make(map[repository.Hash]*operationPack) + var opsCount int + + for i := len(BFSOrder) - 1; i >= 0; i-- { + commit := BFSOrder[i] + isFirstCommit := i == len(BFSOrder)-1 + isMerge := len(commit.Parents) > 1 + + // Verify DAG structure: single chronological root, so only the root + // can have no parents. Said otherwise, the DAG need to have exactly + // one leaf. 
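+			// To picture it: a well-formed entity DAG, with edges pointing from
+			// child commit to parent, looks like
+			//
+			//            root commit          <- CreateTime set, no parent ("leaf" above)
+			//                 |
+			//             op commit
+			//              /     \
+			//       op commit   op commit     <- concurrent edits on two machines
+			//              \     /
+			//           merge commit          <- empty operationPack, two parents
+			//                 |
+			//                tip              <- refs/<namespace>/<id>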
+ if !isFirstCommit && len(commit.Parents) == 0 { + return nil, fmt.Errorf("multiple leafs in the entity DAG") + } + + opp, err := readOperationPack(def, repo, resolver, commit) + if err != nil { + return nil, err + } + + err = opp.Validate() + if err != nil { + return nil, err + } + + if isMerge && len(opp.Operations) > 0 { + return nil, fmt.Errorf("merge commit cannot have operations") + } + + // Check that the create lamport clock is set (not checked in Validate() as it's optional) + if isFirstCommit && opp.CreateTime <= 0 { + return nil, fmt.Errorf("creation lamport time not set") + } + + // make sure that the lamport clocks causality match the DAG topology + for _, parentHash := range commit.Parents { + parentPack, ok := oppMap[parentHash] + if !ok { + panic("DFS failed") + } + + if parentPack.EditTime >= opp.EditTime { + return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG") + } + + // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure + // that the clocks don't jump too far in the future + // we ignore merge commits here to allow merging after a loooong time without breaking anything, + // as long as there is one valid chain of small hops, it's fine. + if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 { + return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack") + } + } + + oppMap[commit.Hash] = opp + opsCount += len(opp.Operations) + } + + // The clocks are fine, we witness them + for _, opp := range oppMap { + err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime) + if err != nil { + return nil, err + } + err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime) + if err != nil { + return nil, err + } + } + + // Now that we know that the topological order and clocks are fine, we order the operationPacks + // based on the logical clocks, entirely ignoring the DAG topology + + oppSlice := make([]*operationPack, 0, len(oppMap)) + for _, pack := range oppMap { + oppSlice = append(oppSlice, pack) + } + sort.Slice(oppSlice, func(i, j int) bool { + // Primary ordering with the EditTime. + if oppSlice[i].EditTime != oppSlice[j].EditTime { + return oppSlice[i].EditTime < oppSlice[j].EditTime + } + // We have equal EditTime, which means we have concurrent edition over different machines and we + // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible. + // As a secondary ordering, we can order based on a hash of the serialized Operations in the + // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse. + // This is a lexicographic ordering on the stringified ID. 
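+		// e.g. two packs both at EditTime 7 with (hypothetical) Ids "3f2a..." and
+		// "b40c...": the pack with Id "3f2a..." sorts first, on every machine.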
+ return oppSlice[i].Id() < oppSlice[j].Id() + }) + + // Now that we ordered the operationPacks, we have the order of the Operations + + ops := make([]Operation, 0, opsCount) + var createTime lamport.Time + var editTime lamport.Time + for _, pack := range oppSlice { + for _, operation := range pack.Operations { + ops = append(ops, operation) + } + if pack.CreateTime > createTime { + createTime = pack.CreateTime + } + if pack.EditTime > editTime { + editTime = pack.EditTime + } + } + + return &Entity{ + Definition: def, + ops: ops, + lastCommit: rootHash, + createTime: createTime, + editTime: editTime, + }, nil +} + +type StreamedEntity struct { + Entity *Entity + Err error +} + +// ReadAll read and parse all local Entity +func ReadAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver) <-chan StreamedEntity { + out := make(chan StreamedEntity) + + go func() { + defer close(out) + + refPrefix := fmt.Sprintf("refs/%s/", def.Namespace) + + refs, err := repo.ListRefs(refPrefix) + if err != nil { + out <- StreamedEntity{Err: err} + return + } + + for _, ref := range refs { + e, err := read(def, repo, resolver, ref) + + if err != nil { + out <- StreamedEntity{Err: err} + return + } + + out <- StreamedEntity{Entity: e} + } + }() + + return out +} + +// Id return the Entity identifier +func (e *Entity) Id() entity.Id { + // id is the id of the first operation + return e.FirstOp().Id() +} + +// Validate check if the Entity data is valid +func (e *Entity) Validate() error { + // non-empty + if len(e.ops) == 0 && len(e.staging) == 0 { + return fmt.Errorf("entity has no operations") + } + + // check if each operations are valid + for _, op := range e.ops { + if err := op.Validate(); err != nil { + return err + } + } + + // check if staging is valid if needed + for _, op := range e.staging { + if err := op.Validate(); err != nil { + return err + } + } + + // Check that there is no colliding operation's ID + ids := make(map[entity.Id]struct{}) + for _, op := range e.Operations() { + if _, ok := ids[op.Id()]; ok { + return fmt.Errorf("id collision: %s", op.Id()) + } + ids[op.Id()] = struct{}{} + } + + return nil +} + +// Operations return the ordered operations +func (e *Entity) Operations() []Operation { + return append(e.ops, e.staging...) +} + +// FirstOp lookup for the very first operation of the Entity +func (e *Entity) FirstOp() Operation { + for _, op := range e.ops { + return op + } + for _, op := range e.staging { + return op + } + return nil +} + +// LastOp lookup for the very last operation of the Entity +func (e *Entity) LastOp() Operation { + if len(e.staging) > 0 { + return e.staging[len(e.staging)-1] + } + if len(e.ops) > 0 { + return e.ops[len(e.ops)-1] + } + return nil +} + +// Append add a new Operation to the Entity +func (e *Entity) Append(op Operation) { + e.staging = append(e.staging, op) +} + +// NeedCommit indicate if the in-memory state changed and need to be commit in the repository +func (e *Entity) NeedCommit() bool { + return len(e.staging) > 0 +} + +// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity +// is already in sync with the repository. 
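A hedged usage sketch of the pattern this enables, reusing the makeTestContext fixture from common_test.go: only the first call finds staged operations and writes a commit; the second is a silent no-op, where a bare Commit would return an error.

func ExampleEntity_CommitAsNeeded() {
	repo, id1, _, _, def := makeTestContext()

	e := New(def)
	e.Append(newOp1(id1, "foo")) // staged in memory only

	_ = e.CommitAsNeeded(repo) // NeedCommit() is true: writes one commit
	_ = e.CommitAsNeeded(repo) // staging is empty: does nothing, returns nil
	// e.Commit(repo) at this point would instead fail with
	// "can't commit an entity with no pending operation"
}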
+func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error { + if e.NeedCommit() { + return e.Commit(repo) + } + return nil +} + +// Commit write the appended operations in the repository +func (e *Entity) Commit(repo repository.ClockedRepo) error { + if !e.NeedCommit() { + return fmt.Errorf("can't commit an entity with no pending operation") + } + + err := e.Validate() + if err != nil { + return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename) + } + + for len(e.staging) > 0 { + var author identity.Interface + var toCommit []Operation + + // Split into chunks with the same author + for len(e.staging) > 0 { + op := e.staging[0] + if author != nil && op.Author().Id() != author.Id() { + break + } + author = e.staging[0].Author() + toCommit = append(toCommit, op) + e.staging = e.staging[1:] + } + + e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace)) + if err != nil { + return err + } + + opp := &operationPack{ + Author: author, + Operations: toCommit, + EditTime: e.editTime, + } + + if e.lastCommit == "" { + e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace)) + if err != nil { + return err + } + opp.CreateTime = e.createTime + } + + var parentCommit []repository.Hash + if e.lastCommit != "" { + parentCommit = []repository.Hash{e.lastCommit} + } + + commitHash, err := opp.Write(e.Definition, repo, parentCommit...) + if err != nil { + return err + } + + e.lastCommit = commitHash + e.ops = append(e.ops, toCommit...) + } + + // not strictly necessary but make equality testing easier in tests + e.staging = nil + + // Create or update the Git reference for this entity + // When pushing later, the remote will ensure that this ref update + // is fast-forward, that is no data has been overwritten. + ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String()) + return repo.UpdateRef(ref, e.lastCommit) +} + +// CreateLamportTime return the Lamport time of creation +func (e *Entity) CreateLamportTime() lamport.Time { + return e.createTime +} + +// EditLamportTime return the Lamport time of the last edition +func (e *Entity) EditLamportTime() lamport.Time { + return e.editTime +} diff --git a/entity/dag/entity_actions.go b/entity/dag/entity_actions.go new file mode 100644 index 00000000..2926e992 --- /dev/null +++ b/entity/dag/entity_actions.go @@ -0,0 +1,260 @@ +package dag + +import ( + "fmt" + + "github.com/pkg/errors" + + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" +) + +// ListLocalIds list all the available local Entity's Id +func ListLocalIds(def Definition, repo repository.RepoData) ([]entity.Id, error) { + refs, err := repo.ListRefs(fmt.Sprintf("refs/%s/", def.Namespace)) + if err != nil { + return nil, err + } + return entity.RefsToIds(refs), nil +} + +// Fetch retrieve updates from a remote +// This does not change the local entity state +func Fetch(def Definition, repo repository.Repo, remote string) (string, error) { + return repo.FetchRefs(remote, def.Namespace) +} + +// Push update a remote with the local changes +func Push(def Definition, repo repository.Repo, remote string) (string, error) { + return repo.PushRefs(remote, def.Namespace) +} + +// Pull will do a Fetch + MergeAll +// Contrary to MergeAll, this function will return an error if a merge fail. 
+func Pull(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, author identity.Interface) error { + _, err := Fetch(def, repo, remote) + if err != nil { + return err + } + + for merge := range MergeAll(def, repo, resolver, remote, author) { + if merge.Err != nil { + return merge.Err + } + if merge.Status == entity.MergeStatusInvalid { + return errors.Errorf("merge failure: %s", merge.Reason) + } + } + + return nil +} + +// MergeAll will merge all the available remote Entity: +// +// Multiple scenario exist: +// 1. if the remote Entity doesn't exist locally, it's created +// --> emit entity.MergeStatusNew +// 2. if the remote and local Entity have the same state, nothing is changed +// --> emit entity.MergeStatusNothing +// 3. if the local Entity has new commits but the remote don't, nothing is changed +// --> emit entity.MergeStatusNothing +// 4. if the remote has new commit, the local bug is updated to match the same history +// (fast-forward update) +// --> emit entity.MergeStatusUpdated +// 5. if both local and remote Entity have new commits (that is, we have a concurrent edition), +// a merge commit with an empty operationPack is created to join both branch and form a DAG. +// --> emit entity.MergeStatusUpdated +// +// Note: an author is necessary for the case where a merge commit is created, as this commit will +// have an author and may be signed if a signing key is available. +func MergeAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, author identity.Interface) <-chan entity.MergeResult { + out := make(chan entity.MergeResult) + + go func() { + defer close(out) + + remoteRefSpec := fmt.Sprintf("refs/remotes/%s/%s/", remote, def.Namespace) + remoteRefs, err := repo.ListRefs(remoteRefSpec) + if err != nil { + out <- entity.MergeResult{Err: err} + return + } + + for _, remoteRef := range remoteRefs { + out <- merge(def, repo, resolver, remoteRef, author) + } + }() + + return out +} + +// merge perform a merge to make sure a local Entity is up to date. +// See MergeAll for more details. 
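Before the implementation, a hedged sketch of how a caller can drain MergeAll and map each result back to the scenario list above (the remote name "origin" and the helper name are stand-ins; imports as in this file):

func mergeAllFrom(def Definition, repo repository.ClockedRepo,
	resolver identity.Resolver, author identity.Interface) error {
	for result := range MergeAll(def, repo, resolver, "origin", author) {
		if result.Err != nil {
			return result.Err // hard failure (git error, ...): abort
		}
		switch result.Status {
		case entity.MergeStatusNew: // scenario 1: entity created locally
		case entity.MergeStatusNothing: // scenarios 2 and 3: already up to date
		case entity.MergeStatusUpdated: // scenarios 4 and 5: fast-forward or merge commit
		case entity.MergeStatusInvalid:
			return errors.Errorf("invalid entity %s: %s", result.Id, result.Reason)
		}
	}
	return nil
}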
+func merge(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remoteRef string, author identity.Interface) entity.MergeResult { + id := entity.RefToId(remoteRef) + + if err := id.Validate(); err != nil { + return entity.NewMergeInvalidStatus(id, errors.Wrap(err, "invalid ref").Error()) + } + + remoteEntity, err := read(def, repo, resolver, remoteRef) + if err != nil { + return entity.NewMergeInvalidStatus(id, + errors.Wrapf(err, "remote %s is not readable", def.Typename).Error()) + } + + // Check for error in remote data + if err := remoteEntity.Validate(); err != nil { + return entity.NewMergeInvalidStatus(id, + errors.Wrapf(err, "remote %s data is invalid", def.Typename).Error()) + } + + localRef := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) + + // SCENARIO 1 + // if the remote Entity doesn't exist locally, it's created + + localExist, err := repo.RefExist(localRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + if !localExist { + // the bug is not local yet, simply create the reference + err := repo.CopyRef(remoteRef, localRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + return entity.NewMergeNewStatus(id, remoteEntity) + } + + localCommit, err := repo.ResolveRef(localRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + remoteCommit, err := repo.ResolveRef(remoteRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + // SCENARIO 2 + // if the remote and local Entity have the same state, nothing is changed + + if localCommit == remoteCommit { + // nothing to merge + return entity.NewMergeNothingStatus(id) + } + + // SCENARIO 3 + // if the local Entity has new commits but the remote don't, nothing is changed + + localCommits, err := repo.ListCommits(localRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + for _, hash := range localCommits { + if hash == remoteCommit { + return entity.NewMergeNothingStatus(id) + } + } + + // SCENARIO 4 + // if the remote has new commit, the local bug is updated to match the same history + // (fast-forward update) + + remoteCommits, err := repo.ListCommits(remoteRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + // fast-forward is possible if otherRef include ref + fastForwardPossible := false + for _, hash := range remoteCommits { + if hash == localCommit { + fastForwardPossible = true + break + } + } + + if fastForwardPossible { + err = repo.UpdateRef(localRef, remoteCommit) + if err != nil { + return entity.NewMergeError(err, id) + } + return entity.NewMergeUpdatedStatus(id, remoteEntity) + } + + // SCENARIO 5 + // if both local and remote Entity have new commits (that is, we have a concurrent edition), + // a merge commit with an empty operationPack is created to join both branch and form a DAG. + + // fast-forward is not possible, we need to create a merge commit + // For simplicity when reading and to have clocks that record this change, we store + // an empty operationPack. + // First step is to collect those clocks. 
+ + localEntity, err := read(def, repo, resolver, localRef) + if err != nil { + return entity.NewMergeError(err, id) + } + + editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, def.Namespace)) + if err != nil { + return entity.NewMergeError(err, id) + } + + opp := &operationPack{ + Author: author, + Operations: nil, + CreateTime: 0, + EditTime: editTime, + } + + commitHash, err := opp.Write(def, repo, localCommit, remoteCommit) + if err != nil { + return entity.NewMergeError(err, id) + } + + // finally update the ref + err = repo.UpdateRef(localRef, commitHash) + if err != nil { + return entity.NewMergeError(err, id) + } + + // Note: we don't need to update localEntity state (lastCommit, operations...) as we + // discard it entirely anyway. + + return entity.NewMergeUpdatedStatus(id, localEntity) +} + +// Remove delete an Entity. +// Remove is idempotent. +func Remove(def Definition, repo repository.ClockedRepo, id entity.Id) error { + var matches []string + + ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) + matches = append(matches, ref) + + remotes, err := repo.GetRemotes() + if err != nil { + return err + } + + for remote := range remotes { + ref = fmt.Sprintf("refs/remotes/%s/%s/%s", remote, def.Namespace, id.String()) + matches = append(matches, ref) + } + + for _, ref = range matches { + err = repo.RemoveRef(ref) + if err != nil { + return err + } + } + + return nil +} diff --git a/entity/dag/entity_actions_test.go b/entity/dag/entity_actions_test.go new file mode 100644 index 00000000..45e69c7d --- /dev/null +++ b/entity/dag/entity_actions_test.go @@ -0,0 +1,412 @@ +package dag + +import ( + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/repository" +) + +func allEntities(t testing.TB, bugs <-chan StreamedEntity) []*Entity { + t.Helper() + + var result []*Entity + for streamed := range bugs { + require.NoError(t, streamed.Err) + + result = append(result, streamed.Entity) + } + return result +} + +func TestEntityPushPull(t *testing.T) { + repoA, repoB, remote, id1, id2, resolver, def := makeTestContextRemote(t) + defer repository.CleanupTestRepos(repoA, repoB, remote) + + // A --> remote --> B + e := New(def) + e.Append(newOp1(id1, "foo")) + + err := e.Commit(repoA) + require.NoError(t, err) + + _, err = Push(def, repoA, "remote") + require.NoError(t, err) + + err = Pull(def, repoB, resolver, "remote", id1) + require.NoError(t, err) + + entities := allEntities(t, ReadAll(def, repoB, resolver)) + require.Len(t, entities, 1) + + // B --> remote --> A + e = New(def) + e.Append(newOp2(id2, "bar")) + + err = e.Commit(repoB) + require.NoError(t, err) + + _, err = Push(def, repoB, "remote") + require.NoError(t, err) + + err = Pull(def, repoA, resolver, "remote", id1) + require.NoError(t, err) + + entities = allEntities(t, ReadAll(def, repoB, resolver)) + require.Len(t, entities, 2) +} + +func TestListLocalIds(t *testing.T) { + repoA, repoB, remote, id1, id2, resolver, def := makeTestContextRemote(t) + defer repository.CleanupTestRepos(repoA, repoB, remote) + + // A --> remote --> B + e := New(def) + e.Append(newOp1(id1, "foo")) + err := e.Commit(repoA) + require.NoError(t, err) + + e = New(def) + e.Append(newOp2(id2, "bar")) + err = e.Commit(repoA) + require.NoError(t, err) + + listLocalIds(t, def, repoA, 2) + listLocalIds(t, def, repoB, 0) + + _, err = Push(def, repoA, "remote") + require.NoError(t, err) + + _, err = Fetch(def, repoB, "remote") + 
require.NoError(t, err) + + listLocalIds(t, def, repoA, 2) + listLocalIds(t, def, repoB, 0) + + err = Pull(def, repoB, resolver, "remote", id1) + require.NoError(t, err) + + listLocalIds(t, def, repoA, 2) + listLocalIds(t, def, repoB, 2) +} + +func listLocalIds(t *testing.T, def Definition, repo repository.RepoData, expectedCount int) { + ids, err := ListLocalIds(def, repo) + require.NoError(t, err) + require.Len(t, ids, expectedCount) +} + +func assertMergeResults(t *testing.T, expected []entity.MergeResult, results <-chan entity.MergeResult) { + t.Helper() + + var allResults []entity.MergeResult + for result := range results { + allResults = append(allResults, result) + } + + require.Equal(t, len(expected), len(allResults)) + + sort.Slice(allResults, func(i, j int) bool { + return allResults[i].Id < allResults[j].Id + }) + sort.Slice(expected, func(i, j int) bool { + return expected[i].Id < expected[j].Id + }) + + for i, result := range allResults { + require.NoError(t, result.Err) + + require.Equal(t, expected[i].Id, result.Id) + require.Equal(t, expected[i].Status, result.Status) + + switch result.Status { + case entity.MergeStatusNew, entity.MergeStatusUpdated: + require.NotNil(t, result.Entity) + require.Equal(t, expected[i].Id, result.Entity.Id()) + } + + i++ + } +} + +func assertEqualRefs(t *testing.T, repoA, repoB repository.RepoData, prefix string) { + t.Helper() + + refsA, err := repoA.ListRefs("") + require.NoError(t, err) + + var refsAFiltered []string + for _, ref := range refsA { + if strings.HasPrefix(ref, prefix) { + refsAFiltered = append(refsAFiltered, ref) + } + } + + refsB, err := repoB.ListRefs("") + require.NoError(t, err) + + var refsBFiltered []string + for _, ref := range refsB { + if strings.HasPrefix(ref, prefix) { + refsBFiltered = append(refsBFiltered, ref) + } + } + + require.NotEmpty(t, refsAFiltered) + require.Equal(t, refsAFiltered, refsBFiltered) + + for _, ref := range refsAFiltered { + commitA, err := repoA.ResolveRef(ref) + require.NoError(t, err) + commitB, err := repoB.ResolveRef(ref) + require.NoError(t, err) + + require.Equal(t, commitA, commitB) + } +} + +func assertNotEqualRefs(t *testing.T, repoA, repoB repository.RepoData, prefix string) { + t.Helper() + + refsA, err := repoA.ListRefs("") + require.NoError(t, err) + + var refsAFiltered []string + for _, ref := range refsA { + if strings.HasPrefix(ref, prefix) { + refsAFiltered = append(refsAFiltered, ref) + } + } + + refsB, err := repoB.ListRefs("") + require.NoError(t, err) + + var refsBFiltered []string + for _, ref := range refsB { + if strings.HasPrefix(ref, prefix) { + refsBFiltered = append(refsBFiltered, ref) + } + } + + require.NotEmpty(t, refsAFiltered) + require.Equal(t, refsAFiltered, refsBFiltered) + + for _, ref := range refsAFiltered { + commitA, err := repoA.ResolveRef(ref) + require.NoError(t, err) + commitB, err := repoB.ResolveRef(ref) + require.NoError(t, err) + + require.NotEqual(t, commitA, commitB) + } +} + +func TestMerge(t *testing.T) { + repoA, repoB, remote, id1, id2, resolver, def := makeTestContextRemote(t) + defer repository.CleanupTestRepos(repoA, repoB, remote) + + // SCENARIO 1 + // if the remote Entity doesn't exist locally, it's created + + // 2 entities in repoA + push to remote + e1A := New(def) + e1A.Append(newOp1(id1, "foo")) + err := e1A.Commit(repoA) + require.NoError(t, err) + + e2A := New(def) + e2A.Append(newOp2(id2, "bar")) + err = e2A.Commit(repoA) + require.NoError(t, err) + + _, err = Push(def, repoA, "remote") + require.NoError(t, err) + + // 
repoB: fetch + merge from remote + + _, err = Fetch(def, repoB, "remote") + require.NoError(t, err) + + results := MergeAll(def, repoB, resolver, "remote", id1) + + assertMergeResults(t, []entity.MergeResult{ + { + Id: e1A.Id(), + Status: entity.MergeStatusNew, + }, + { + Id: e2A.Id(), + Status: entity.MergeStatusNew, + }, + }, results) + + assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace) + + // SCENARIO 2 + // if the remote and local Entity have the same state, nothing is changed + + results = MergeAll(def, repoB, resolver, "remote", id1) + + assertMergeResults(t, []entity.MergeResult{ + { + Id: e1A.Id(), + Status: entity.MergeStatusNothing, + }, + { + Id: e2A.Id(), + Status: entity.MergeStatusNothing, + }, + }, results) + + assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace) + + // SCENARIO 3 + // if the local Entity has new commits but the remote don't, nothing is changed + + e1A.Append(newOp1(id1, "barbar")) + err = e1A.Commit(repoA) + require.NoError(t, err) + + e2A.Append(newOp2(id2, "barbarbar")) + err = e2A.Commit(repoA) + require.NoError(t, err) + + results = MergeAll(def, repoA, resolver, "remote", id1) + + assertMergeResults(t, []entity.MergeResult{ + { + Id: e1A.Id(), + Status: entity.MergeStatusNothing, + }, + { + Id: e2A.Id(), + Status: entity.MergeStatusNothing, + }, + }, results) + + assertNotEqualRefs(t, repoA, repoB, "refs/"+def.Namespace) + + // SCENARIO 4 + // if the remote has new commit, the local bug is updated to match the same history + // (fast-forward update) + + _, err = Push(def, repoA, "remote") + require.NoError(t, err) + + _, err = Fetch(def, repoB, "remote") + require.NoError(t, err) + + results = MergeAll(def, repoB, resolver, "remote", id1) + + assertMergeResults(t, []entity.MergeResult{ + { + Id: e1A.Id(), + Status: entity.MergeStatusUpdated, + }, + { + Id: e2A.Id(), + Status: entity.MergeStatusUpdated, + }, + }, results) + + assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace) + + // SCENARIO 5 + // if both local and remote Entity have new commits (that is, we have a concurrent edition), + // a merge commit with an empty operationPack is created to join both branch and form a DAG. 
+ + e1A.Append(newOp1(id1, "barbarfoo")) + err = e1A.Commit(repoA) + require.NoError(t, err) + + e2A.Append(newOp2(id2, "barbarbarfoo")) + err = e2A.Commit(repoA) + require.NoError(t, err) + + e1B, err := Read(def, repoB, resolver, e1A.Id()) + require.NoError(t, err) + + e2B, err := Read(def, repoB, resolver, e2A.Id()) + require.NoError(t, err) + + e1B.Append(newOp1(id1, "barbarfoofoo")) + err = e1B.Commit(repoB) + require.NoError(t, err) + + e2B.Append(newOp2(id2, "barbarbarfoofoo")) + err = e2B.Commit(repoB) + require.NoError(t, err) + + _, err = Push(def, repoA, "remote") + require.NoError(t, err) + + _, err = Fetch(def, repoB, "remote") + require.NoError(t, err) + + results = MergeAll(def, repoB, resolver, "remote", id1) + + assertMergeResults(t, []entity.MergeResult{ + { + Id: e1A.Id(), + Status: entity.MergeStatusUpdated, + }, + { + Id: e2A.Id(), + Status: entity.MergeStatusUpdated, + }, + }, results) + + assertNotEqualRefs(t, repoA, repoB, "refs/"+def.Namespace) + + _, err = Push(def, repoB, "remote") + require.NoError(t, err) + + _, err = Fetch(def, repoA, "remote") + require.NoError(t, err) + + results = MergeAll(def, repoA, resolver, "remote", id1) + + assertMergeResults(t, []entity.MergeResult{ + { + Id: e1A.Id(), + Status: entity.MergeStatusUpdated, + }, + { + Id: e2A.Id(), + Status: entity.MergeStatusUpdated, + }, + }, results) + + // make sure that the graphs become stable over multiple repo, due to the + // fast-forward + assertEqualRefs(t, repoA, repoB, "refs/"+def.Namespace) +} + +func TestRemove(t *testing.T) { + repoA, repoB, remote, id1, _, resolver, def := makeTestContextRemote(t) + defer repository.CleanupTestRepos(repoA, repoB, remote) + + e := New(def) + e.Append(newOp1(id1, "foo")) + require.NoError(t, e.Commit(repoA)) + + _, err := Push(def, repoA, "remote") + require.NoError(t, err) + + err = Remove(def, repoA, e.Id()) + require.NoError(t, err) + + _, err = Read(def, repoA, resolver, e.Id()) + require.Error(t, err) + + _, err = readRemote(def, repoA, resolver, "remote", e.Id()) + require.Error(t, err) + + // Remove is idempotent + err = Remove(def, repoA, e.Id()) + require.NoError(t, err) +} diff --git a/entity/dag/entity_test.go b/entity/dag/entity_test.go new file mode 100644 index 00000000..6d621bbe --- /dev/null +++ b/entity/dag/entity_test.go @@ -0,0 +1,68 @@ +package dag + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWriteRead(t *testing.T) { + repo, id1, id2, resolver, def := makeTestContext() + + entity := New(def) + require.False(t, entity.NeedCommit()) + + entity.Append(newOp1(id1, "foo")) + entity.Append(newOp2(id1, "bar")) + + require.True(t, entity.NeedCommit()) + require.NoError(t, entity.CommitAsNeeded(repo)) + require.False(t, entity.NeedCommit()) + + entity.Append(newOp2(id2, "foobar")) + require.True(t, entity.NeedCommit()) + require.NoError(t, entity.CommitAsNeeded(repo)) + require.False(t, entity.NeedCommit()) + + read, err := Read(def, repo, resolver, entity.Id()) + require.NoError(t, err) + + assertEqualEntities(t, entity, read) +} + +func TestWriteReadMultipleAuthor(t *testing.T) { + repo, id1, id2, resolver, def := makeTestContext() + + entity := New(def) + + entity.Append(newOp1(id1, "foo")) + entity.Append(newOp2(id2, "bar")) + + require.NoError(t, entity.CommitAsNeeded(repo)) + + entity.Append(newOp2(id1, "foobar")) + require.NoError(t, entity.CommitAsNeeded(repo)) + + read, err := Read(def, repo, resolver, entity.Id()) + require.NoError(t, err) + + assertEqualEntities(t, entity, read) +} + +func 
assertEqualEntities(t *testing.T, a, b *Entity) { + // testify doesn't support comparing functions and systematically fail if they are not nil + // so we have to set them to nil temporarily + + backOpUnA := a.Definition.OperationUnmarshaler + backOpUnB := b.Definition.OperationUnmarshaler + + a.Definition.OperationUnmarshaler = nil + b.Definition.OperationUnmarshaler = nil + + defer func() { + a.Definition.OperationUnmarshaler = backOpUnA + b.Definition.OperationUnmarshaler = backOpUnB + }() + + require.Equal(t, a, b) +} diff --git a/entity/dag/operation.go b/entity/dag/operation.go new file mode 100644 index 00000000..a320859f --- /dev/null +++ b/entity/dag/operation.go @@ -0,0 +1,48 @@ +package dag + +import ( + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" +) + +// Operation is a piece of data defining a change to reflect on the state of an Entity. +// What this Operation or Entity's state looks like is not of the resort of this package as it only deals with the +// data structure and storage. +type Operation interface { + // Id return the Operation identifier + // + // Some care need to be taken to define a correct Id derivation and enough entropy in the data used to avoid + // collisions. Notably: + // - the Id of the first Operation will be used as the Id of the Entity. Collision need to be avoided across entities + // of the same type (example: no collision within the "bug" namespace). + // - collisions can also happen within the set of Operations of an Entity. Simple Operation might not have enough + // entropy to yield unique Ids (example: two "close" operation within the same second, same author). + // If this is a concern, it is recommended to include a piece of random data in the operation's data, to guarantee + // a minimal amount of entropy and avoid collision. + // + // Author's note: I tried to find a clever way around that inelegance (stuffing random useless data into the stored + // structure is not exactly elegant) but I failed to find a proper way. Essentially, anything that would reuse some + // other data (parent operation's Id, lamport clock) or the graph structure (depth) impose that the Id would only + // make sense in the context of the graph and yield some deep coupling between Entity and Operation. This in turn + // make the whole thing even less elegant. + // + // A common way to derive an Id will be to use the entity.DeriveId() function on the serialized operation data. + Id() entity.Id + // Validate check if the Operation data is valid + Validate() error + // Author returns the author of this operation + Author() identity.Interface +} + +// OperationWithFiles is an extended Operation that has files dependency, stored in git. +type OperationWithFiles interface { + Operation + + // GetFiles return the files needed by this operation + // This implies that the Operation maintain and store internally the references to those files. This is how + // this information is read later, when loading from storage. + // For example, an operation that has a text value referencing some files would maintain a mapping (text ref --> + // hash). 
+ GetFiles() []repository.Hash +} diff --git a/entity/dag/operation_pack.go b/entity/dag/operation_pack.go new file mode 100644 index 00000000..72063c60 --- /dev/null +++ b/entity/dag/operation_pack.go @@ -0,0 +1,358 @@ +package dag + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/pkg/errors" + "golang.org/x/crypto/openpgp" + + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" + "github.com/MichaelMure/git-bug/util/lamport" +) + +const opsEntryName = "ops" +const extraEntryName = "extra" +const versionEntryPrefix = "version-" +const createClockEntryPrefix = "create-clock-" +const editClockEntryPrefix = "edit-clock-" + +// operationPack is a wrapper structure to store multiple operations in a single git blob. +// Additionally, it holds and store the metadata for those operations. +type operationPack struct { + // An identifier, taken from a hash of the serialized Operations. + id entity.Id + + // The author of the Operations. Must be the same author for all the Operations. + Author identity.Interface + // The list of Operation stored in the operationPack + Operations []Operation + // Encode the entity's logical time of creation across all entities of the same type. + // Only exist on the root operationPack + CreateTime lamport.Time + // Encode the entity's logical time of last edition across all entities of the same type. + // Exist on all operationPack + EditTime lamport.Time +} + +func (opp *operationPack) Id() entity.Id { + if opp.id == "" || opp.id == entity.UnsetId { + // This means we are trying to get the opp's Id *before* it has been stored. + // As the Id is computed based on the actual bytes written on the disk, we are going to predict + // those and then get the Id. This is safe as it will be the exact same code writing on disk later. + + data, err := json.Marshal(opp) + if err != nil { + panic(err) + } + opp.id = entity.DeriveId(data) + } + + return opp.id +} + +func (opp *operationPack) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Author identity.Interface `json:"author"` + Operations []Operation `json:"ops"` + }{ + Author: opp.Author, + Operations: opp.Operations, + }) +} + +func (opp *operationPack) Validate() error { + if opp.Author == nil { + return fmt.Errorf("missing author") + } + for _, op := range opp.Operations { + if op.Author().Id() != opp.Author.Id() { + return fmt.Errorf("operation has different author than the operationPack's") + } + } + if opp.EditTime == 0 { + return fmt.Errorf("lamport edit time is zero") + } + return nil +} + +// Write write the OperationPack in git, with zero, one or more parent commits. +// If the repository has a keypair able to sign (that is, with a private key), the resulting commit is signed with that key. +// Return the hash of the created commit. +func (opp *operationPack) Write(def Definition, repo repository.Repo, parentCommit ...repository.Hash) (repository.Hash, error) { + if err := opp.Validate(); err != nil { + return "", err + } + + // For different reason, we store the clocks and format version directly in the git tree. + // Version has to be accessible before any attempt to decode to return early with a unique error. + // Clocks could possibly be stored in the git blob but it's nice to separate data and metadata, and + // we are storing something directly in the tree already so why not. + // + // To have a valid Tree, we point the "fake" entries to always the same value, the empty blob. 
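+	// For illustration (entry names as defined above): a written operationPack
+	// tree typically contains
+	//
+	//   version-1       -> empty blob   (format version, encoded in the name)
+	//   ops             -> JSON blob    {"author": ..., "ops": [...]}
+	//   create-clock-4  -> empty blob   (root operationPack only)
+	//   edit-clock-9    -> empty blob
+	//   extra/          -> subtree pointing at the operations' file blobs
+	//
+	// where 4 and 9 stand for arbitrary lamport times.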
+ emptyBlobHash, err := repo.StoreData([]byte{}) + if err != nil { + return "", err + } + + // Write the Ops as a Git blob containing the serialized array of operations + data, err := json.Marshal(opp) + if err != nil { + return "", err + } + + // compute the Id while we have the serialized data + opp.id = entity.DeriveId(data) + + hash, err := repo.StoreData(data) + if err != nil { + return "", err + } + + // Make a Git tree referencing this blob and encoding the other values: + // - format version + // - clocks + // - extra data + tree := []repository.TreeEntry{ + {ObjectType: repository.Blob, Hash: emptyBlobHash, + Name: fmt.Sprintf(versionEntryPrefix+"%d", def.FormatVersion)}, + {ObjectType: repository.Blob, Hash: hash, + Name: opsEntryName}, + {ObjectType: repository.Blob, Hash: emptyBlobHash, + Name: fmt.Sprintf(editClockEntryPrefix+"%d", opp.EditTime)}, + } + if opp.CreateTime > 0 { + tree = append(tree, repository.TreeEntry{ + ObjectType: repository.Blob, + Hash: emptyBlobHash, + Name: fmt.Sprintf(createClockEntryPrefix+"%d", opp.CreateTime), + }) + } + if extraTree := opp.makeExtraTree(); len(extraTree) > 0 { + extraTreeHash, err := repo.StoreTree(extraTree) + if err != nil { + return "", err + } + tree = append(tree, repository.TreeEntry{ + ObjectType: repository.Tree, + Hash: extraTreeHash, + Name: extraEntryName, + }) + } + + // Store the tree + treeHash, err := repo.StoreTree(tree) + if err != nil { + return "", err + } + + // Write a Git commit referencing the tree, with the previous commit as parent + // If we have keys, sign. + var commitHash repository.Hash + + // Sign the commit if we have a key + signingKey, err := opp.Author.SigningKey(repo) + if err != nil { + return "", err + } + + if signingKey != nil { + commitHash, err = repo.StoreSignedCommit(treeHash, signingKey.PGPEntity(), parentCommit...) + } else { + commitHash, err = repo.StoreCommit(treeHash, parentCommit...) + } + + if err != nil { + return "", err + } + + return commitHash, nil +} + +func (opp *operationPack) makeExtraTree() []repository.TreeEntry { + var tree []repository.TreeEntry + counter := 0 + added := make(map[repository.Hash]interface{}) + + for _, ops := range opp.Operations { + ops, ok := ops.(OperationWithFiles) + if !ok { + continue + } + + for _, file := range ops.GetFiles() { + if _, has := added[file]; !has { + tree = append(tree, repository.TreeEntry{ + ObjectType: repository.Blob, + Hash: file, + // The name is not important here, we only need to + // reference the blob. + Name: fmt.Sprintf("file%d", counter), + }) + counter++ + added[file] = struct{}{} + } + } + } + + return tree +} + +// readOperationPack read the operationPack encoded in git at the given Tree hash. +// +// Validity of the Lamport clocks is left for the caller to decide. 
+func readOperationPack(def Definition, repo repository.RepoData, resolver identity.Resolver, commit repository.Commit) (*operationPack, error) { + entries, err := repo.ReadTree(commit.TreeHash) + if err != nil { + return nil, err + } + + // check the format version first, fail early instead of trying to read something + var version uint + for _, entry := range entries { + if strings.HasPrefix(entry.Name, versionEntryPrefix) { + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read format version") + } + if v > 1<<12 { + return nil, fmt.Errorf("format version too big") + } + version = uint(v) + break + } + } + if version == 0 { + return nil, entity.NewErrUnknownFormat(def.FormatVersion) + } + if version != def.FormatVersion { + return nil, entity.NewErrInvalidFormat(version, def.FormatVersion) + } + + var id entity.Id + var author identity.Interface + var ops []Operation + var createTime lamport.Time + var editTime lamport.Time + + for _, entry := range entries { + switch { + case entry.Name == opsEntryName: + data, err := repo.ReadData(entry.Hash) + if err != nil { + return nil, errors.Wrap(err, "failed to read git blob data") + } + ops, author, err = unmarshallPack(def, resolver, data) + if err != nil { + return nil, err + } + id = entity.DeriveId(data) + + case strings.HasPrefix(entry.Name, createClockEntryPrefix): + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, createClockEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read creation lamport time") + } + createTime = lamport.Time(v) + + case strings.HasPrefix(entry.Name, editClockEntryPrefix): + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, editClockEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read edit lamport time") + } + editTime = lamport.Time(v) + } + } + + // Verify signature if we expect one + keys := author.ValidKeysAtTime(fmt.Sprintf(editClockPattern, def.Namespace), editTime) + if len(keys) > 0 { + keyring := PGPKeyring(keys) + _, err = openpgp.CheckDetachedSignature(keyring, commit.SignedData, commit.Signature) + if err != nil { + return nil, fmt.Errorf("signature failure: %v", err) + } + } + + return &operationPack{ + id: id, + Author: author, + Operations: ops, + CreateTime: createTime, + EditTime: editTime, + }, nil +} + +// unmarshallPack delegate the unmarshalling of the Operation's JSON to the decoding +// function provided by the concrete entity. This gives access to the concrete type of each +// Operation. 
+func unmarshallPack(def Definition, resolver identity.Resolver, data []byte) ([]Operation, identity.Interface, error) { + aux := struct { + Author identity.IdentityStub `json:"author"` + Operations []json.RawMessage `json:"ops"` + }{} + + if err := json.Unmarshal(data, &aux); err != nil { + return nil, nil, err + } + + if aux.Author.Id() == "" || aux.Author.Id() == entity.UnsetId { + return nil, nil, fmt.Errorf("missing author") + } + + author, err := resolver.ResolveIdentity(aux.Author.Id()) + if err != nil { + return nil, nil, err + } + + ops := make([]Operation, 0, len(aux.Operations)) + + for _, raw := range aux.Operations { + // delegate to specialized unmarshal function + op, err := def.OperationUnmarshaler(author, raw) + if err != nil { + return nil, nil, err + } + ops = append(ops, op) + } + + return ops, author, nil +} + +var _ openpgp.KeyRing = &PGPKeyring{} + +// PGPKeyring implement a openpgp.KeyRing from an slice of Key +type PGPKeyring []*identity.Key + +func (pk PGPKeyring) KeysById(id uint64) []openpgp.Key { + var result []openpgp.Key + for _, key := range pk { + if key.Public().KeyId == id { + result = append(result, openpgp.Key{ + PublicKey: key.Public(), + PrivateKey: key.Private(), + }) + } + } + return result +} + +func (pk PGPKeyring) KeysByIdUsage(id uint64, requiredUsage byte) []openpgp.Key { + // the only usage we care about is the ability to sign, which all keys should already be capable of + return pk.KeysById(id) +} + +func (pk PGPKeyring) DecryptionKeys() []openpgp.Key { + result := make([]openpgp.Key, len(pk)) + for i, key := range pk { + result[i] = openpgp.Key{ + PublicKey: key.Public(), + PrivateKey: key.Private(), + } + } + return result +} diff --git a/entity/dag/operation_pack_test.go b/entity/dag/operation_pack_test.go new file mode 100644 index 00000000..73960800 --- /dev/null +++ b/entity/dag/operation_pack_test.go @@ -0,0 +1,159 @@ +package dag + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" +) + +func TestOperationPackReadWrite(t *testing.T) { + repo, id1, _, resolver, def := makeTestContext() + + opp := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo"), + newOp2(id1, "bar"), + }, + CreateTime: 123, + EditTime: 456, + } + + commitHash, err := opp.Write(def, repo) + require.NoError(t, err) + + commit, err := repo.ReadCommit(commitHash) + require.NoError(t, err) + + opp2, err := readOperationPack(def, repo, resolver, commit) + require.NoError(t, err) + + require.Equal(t, opp, opp2) + + // make sure we get the same Id with the same data + opp3 := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo"), + newOp2(id1, "bar"), + }, + CreateTime: 123, + EditTime: 456, + } + require.Equal(t, opp.Id(), opp3.Id()) +} + +func TestOperationPackSignedReadWrite(t *testing.T) { + repo, id1, _, resolver, def := makeTestContext() + + err := id1.(*identity.Identity).Mutate(repo, func(orig *identity.Mutator) { + orig.Keys = append(orig.Keys, identity.GenerateKey()) + }) + require.NoError(t, err) + + opp := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo"), + newOp2(id1, "bar"), + }, + CreateTime: 123, + EditTime: 456, + } + + commitHash, err := opp.Write(def, repo) + require.NoError(t, err) + + commit, err := repo.ReadCommit(commitHash) + require.NoError(t, err) + + opp2, err := readOperationPack(def, repo, resolver, commit) + require.NoError(t, err) + + 
require.Equal(t, opp, opp2) + + // make sure we get the same Id with the same data + opp3 := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo"), + newOp2(id1, "bar"), + }, + CreateTime: 123, + EditTime: 456, + } + require.Equal(t, opp.Id(), opp3.Id()) +} + +func TestOperationPackFiles(t *testing.T) { + repo, id1, _, resolver, def := makeTestContext() + + blobHash1, err := repo.StoreData(randomData()) + require.NoError(t, err) + + blobHash2, err := repo.StoreData(randomData()) + require.NoError(t, err) + + opp := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo", blobHash1, blobHash2), + newOp1(id1, "foo", blobHash2), + }, + CreateTime: 123, + EditTime: 456, + } + + commitHash, err := opp.Write(def, repo) + require.NoError(t, err) + + commit, err := repo.ReadCommit(commitHash) + require.NoError(t, err) + + opp2, err := readOperationPack(def, repo, resolver, commit) + require.NoError(t, err) + + require.Equal(t, opp, opp2) + + require.ElementsMatch(t, opp2.Operations[0].(OperationWithFiles).GetFiles(), []repository.Hash{ + blobHash1, + blobHash2, + }) + require.ElementsMatch(t, opp2.Operations[1].(OperationWithFiles).GetFiles(), []repository.Hash{ + blobHash2, + }) + + tree, err := repo.ReadTree(commit.TreeHash) + require.NoError(t, err) + + extraTreeHash, ok := repository.SearchTreeEntry(tree, extraEntryName) + require.True(t, ok) + + extraTree, err := repo.ReadTree(extraTreeHash.Hash) + require.NoError(t, err) + require.ElementsMatch(t, extraTree, []repository.TreeEntry{ + { + ObjectType: repository.Blob, + Hash: blobHash1, + Name: "file0", + }, + { + ObjectType: repository.Blob, + Hash: blobHash2, + Name: "file1", + }, + }) +} + +func randomData() []byte { + var letterRunes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + b := make([]byte, 32) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return b +} |
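One recommendation from operation.go is worth a concrete illustration: operations with little intrinsic data should embed random bytes, so that Id(), derived from the serialized JSON, cannot collide. A hedged sketch in the style of the op1/op2 test fixtures; closeOp and its type tag 3 are hypothetical.

package dag

import (
	"crypto/rand"
	"encoding/json"

	"github.com/MichaelMure/git-bug/entity"
	"github.com/MichaelMure/git-bug/identity"
)

// closeOp carries no payload: without the nonce, two "close" operations created
// within the same second would serialize identically and collide.
type closeOp struct {
	author identity.Interface

	OperationType int    `json:"type"`
	Nonce         []byte `json:"nonce"` // random bytes, present only for Id entropy
}

func newCloseOp(author identity.Interface) *closeOp {
	nonce := make([]byte, 20)
	if _, err := rand.Read(nonce); err != nil {
		panic(err)
	}
	return &closeOp{author: author, OperationType: 3, Nonce: nonce}
}

func (o *closeOp) Id() entity.Id {
	// same derivation as op1/op2: hash of the serialized operation
	data, _ := json.Marshal(o)
	return entity.DeriveId(data)
}

func (o *closeOp) Validate() error            { return nil }
func (o *closeOp) Author() identity.Interface { return o.author }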