From 8d63c983c982f93cc48d3996d6bd097ddeeb327f Mon Sep 17 00:00:00 2001 From: Michael Muré Date: Sun, 3 Jan 2021 23:59:25 +0100 Subject: WIP --- entity/TODO | 8 + entity/dag/common_test.go | 137 ++++++++++++++ entity/dag/entity.go | 389 ++++++++++++++++++++++++++++++++++++++ entity/dag/entity_actions.go | 227 ++++++++++++++++++++++ entity/dag/entity_test.go | 117 ++++++++++++ entity/dag/operation.go | 31 +++ entity/dag/operation_pack.go | 294 ++++++++++++++++++++++++++++ entity/dag/operation_pack_test.go | 44 +++++ entity/doc.go | 8 - entity/entity.go | 348 ---------------------------------- entity/entity_actions.go | 31 --- entity/entity_test.go | 107 ----------- entity/merge.go | 14 +- entity/operation_pack.go | 199 ------------------- entity/refs.go | 2 + 15 files changed, 1257 insertions(+), 699 deletions(-) create mode 100644 entity/TODO create mode 100644 entity/dag/common_test.go create mode 100644 entity/dag/entity.go create mode 100644 entity/dag/entity_actions.go create mode 100644 entity/dag/entity_test.go create mode 100644 entity/dag/operation.go create mode 100644 entity/dag/operation_pack.go create mode 100644 entity/dag/operation_pack_test.go delete mode 100644 entity/doc.go delete mode 100644 entity/entity.go delete mode 100644 entity/entity_actions.go delete mode 100644 entity/entity_test.go delete mode 100644 entity/operation_pack.go (limited to 'entity') diff --git a/entity/TODO b/entity/TODO new file mode 100644 index 00000000..fd3c9710 --- /dev/null +++ b/entity/TODO @@ -0,0 +1,8 @@ +- is the pack Lamport clock really useful vs only topological sort? + - topological order is enforced on the clocks, so what's the point? + - is EditTime equivalent to PackTime? no, avoid the gaps. Is it better? +- how to do commit signature? +- how to avoid id collision between Operations? +- write tests for actions +- migrate Bug to the new structure +- migrate Identity to the new structure? 
\ No newline at end of file
diff --git a/entity/dag/common_test.go b/entity/dag/common_test.go
new file mode 100644
index 00000000..29f1279e
--- /dev/null
+++ b/entity/dag/common_test.go
@@ -0,0 +1,137 @@
+package dag
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/MichaelMure/git-bug/entity"
+	"github.com/MichaelMure/git-bug/identity"
+	"github.com/MichaelMure/git-bug/repository"
+)
+
+// This file contains an example dummy entity to be used in the tests
+
+/*
+  Operations
+*/
+
+// op1 is a dummy Operation carrying a single string field.
+// The author is unexported and therefore not part of the serialized data.
+type op1 struct {
+	author identity.Interface
+
+	OperationType int    `json:"type"`
+	Field1        string `json:"field_1"`
+}
+
+// newOp1 creates an op1 with the operation type set to 1.
+func newOp1(author identity.Interface, field1 string) *op1 {
+	return &op1{author: author, OperationType: 1, Field1: field1}
+}
+
+// Id derives the identifier from the JSON serialization of the operation.
+func (o op1) Id() entity.Id {
+	data, err := json.Marshal(o)
+	if err != nil {
+		// only plain int/string fields are serialized: a failure is a programming error
+		panic(err)
+	}
+	return entity.DeriveId(data)
+}
+
+// Author returns the identity given at construction or decoding time.
+func (o op1) Author() identity.Interface {
+	return o.author
+}
+
+// Validate always succeeds, the dummy operation has no constraint.
+func (o op1) Validate() error { return nil }
+
+// op2 is a second dummy Operation, distinguished from op1 by its type number and field name.
+type op2 struct {
+	author identity.Interface
+
+	OperationType int    `json:"type"`
+	Field2        string `json:"field_2"`
+}
+
+// newOp2 creates an op2 with the operation type set to 2.
+func newOp2(author identity.Interface, field2 string) *op2 {
+	return &op2{author: author, OperationType: 2, Field2: field2}
+}
+
+// Id derives the identifier from the JSON serialization of the operation.
+func (o op2) Id() entity.Id {
+	data, err := json.Marshal(o)
+	if err != nil {
+		// only plain int/string fields are serialized: a failure is a programming error
+		panic(err)
+	}
+	return entity.DeriveId(data)
+}
+
+// Author returns the identity given at construction or decoding time.
+func (o op2) Author() identity.Interface {
+	return o.author
+}
+
+// Validate always succeeds, the dummy operation has no constraint.
+func (o op2) Validate() error { return nil }
+
+// unmarshaler decodes a raw JSON message into an op1 or an op2, selected by the "type" field,
+// and attaches the given author to the result.
+// On any decoding error, or for an unknown type, the returned Operation is nil.
+func unmarshaler(author identity.Interface, raw json.RawMessage) (Operation, error) {
+	var t struct {
+		OperationType int `json:"type"`
+	}
+
+	if err := json.Unmarshal(raw, &t); err != nil {
+		return nil, err
+	}
+
+	switch t.OperationType {
+	case 1:
+		op := &op1{}
+		// op is already a pointer, no need for an extra level of indirection
+		if err := json.Unmarshal(raw, op); err != nil {
+			return nil, err
+		}
+		op.author = author
+		return op, nil
+	case 2:
+		op := &op2{}
+		if err := json.Unmarshal(raw, op); err != nil {
+			return nil, err
+		}
+		op.author = author
+		return op, nil
+	default:
+		return nil, fmt.Errorf("unknown operation type %v", t.OperationType)
+	}
+}
+
+/*
+  Identities + repo + definition
+*/
+
+// makeTestContext builds a mock repository holding two committed identities, and a Definition
+// for a "foo" entity whose identity resolver only knows those two identities.
+// Any setup failure panics.
+func makeTestContext() (repository.ClockedRepo, identity.Interface, identity.Interface, Definition) {
+	repo := repository.NewMockRepo()
+
+	id1, err := identity.NewIdentity(repo, "name1", "email1")
+	if err != nil {
+		panic(err)
+	}
+	err = id1.Commit(repo)
+	if err != nil {
+		panic(err)
+	}
+	id2, err := identity.NewIdentity(repo, "name2", "email2")
+	if err != nil {
+		panic(err)
+	}
+	err = id2.Commit(repo)
+	if err != nil {
+		panic(err)
+	}
+
+	resolver := identityResolverFunc(func(id entity.Id) (identity.Interface, error) {
+		switch id {
+		case id1.Id():
+			return id1, nil
+		case id2.Id():
+			return id2, nil
+		default:
+			return nil, identity.ErrIdentityNotExist
+		}
+	})
+
+	def := Definition{
+		typename:             "foo",
+		namespace:            "foos",
+		operationUnmarshaler: unmarshaler,
+		identityResolver:     resolver,
+		formatVersion:        1,
+	}
+
+	return repo, id1, id2, def
+}
+
+// identityResolverFunc adapts a plain function to the ResolveIdentity method.
+type identityResolverFunc func(id entity.Id) (identity.Interface, error)
+
+// ResolveIdentity calls the wrapped function.
+func (fn identityResolverFunc) ResolveIdentity(id entity.Id) (identity.Interface, error) {
+	return fn(id)
+}
diff --git a/entity/dag/entity.go b/entity/dag/entity.go
new file mode 100644
index 00000000..78347fa0
--- /dev/null
+++ b/entity/dag/entity.go
@@ -0,0 +1,389 @@
+// Package dag contains the base common code to define an entity stored
+// in a chain of git objects, supporting actions like Push, Pull and Merge.
+package dag
+
+import (
+	"encoding/json"
+	"fmt"
+	"sort"
+
+	"github.com/pkg/errors"
+
+	"github.com/MichaelMure/git-bug/entity"
+	"github.com/MichaelMure/git-bug/identity"
+	"github.com/MichaelMure/git-bug/repository"
+	"github.com/MichaelMure/git-bug/util/lamport"
+)
+
+// refsPattern is the format of an entity's git reference: namespace, then entity id.
+const refsPattern = "refs/%s/%s"
+
+// creationClockPattern and editClockPattern are the formats of the Lamport clock names,
+// parameterized by the namespace.
+const creationClockPattern = "%s-create"
+const editClockPattern = "%s-edit"
+
+// Definition holds the details defining one specialization of an Entity.
+type Definition struct {
+	// the name of the entity (bug, pull-request, ...)
+	typename string
+	// the namespace in git (bugs, prs, ...)
+	namespace string
+	// a function decoding a JSON message into an Operation
+	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
+	// a function loading an identity.Identity from its Id
+	identityResolver identity.Resolver
+	// the expected format version number, that can be used for data migration/upgrade
+	formatVersion uint
+}
+
+// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
+type Entity struct {
+	Definition
+
+	// operations that are already stored in the repository
+	ops []Operation
+	// operations not yet stored in the repository
+	staging []Operation
+
+	// TODO: add here createTime and editTime
+
+	// // TODO: doesn't seem to actually be useful over the topological sort? Timestamp can be generated from graph depth
+	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
+	// packClock lamport.Clock
+
+	// hash of the last commit written or read for this entity, empty if nothing is stored yet
+	lastCommit repository.Hash
+}
+
+// New creates an empty Entity
+func New(definition Definition) *Entity {
+	return &Entity{
+		Definition: definition,
+		// packClock: lamport.NewMemClock(),
+	}
+}
+
+// Read will read and decode a stored Entity from a repository.
+// The id is validated first, then the entity is loaded from its local reference
+// in the Definition's namespace.
+func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
+	if err := id.Validate(); err != nil {
+		return nil, errors.Wrap(err, "invalid id")
+	}
+
+	// same pattern as the one used when writing the reference
+	ref := fmt.Sprintf(refsPattern, def.namespace, id.String())
+
+	return read(def, repo, ref)
+}
+
+// read fetches from git and decodes an Entity at an arbitrary git reference.
+//
+// Every commit reachable from ref is decoded into an operationPack, the Lamport clocks are checked
+// against the DAG topology, and the Operations are finally ordered by logical time.
+func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
+	rootHash, err := repo.ResolveRef(ref)
+	if err != nil {
+		return nil, err
+	}
+
+	// Compute a topological order of the DAG (ancestors first) with an iterative post-order
+	// depth-first search: a commit is emitted only once all its parents have been emitted.
+	// Reversing the plain discovery order is not enough: after successive merges, a commit
+	// can be discovered before one of its own ancestors and would then be processed too early.
+
+	type frame struct {
+		commit repository.Commit
+		// index of the next parent to explore
+		next int
+	}
+
+	visited := make(map[repository.Hash]struct{})
+	stack := make([]frame, 0, 32)
+	topoOrder := make([]repository.Commit, 0, 32)
+
+	// push reads a commit and schedules it for exploration, unless it has already been seen
+	push := func(hash repository.Hash) error {
+		if _, ok := visited[hash]; ok {
+			return nil
+		}
+		visited[hash] = struct{}{}
+
+		commit, err := repo.ReadCommit(hash)
+		if err != nil {
+			return err
+		}
+		stack = append(stack, frame{commit: commit})
+		return nil
+	}
+
+	if err := push(rootHash); err != nil {
+		return nil, err
+	}
+
+	for len(stack) > 0 {
+		top := &stack[len(stack)-1]
+
+		if top.next < len(top.commit.Parents) {
+			parent := top.commit.Parents[top.next]
+			top.next++
+			// push may grow the stack: top must not be used after this call
+			if err := push(parent); err != nil {
+				return nil, err
+			}
+			continue
+		}
+
+		// all the parents have been emitted, this commit can be emitted too
+		topoOrder = append(topoOrder, top.commit)
+		stack = stack[:len(stack)-1]
+	}
+
+	// We can now read the commits in an order where we are sure to have read all the
+	// chronological ancestors when we read a commit.
+
+	// Next step is to:
+	// 1) read the operationPacks
+	// 2) make sure that the clocks causality respect the DAG topology.
+
+	oppMap := make(map[repository.Hash]*operationPack)
+	var opsCount int
+	// var packClock = lamport.NewMemClock()
+
+	for i, commit := range topoOrder {
+		isFirstCommit := i == 0
+		isMerge := len(commit.Parents) > 1
+
+		// Verify DAG structure: single chronological root, so only the root
+		// can have no parents. Said otherwise, the DAG need to have exactly
+		// one leaf.
+		if !isFirstCommit && len(commit.Parents) == 0 {
+			return nil, fmt.Errorf("multiple leafs in the entity DAG")
+		}
+
+		opp, err := readOperationPack(def, repo, commit)
+		if err != nil {
+			return nil, err
+		}
+
+		err = opp.Validate()
+		if err != nil {
+			return nil, err
+		}
+
+		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
+		if isFirstCommit && opp.CreateTime <= 0 {
+			return nil, fmt.Errorf("creation lamport time not set")
+		}
+
+		// make sure that the lamport clocks causality match the DAG topology
+		for _, parentHash := range commit.Parents {
+			parentPack, ok := oppMap[parentHash]
+			if !ok {
+				// broken invariant: the topological order guarantees parents come first
+				panic("DFS failed")
+			}
+
+			if parentPack.EditTime >= opp.EditTime {
+				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
+			}
+
+			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
+			// that the clocks don't jump too far in the future
+			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
+			// as long as there is one valid chain of small hops, it's fine.
+			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
+				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
+			}
+
+			// TODO: PackTime is not checked
+		}
+
+		oppMap[commit.Hash] = opp
+		opsCount += len(opp.Operations)
+	}
+
+	// The clocks are fine, we witness them
+	for _, opp := range oppMap {
+		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
+		if err != nil {
+			return nil, err
+		}
+		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
+		if err != nil {
+			return nil, err
+		}
+		// err = packClock.Witness(opp.PackTime)
+		// if err != nil {
+		// 	return nil, err
+		// }
+	}
+
+	// Now that we know that the topological order and clocks are fine, we order the operationPacks
+	// based on the logical clocks, entirely ignoring the DAG topology
+
+	oppSlice := make([]*operationPack, 0, len(oppMap))
+	for _, pack := range oppMap {
+		oppSlice = append(oppSlice, pack)
+	}
+	sort.Slice(oppSlice, func(i, j int) bool {
+		// Primary ordering with the dedicated "pack" Lamport time that encode causality
+		// within the entity
+		// if oppSlice[i].PackTime != oppSlice[j].PackTime {
+		// 	return oppSlice[i].PackTime < oppSlice[j].PackTime
+		// }
+		// We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly
+		// came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be
+		// enough but it can give us an edge to approach what really happened.
+		if oppSlice[i].EditTime != oppSlice[j].EditTime {
+			return oppSlice[i].EditTime < oppSlice[j].EditTime
+		}
+		// Well, what now? We still need a total ordering and the most stable possible.
+		// As a last resort, we can order based on a hash of the serialized Operations in the
+		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
+		// This is a lexicographic ordering on the stringified ID.
+		return oppSlice[i].Id() < oppSlice[j].Id()
+	})
+
+	// Now that we ordered the operationPacks, we have the order of the Operations
+
+	ops := make([]Operation, 0, opsCount)
+	for _, pack := range oppSlice {
+		ops = append(ops, pack.Operations...)
+	}
+
+	return &Entity{
+		Definition: def,
+		ops:        ops,
+		// packClock: packClock,
+		lastCommit: rootHash,
+	}, nil
+}
+
+// Id return the Entity identifier.
+// It panics if the Entity holds no Operation at all, as there is nothing to derive the id from.
+func (e *Entity) Id() entity.Id {
+	// id is the id of the first operation
+	return e.FirstOp().Id()
+}
+
+// Validate check if the Entity data is valid: it must hold at least one Operation, every
+// Operation (stored or staged) must be valid, and no two Operations can share the same Id.
+func (e *Entity) Validate() error {
+	// non-empty
+	if len(e.ops) == 0 && len(e.staging) == 0 {
+		return fmt.Errorf("entity has no operations")
+	}
+
+	// check if each operations are valid
+	for _, op := range e.ops {
+		if err := op.Validate(); err != nil {
+			return err
+		}
+	}
+
+	// check if staging is valid if needed
+	for _, op := range e.staging {
+		if err := op.Validate(); err != nil {
+			return err
+		}
+	}
+
+	// Check that there is no colliding operation's ID
+	ids := make(map[entity.Id]struct{})
+	for _, op := range e.Operations() {
+		if _, ok := ids[op.Id()]; ok {
+			return fmt.Errorf("id collision: %s", op.Id())
+		}
+		ids[op.Id()] = struct{}{}
+	}
+
+	return nil
+}
+
+// Operations return the ordered operations, stored ones first, then the staged ones.
+// The returned slice is a fresh copy: appending directly to e.ops could write the staged
+// operations into its spare capacity and share the backing array with the caller.
+func (e *Entity) Operations() []Operation {
+	ops := make([]Operation, 0, len(e.ops)+len(e.staging))
+	ops = append(ops, e.ops...)
+	return append(ops, e.staging...)
+}
+
+// FirstOp lookup for the very first operation of the Entity.
+// It returns nil if the Entity has no Operation.
+func (e *Entity) FirstOp() Operation {
+	if len(e.ops) > 0 {
+		return e.ops[0]
+	}
+	if len(e.staging) > 0 {
+		return e.staging[0]
+	}
+	return nil
+}
+
+// LastOp lookup for the very last operation of the Entity.
+// It returns nil if the Entity has no Operation.
+func (e *Entity) LastOp() Operation {
+	if len(e.staging) > 0 {
+		return e.staging[len(e.staging)-1]
+	}
+	if len(e.ops) > 0 {
+		return e.ops[len(e.ops)-1]
+	}
+	return nil
+}
+
+// Append add a new Operation to the Entity.
+// The Operation is only staged in memory until the next Commit.
+func (e *Entity) Append(op Operation) {
+	e.staging = append(e.staging, op)
+}
+
+// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
+func (e *Entity) NeedCommit() bool {
+	return len(e.staging) > 0
+}
+
+// CommitAdNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
+// is already in sync with the repository.
+func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
+	if e.NeedCommit() {
+		return e.Commit(repo)
+	}
+	return nil
+}
+
+// Commit write the appended operations in the repository.
+// All the staged operations must share the same author. On success the staged operations
+// become stored operations and the entity's git reference is moved to the new commit.
+// TODO: support commit signature
+func (e *Entity) Commit(repo repository.ClockedRepo) error {
+	if !e.NeedCommit() {
+		return fmt.Errorf("can't commit an entity with no pending operation")
+	}
+
+	if err := e.Validate(); err != nil {
+		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
+	}
+
+	var author identity.Interface
+	for _, op := range e.staging {
+		if author != nil && op.Author() != author {
+			return fmt.Errorf("operations with different author")
+		}
+		author = op.Author()
+	}
+
+	// increment the various clocks for this new operationPack
+	// packTime, err := e.packClock.Increment()
+	// if err != nil {
+	// 	return err
+	// }
+	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
+	if err != nil {
+		return err
+	}
+	// the creation time is only set on the very first operationPack
+	var creationTime lamport.Time
+	if e.lastCommit == "" {
+		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
+		if err != nil {
+			return err
+		}
+	}
+
+	opp := &operationPack{
+		Author:     author,
+		Operations: e.staging,
+		CreateTime: creationTime,
+		EditTime:   editTime,
+		// PackTime: packTime,
+	}
+
+	// operationPack.Write stores the blob, the tree AND the commit, and returns the commit hash.
+	// The previous commit (if any) is given as parent; wrapping the result in a second commit
+	// would create a commit whose tree is not a tree.
+	var parents []repository.Hash
+	if e.lastCommit != "" {
+		parents = append(parents, e.lastCommit)
+	}
+	commitHash, err := opp.Write(e.Definition, repo, parents...)
+	if err != nil {
+		return err
+	}
+
+	e.lastCommit = commitHash
+	e.ops = append(e.ops, e.staging...)
+	e.staging = nil
+
+	// Create or update the Git reference for this entity
+	// When pushing later, the remote will ensure that this ref update
+	// is fast-forward, that is no data has been overwritten.
+	ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String())
+	return repo.UpdateRef(ref, commitHash)
+}
diff --git a/entity/dag/entity_actions.go b/entity/dag/entity_actions.go
new file mode 100644
index 00000000..8dcf91e6
--- /dev/null
+++ b/entity/dag/entity_actions.go
@@ -0,0 +1,227 @@
+package dag
+
+import (
+	"fmt"
+
+	"github.com/pkg/errors"
+
+	"github.com/MichaelMure/git-bug/entity"
+	"github.com/MichaelMure/git-bug/repository"
+)
+
+// ListLocalIds lists the ids of all the entities found under "refs/<typename>/".
+// NOTE(review): Commit writes the references under the Definition's namespace, so callers
+// presumably have to pass the namespace here rather than the typename — confirm.
+func ListLocalIds(typename string, repo repository.RepoData) ([]entity.Id, error) {
+	refs, err := repo.ListRefs(fmt.Sprintf("refs/%s/", typename))
+	if err != nil {
+		return nil, err
+	}
+	return entity.RefsToIds(refs), nil
+}
+
+// Fetch retrieve updates from a remote
+// This does not change the local entity state
+func Fetch(def Definition, repo repository.Repo, remote string) (string, error) {
+	// "refs/<namespace>/*:refs/remotes/<remote>/<namespace>/*"
+	fetchRefSpec := fmt.Sprintf("refs/%s/*:refs/remotes/%s/%s/*",
+		def.namespace, remote, def.namespace)
+
+	return repo.FetchRefs(remote, fetchRefSpec)
+}
+
+// Push update a remote with the local changes
+func Push(def Definition, repo repository.Repo, remote string) (string, error) {
+	// "refs/<namespace>/*:refs/<namespace>/*"
+	refspec := fmt.Sprintf("refs/%s/*:refs/%s/*",
+		def.namespace, def.namespace)
+
+	return repo.PushRefs(remote, refspec)
+}
+
+// Pull will do a Fetch + MergeAll
+// Contrary to MergeAll, this function will return an error if a merge fails.
+// The first failure is the one returned. The remaining merge results are still consumed, so
+// that the goroutine started by MergeAll can terminate instead of blocking forever on its
+// unbuffered channel.
+func Pull(def Definition, repo repository.ClockedRepo, remote string) error {
+	_, err := Fetch(def, repo, remote)
+	if err != nil {
+		return err
+	}
+
+	var firstErr error
+	for result := range MergeAll(def, repo, remote) {
+		if firstErr != nil {
+			// only drain the channel once a failure has been recorded
+			continue
+		}
+		switch {
+		case result.Err != nil:
+			firstErr = result.Err
+		case result.Status == entity.MergeStatusInvalid:
+			firstErr = errors.Errorf("merge failure: %s", result.Reason)
+		}
+	}
+
+	return firstErr
+}
+
+// MergeAll merges every entity found under "refs/remotes/<remote>/<namespace>/" into the local
+// state, and reports one MergeResult per entity on the returned channel.
+// The channel is unbuffered and closed once all the references have been processed: the caller
+// has to read it until it is closed.
+func MergeAll(def Definition, repo repository.ClockedRepo, remote string) <-chan entity.MergeResult {
+	out := make(chan entity.MergeResult)
+
+	// no caching for the merge, we load everything from git even if that means multiple
+	// copy of the same entity in memory. The cache layer will intercept the results to
+	// invalidate entities if necessary.
+
+	go func() {
+		defer close(out)
+
+		remoteRefSpec := fmt.Sprintf("refs/remotes/%s/%s/", remote, def.namespace)
+		remoteRefs, err := repo.ListRefs(remoteRefSpec)
+		if err != nil {
+			out <- entity.MergeResult{Err: err}
+			return
+		}
+
+		for _, remoteRef := range remoteRefs {
+			out <- merge(def, repo, remoteRef)
+		}
+	}()
+
+	return out
+}
+
+// merge merges the entity stored at remoteRef into its local counterpart. In order, it will:
+// - report an invalid status if the remote entity can't be read or is invalid
+// - copy the reference if the entity doesn't exist locally
+// - do nothing if both references point to the same commit
+// - fast-forward the local reference if the local commit is part of the remote history
+// - otherwise create a merge commit holding an empty operationPack
+func merge(def Definition, repo repository.ClockedRepo, remoteRef string) entity.MergeResult {
+	id := entity.RefToId(remoteRef)
+
+	if err := id.Validate(); err != nil {
+		return entity.NewMergeInvalidStatus(id, errors.Wrap(err, "invalid ref").Error())
+	}
+
+	remoteEntity, err := read(def, repo, remoteRef)
+	if err != nil {
+		return entity.NewMergeInvalidStatus(id,
+			errors.Wrapf(err, "remote %s is not readable", def.typename).Error())
+	}
+
+	// Check for error in remote data
+	if err := remoteEntity.Validate(); err != nil {
+		return entity.NewMergeInvalidStatus(id,
+			errors.Wrapf(err, "remote %s data is invalid", def.typename).Error())
+	}
+
+	localRef := fmt.Sprintf("refs/%s/%s", def.namespace, id.String())
+
+	localExist, err := repo.RefExist(localRef)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	// the entity is not local yet, simply create the reference
+	if !localExist {
+		err := repo.CopyRef(remoteRef, localRef)
+		if err != nil {
+			return entity.NewMergeError(err, id)
+		}
+
+		return entity.NewMergeStatus(entity.MergeStatusNew, id, remoteEntity)
+	}
+
+	// var updated bool
+	// err = repo.MergeRef(localRef, remoteRef, func() repository.Hash {
+	// 	updated = true
+	//
+	// })
+	// if err != nil {
+	// 	return entity.NewMergeError(err, id)
+	// }
+	//
+	// if updated {
+	// 	return entity.NewMergeStatus(entity.MergeStatusUpdated, id, )
+	// } else {
+	// 	return entity.NewMergeStatus(entity.MergeStatusNothing, id, )
+	// }
+
+	localCommit, err := repo.ResolveRef(localRef)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	remoteCommit, err := repo.ResolveRef(remoteRef)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	if localCommit == remoteCommit {
+		// nothing to merge
+		return entity.NewMergeStatus(entity.MergeStatusNothing, id, remoteEntity)
+	}
+
+	// fast-forward is possible if the remote history includes the local commit
+
+	remoteCommits, err := repo.ListCommits(remoteRef)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	fastForwardPossible := false
+	for _, hash := range remoteCommits {
+		if hash == localCommit {
+			fastForwardPossible = true
+			break
+		}
+	}
+
+	if fastForwardPossible {
+		err = repo.UpdateRef(localRef, remoteCommit)
+		if err != nil {
+			return entity.NewMergeError(err, id)
+		}
+		return entity.NewMergeStatus(entity.MergeStatusUpdated, id, remoteEntity)
+	}
+
+	// fast-forward is not possible, we need to create a merge commit
+	// For simplicity when reading and to have clocks that record this change, we store
+	// an empty operationPack.
+	// First step is to collect those clocks.
+
+	localEntity, err := read(def, repo, localRef)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	// err = localEntity.packClock.Witness(remoteEntity.packClock.Time())
+	// if err != nil {
+	// 	return entity.NewMergeError(err, id)
+	// }
+	//
+	// packTime, err := localEntity.packClock.Increment()
+	// if err != nil {
+	// 	return entity.NewMergeError(err, id)
+	// }
+
+	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, def.namespace))
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	// NOTE(review): this operationPack has no Author, while operationPack.Validate (called by
+	// Write) rejects a missing author. As it stands this path reports a merge error; an author
+	// most likely has to be provided by the caller — confirm.
+	opp := &operationPack{
+		Operations: nil,
+		CreateTime: 0,
+		EditTime:   editTime,
+		// PackTime: packTime,
+	}
+
+	// operationPack.Write stores the tree AND the commit, and returns the commit hash: the two
+	// parents are handed over directly instead of wrapping the result in a second commit.
+	newHash, err := opp.Write(def, repo, localCommit, remoteCommit)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	// finally update the ref
+	err = repo.UpdateRef(localRef, newHash)
+	if err != nil {
+		return entity.NewMergeError(err, id)
+	}
+
+	// NOTE(review): localEntity was read before the merge commit was created — confirm this is
+	// the entity that should be reported.
+	return entity.NewMergeStatus(entity.MergeStatusUpdated, id, localEntity)
+}
+
+// Remove is not implemented yet and always returns an error.
+func Remove() error {
+	return errors.New("not implemented")
+}
diff --git a/entity/dag/entity_test.go b/entity/dag/entity_test.go
new file mode 100644
index 00000000..c5c83567
--- /dev/null
+++ b/entity/dag/entity_test.go
@@ -0,0 +1,117 @@
+package dag
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestWriteRead commits an entity in two steps, with a different author for each commit,
+// then checks that reading it back gives an equal entity.
+func TestWriteRead(t *testing.T) {
+	repo, id1, id2, def := makeTestContext()
+
+	entity := New(def)
+	require.False(t, entity.NeedCommit())
+
+	entity.Append(newOp1(id1, "foo"))
+	entity.Append(newOp2(id1, "bar"))
+
+	require.True(t, entity.NeedCommit())
+	require.NoError(t, entity.CommitAdNeeded(repo))
+	require.False(t, entity.NeedCommit())
+
+	entity.Append(newOp2(id2, "foobar"))
+	require.True(t, entity.NeedCommit())
+	require.NoError(t, entity.CommitAdNeeded(repo))
+	require.False(t, entity.NeedCommit())
+
+	read, err := Read(def, repo, entity.Id())
+	require.NoError(t, err)
+
+	assertEqualEntities(t, entity, read)
+}
+
+// assertEqualEntities compares two entities, ignoring the function-like fields of their Definition.
+func assertEqualEntities(t *testing.T, a, b *Entity) {
+	// testify doesn't support comparing functions and systematically fail if they are not nil
+	// so we have to set them to nil temporarily
+
+	backOpUnA := a.Definition.operationUnmarshaler
+	backOpUnB := b.Definition.operationUnmarshaler
+
+	a.Definition.operationUnmarshaler = nil
+	b.Definition.operationUnmarshaler = nil
+
+	backIdResA := a.Definition.identityResolver
+	backIdResB := b.Definition.identityResolver
+
+	a.Definition.identityResolver = nil
+	b.Definition.identityResolver = nil
+
+	defer func() {
+		a.Definition.operationUnmarshaler = backOpUnA
+		b.Definition.operationUnmarshaler = backOpUnB
+		a.Definition.identityResolver = backIdResA
+		b.Definition.identityResolver = backIdResB
+	}()
+
+	require.Equal(t, a, b)
+}
+
+// // Merge
+//
+// merge1 := makeCommit(t, repo)
+// merge1 = makeCommit(t, repo, merge1)
+// err = repo.UpdateRef("merge1", merge1)
+// require.NoError(t, err)
+//
+// err = repo.UpdateRef("merge2", merge1)
+// require.NoError(t, err)
+//
+// // identical merge
+// err = repo.MergeRef("merge1", "merge2")
+// require.NoError(t, err)
+//
+// refMerge1, err := repo.ResolveRef("merge1")
+// require.NoError(t, err)
+// require.Equal(t, merge1, refMerge1)
+// refMerge2, err := repo.ResolveRef("merge2")
+// require.NoError(t, err)
+// require.Equal(t, merge1, refMerge2)
+//
+// // fast-forward merge
+// merge2 := makeCommit(t, repo, merge1)
+// merge2 = makeCommit(t, repo, merge2)
+//
+// err = repo.UpdateRef("merge2", merge2)
+// require.NoError(t, err)
+//
+// err = repo.MergeRef("merge1", "merge2")
+// require.NoError(t, err)
+//
+// refMerge1, err = repo.ResolveRef("merge1")
+// require.NoError(t, err)
+// require.Equal(t, merge2, refMerge1)
+// refMerge2, err = repo.ResolveRef("merge2")
+// require.NoError(t, err)
+// require.Equal(t, merge2, refMerge2)
+//
+// // merge commit
+// merge1 = makeCommit(t, repo, merge1)
+// err = repo.UpdateRef("merge1", merge1)
+// require.NoError(t, err)
+//
+// merge2 = makeCommit(t, repo, merge2)
+// err = repo.UpdateRef("merge2", merge2)
+// require.NoError(t, err)
+//
+// err = repo.MergeRef("merge1", "merge2")
+// require.NoError(t, err)
+//
+// refMerge1, err = repo.ResolveRef("merge1")
+// require.NoError(t, err)
+// require.NotEqual(t, merge1, refMerge1)
+// commitRefMerge1, err := repo.ReadCommit(refMerge1)
+// require.NoError(t, err)
+// require.ElementsMatch(t, commitRefMerge1.Parents, []Hash{merge1, merge2})
+// refMerge2, err = repo.ResolveRef("merge2")
+// require.NoError(t, err)
+// require.Equal(t, merge2, refMerge2)
diff --git a/entity/dag/operation.go b/entity/dag/operation.go
new file mode 100644
index 00000000..9fcc055b
--- /dev/null
+++ b/entity/dag/operation.go
@@ -0,0 +1,31 @@
+package dag
+
+import (
+	"github.com/MichaelMure/git-bug/entity"
+	"github.com/MichaelMure/git-bug/identity"
+)
+
+// Operation is a piece of data defining a change to reflect on the state of an Entity.
+// What this Operation or Entity's state looks like is not the concern of this package, as it only deals with the
+// data structure and storage.
+type Operation interface {
+	// Id return the Operation identifier
+	// Some care need to be taken to define a correct Id derivation and enough entropy in the data used to avoid
+	// collisions. Notably:
+	// - the Id of the first Operation will be used as the Id of the Entity. Collision need to be avoided across Entities.
+	// - collisions can also happen within the set of Operations of an Entity. Simple Operation might not have enough
+	//   entropy to yield unique Ids.
+	// A common way to derive an Id will be to use the DeriveId function on the serialized operation data.
+	Id() entity.Id
+	// Validate check if the Operation data is valid
+	Validate() error
+
+	// Author return the identity of the author of the Operation
+	Author() identity.Interface
+}
+
+// operationBase groups the data common to Operations: the author and an in-memory id.
+// NOTE(review): nothing visible in this patch uses it yet; presumably meant to be embedded
+// by concrete Operations — confirm.
+type operationBase struct {
+	// The author of the Operation
+	author identity.Interface
+
+	// Not serialized. Store the op's id in memory.
+	id entity.Id
+}
diff --git a/entity/dag/operation_pack.go b/entity/dag/operation_pack.go
new file mode 100644
index 00000000..7cf4ee58
--- /dev/null
+++ b/entity/dag/operation_pack.go
@@ -0,0 +1,294 @@
+package dag
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/pkg/errors"
+	"golang.org/x/crypto/openpgp"
+
+	"github.com/MichaelMure/git-bug/entity"
+	"github.com/MichaelMure/git-bug/identity"
+	"github.com/MichaelMure/git-bug/repository"
+	"github.com/MichaelMure/git-bug/util/lamport"
+)
+
+// TODO: extra data tree
+const extraEntryName = "extra"
+
+// Names and name prefixes of the entries of the git tree holding an operationPack.
+// The prefixed entries encode a number directly in the entry name.
+const opsEntryName = "ops"
+const versionEntryPrefix = "version-"
+const createClockEntryPrefix = "create-clock-"
+const editClockEntryPrefix = "edit-clock-"
+const packClockEntryPrefix = "pack-clock-"
+
+// operationPack is a wrapper structure to store multiple operations in a single git blob.
+// Additionally, it holds and stores the metadata for those operations.
+type operationPack struct {
+	// Identifier of the pack: a hash of the serialized Operations.
+	id entity.Id
+
+	// Author shared by every Operation of the pack.
+	Author identity.Interface
+	// Operations stored in this operationPack.
+	Operations []Operation
+	// Logical time of creation of the entity, across all entities of the same type.
+	// Only set on the root operationPack.
+	CreateTime lamport.Time
+	// Logical time of the last edition of the entity, across all entities of the same type.
+	// Set on every operationPack.
+	EditTime lamport.Time
+	// // Encode the operationPack's logical time of creation within this entity.
+	// // Exist on all operationPack
+	// PackTime lamport.Time
+}
+
+// Id returns the identifier of the operationPack, computing and caching it on first use.
+func (opp *operationPack) Id() entity.Id {
+	if opp.id != "" && opp.id != entity.UnsetId {
+		return opp.id
+	}
+
+	// The Id is requested *before* the pack has been stored. It is derived from the bytes
+	// written on disk, so we produce those bytes now with the very same serialization code
+	// that Write uses later.
+	serialized, err := json.Marshal(opp)
+	if err != nil {
+		panic(err)
+	}
+	opp.id = entity.DeriveId(serialized)
+
+	return opp.id
+}
+
+// MarshalJSON serializes only the author and the Operations; the id and the clocks are left out.
+func (opp *operationPack) MarshalJSON() ([]byte, error) {
+	type packJSON struct {
+		Author     identity.Interface `json:"author"`
+		Operations []Operation        `json:"ops"`
+	}
+
+	payload := packJSON{
+		Author:     opp.Author,
+		Operations: opp.Operations,
+	}
+	return json.Marshal(payload)
+}
+
+// Validate checks, in this order, that the pack has an author, that every Operation has that
+// same author, and that the edit Lamport time is set.
+func (opp *operationPack) Validate() error {
+	if opp.Author == nil {
+		return fmt.Errorf("missing author")
+	}
+	for i := range opp.Operations {
+		if opp.Operations[i].Author() != opp.Author {
+			return fmt.Errorf("operation has different author than the operationPack's")
+		}
+	}
+	if opp.EditTime == 0 {
+		return fmt.Errorf("lamport edit time is zero")
+	}
+	return nil
+}
+
+// Write validates the operationPack, then stores it in git as a blob, a tree and a commit having
+// parentCommit as parents. The commit is signed when the author has a signing key.
+// The returned hash is the hash of that commit.
+func (opp *operationPack) Write(def Definition, repo repository.RepoData, parentCommit ...repository.Hash) (repository.Hash, error) {
+	if err := opp.Validate(); err != nil {
+		return "", err
+	}
+
+	// The clocks and the format version are stored directly in the git tree, as entry names.
+	// The version has to be accessible before any attempt to decode, to return early with a
+	// unique error. The clocks could live in the blob, but keeping data and metadata apart is
+	// nice, and the tree already carries the version anyway.
+	//
+	// To keep the tree valid, those "fake" entries all point to the same object: the empty blob.
+	emptyBlobHash, err := repo.StoreData([]byte{})
+	if err != nil {
+		return "", err
+	}
+
+	// The Operations go in a git blob, as a serialized array
+	serialized, err := json.Marshal(opp)
+	if err != nil {
+		return "", err
+	}
+
+	// the serialized data is at hand, take the opportunity to compute the Id
+	opp.id = entity.DeriveId(serialized)
+
+	opsHash, err := repo.StoreData(serialized)
+	if err != nil {
+		return "", err
+	}
+
+	// marker builds a tree entry that carries its information in its name only
+	marker := func(name string) repository.TreeEntry {
+		return repository.TreeEntry{ObjectType: repository.Blob, Hash: emptyBlobHash, Name: name}
+	}
+
+	// The tree references the blob and encodes the other values:
+	// - format version
+	// - clocks
+	tree := []repository.TreeEntry{
+		marker(fmt.Sprintf(versionEntryPrefix+"%d", def.formatVersion)),
+		{ObjectType: repository.Blob, Hash: opsHash, Name: opsEntryName},
+		marker(fmt.Sprintf(editClockEntryPrefix+"%d", opp.EditTime)),
+		// marker(fmt.Sprintf(packClockEntryPrefix+"%d", opp.PackTime)),
+	}
+	if opp.CreateTime > 0 {
+		tree = append(tree, marker(fmt.Sprintf(createClockEntryPrefix+"%d", opp.CreateTime)))
+	}
+
+	treeHash, err := repo.StoreTree(tree)
+	if err != nil {
+		return "", err
+	}
+
+	// Finally the commit referencing the tree, signed if the author has a key
+	var (
+		commitHash repository.Hash
+		commitErr  error
+	)
+	switch {
+	case opp.Author.SigningKey() != nil:
+		commitHash, commitErr = repo.StoreSignedCommit(treeHash, opp.Author.SigningKey().PGPEntity(), parentCommit...)
+	default:
+		commitHash, commitErr = repo.StoreCommit(treeHash, parentCommit...)
+	}
+	if commitErr != nil {
+		return "", commitErr
+	}
+
+	return commitHash, nil
+}
+
+// readOperationPack read the operationPack encoded in git at the given Tree hash.
+//
+// Validity of the Lamport clocks is left for the caller to decide.
+func readOperationPack(def Definition, repo repository.RepoData, commit repository.Commit) (*operationPack, error) { + entries, err := repo.ReadTree(commit.TreeHash) + if err != nil { + return nil, err + } + + // check the format version first, fail early instead of trying to read something + var version uint + for _, entry := range entries { + if strings.HasPrefix(entry.Name, versionEntryPrefix) { + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read format version") + } + if v > 1<<12 { + return nil, fmt.Errorf("format version too big") + } + version = uint(v) + break + } + } + if version == 0 { + return nil, entity.NewErrUnknowFormat(def.formatVersion) + } + if version != def.formatVersion { + return nil, entity.NewErrInvalidFormat(version, def.formatVersion) + } + + var id entity.Id + var author identity.Interface + var ops []Operation + var createTime lamport.Time + var editTime lamport.Time + // var packTime lamport.Time + + for _, entry := range entries { + switch { + case entry.Name == opsEntryName: + data, err := repo.ReadData(entry.Hash) + if err != nil { + return nil, errors.Wrap(err, "failed to read git blob data") + } + ops, author, err = unmarshallPack(def, data) + if err != nil { + return nil, err + } + id = entity.DeriveId(data) + + case strings.HasPrefix(entry.Name, createClockEntryPrefix): + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, createClockEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read creation lamport time") + } + createTime = lamport.Time(v) + + case strings.HasPrefix(entry.Name, editClockEntryPrefix): + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, editClockEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read edit lamport time") + } + editTime = lamport.Time(v) + + // case strings.HasPrefix(entry.Name, packClockEntryPrefix): + // found &= 1 << 3 + // 
+ // v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, packClockEntryPrefix), 10, 64) + // if err != nil { + // return nil, errors.Wrap(err, "can't read pack lamport time") + // } + // packTime = lamport.Time(v) + } + } + + // Verify signature if we expect one + keys := author.ValidKeysAtTime(fmt.Sprintf(editClockPattern, def.namespace), editTime) + if len(keys) > 0 { + keyring := identity.PGPKeyring(keys) + _, err = openpgp.CheckDetachedSignature(keyring, commit.SignedData, commit.Signature) + if err != nil { + return nil, fmt.Errorf("signature failure: %v", err) + } + } + + return &operationPack{ + id: id, + Author: author, + Operations: ops, + CreateTime: createTime, + EditTime: editTime, + // PackTime: packTime, + }, nil +} + +// unmarshallPack delegate the unmarshalling of the Operation's JSON to the decoding +// function provided by the concrete entity. This gives access to the concrete type of each +// Operation. +func unmarshallPack(def Definition, data []byte) ([]Operation, identity.Interface, error) { + aux := struct { + Author identity.IdentityStub `json:"author"` + Operations []json.RawMessage `json:"ops"` + }{} + + if err := json.Unmarshal(data, &aux); err != nil { + return nil, nil, err + } + + if aux.Author.Id() == "" || aux.Author.Id() == entity.UnsetId { + return nil, nil, fmt.Errorf("missing author") + } + + author, err := def.identityResolver.ResolveIdentity(aux.Author.Id()) + if err != nil { + return nil, nil, err + } + + ops := make([]Operation, 0, len(aux.Operations)) + + for _, raw := range aux.Operations { + // delegate to specialized unmarshal function + op, err := def.operationUnmarshaler(author, raw) + if err != nil { + return nil, nil, err + } + ops = append(ops, op) + } + + return ops, author, nil +} diff --git a/entity/dag/operation_pack_test.go b/entity/dag/operation_pack_test.go new file mode 100644 index 00000000..ad2a9859 --- /dev/null +++ b/entity/dag/operation_pack_test.go @@ -0,0 +1,44 @@ +package dag + +import ( + 
"testing" + + "github.com/stretchr/testify/require" +) + +func TestOperationPackReadWrite(t *testing.T) { + repo, id1, _, def := makeTestContext() + + opp := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo"), + newOp2(id1, "bar"), + }, + CreateTime: 123, + EditTime: 456, + } + + commitHash, err := opp.Write(def, repo) + require.NoError(t, err) + + commit, err := repo.ReadCommit(commitHash) + require.NoError(t, err) + + opp2, err := readOperationPack(def, repo, commit) + require.NoError(t, err) + + require.Equal(t, opp, opp2) + + // make sure we get the same Id with the same data + opp3 := &operationPack{ + Author: id1, + Operations: []Operation{ + newOp1(id1, "foo"), + newOp2(id1, "bar"), + }, + CreateTime: 123, + EditTime: 456, + } + require.Equal(t, opp.Id(), opp3.Id()) +} diff --git a/entity/doc.go b/entity/doc.go deleted file mode 100644 index 4682d545..00000000 --- a/entity/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -// Package entity contains the base common code to define an entity stored -// in a chain of git objects, supporting actions like Push, Pull and Merge. -package entity - -// TODO: Bug and Identity are very similar, right ? I expect that this package -// will eventually hold the common code to define an entity and the related -// helpers, errors and so on. When this work is done, it will become easier -// to add new entities, for example to support pull requests. 
diff --git a/entity/entity.go b/entity/entity.go deleted file mode 100644 index a1e8e57e..00000000 --- a/entity/entity.go +++ /dev/null @@ -1,348 +0,0 @@ -package entity - -import ( - "encoding/json" - "fmt" - "sort" - - "github.com/pkg/errors" - - "github.com/MichaelMure/git-bug/repository" - "github.com/MichaelMure/git-bug/util/lamport" -) - -const refsPattern = "refs/%s/%s" -const creationClockPattern = "%s-create" -const editClockPattern = "%s-edit" - -type Operation interface { - Id() Id - // MarshalJSON() ([]byte, error) - Validate() error -} - -// Definition hold the details defining one specialization of an Entity. -type Definition struct { - // the name of the entity (bug, pull-request, ...) - typename string - // the namespace in git (bugs, prs, ...) - namespace string - // a function decoding a JSON message into an Operation - operationUnmarshaler func(raw json.RawMessage) (Operation, error) - // the expected format version number - formatVersion uint -} - -type Entity struct { - Definition - - ops []Operation - staging []Operation - - packClock lamport.Clock - lastCommit repository.Hash -} - -func New(definition Definition) *Entity { - return &Entity{ - Definition: definition, - packClock: lamport.NewMemClock(), - } -} - -func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { - if err := id.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid id") - } - - ref := fmt.Sprintf("refs/%s/%s", def.namespace, id.String()) - - return read(def, repo, ref) -} - -// read fetch from git and decode an Entity at an arbitrary git reference. 
-func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) { - rootHash, err := repo.ResolveRef(ref) - if err != nil { - return nil, err - } - - // Perform a depth-first search to get a topological order of the DAG where we discover the - // parents commit and go back in time up to the chronological root - - stack := make([]repository.Hash, 0, 32) - visited := make(map[repository.Hash]struct{}) - DFSOrder := make([]repository.Commit, 0, 32) - - stack = append(stack, rootHash) - - for len(stack) > 0 { - // pop - hash := stack[len(stack)-1] - stack = stack[:len(stack)-1] - - if _, ok := visited[hash]; ok { - continue - } - - // mark as visited - visited[hash] = struct{}{} - - commit, err := repo.ReadCommit(hash) - if err != nil { - return nil, err - } - - DFSOrder = append(DFSOrder, commit) - - for _, parent := range commit.Parents { - stack = append(stack, parent) - } - } - - // Now, we can reverse this topological order and read the commits in an order where - // we are sure to have read all the chronological ancestors when we read a commit. - - // Next step is to: - // 1) read the operationPacks - // 2) make sure that the clocks causality respect the DAG topology. 
- - oppMap := make(map[repository.Hash]*operationPack) - var opsCount int - var packClock = lamport.NewMemClock() - - for i := len(DFSOrder) - 1; i >= 0; i-- { - commit := DFSOrder[i] - firstCommit := i == len(DFSOrder)-1 - - // Verify DAG structure: single chronological root, so only the root - // can have no parents - if !firstCommit && len(commit.Parents) == 0 { - return nil, fmt.Errorf("multiple root in the entity DAG") - } - - opp, err := readOperationPack(def, repo, commit.TreeHash) - if err != nil { - return nil, err - } - - // Check that the lamport clocks are set - if firstCommit && opp.CreateTime <= 0 { - return nil, fmt.Errorf("creation lamport time not set") - } - if opp.EditTime <= 0 { - return nil, fmt.Errorf("edition lamport time not set") - } - if opp.PackTime <= 0 { - return nil, fmt.Errorf("pack lamport time not set") - } - - // make sure that the lamport clocks causality match the DAG topology - for _, parentHash := range commit.Parents { - parentPack, ok := oppMap[parentHash] - if !ok { - panic("DFS failed") - } - - if parentPack.EditTime >= opp.EditTime { - return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG") - } - - // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure - // that the clocks don't jump too far in the future - if opp.EditTime-parentPack.EditTime > 10_000 { - return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack") - } - } - - oppMap[commit.Hash] = opp - opsCount += len(opp.Operations) - } - - // The clocks are fine, we witness them - for _, opp := range oppMap { - err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime) - if err != nil { - return nil, err - } - err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime) - if err != nil { - return nil, err - } - err = packClock.Witness(opp.PackTime) - if err != nil { - return nil, err - } - } - - // Now that we know that the topological order and clocks are 
fine, we order the operationPacks - // based on the logical clocks, entirely ignoring the DAG topology - - oppSlice := make([]*operationPack, 0, len(oppMap)) - for _, pack := range oppMap { - oppSlice = append(oppSlice, pack) - } - sort.Slice(oppSlice, func(i, j int) bool { - // Primary ordering with the dedicated "pack" Lamport time that encode causality - // within the entity - if oppSlice[i].PackTime != oppSlice[j].PackTime { - return oppSlice[i].PackTime < oppSlice[i].PackTime - } - // We have equal PackTime, which means we had a concurrent edition. We can't tell which exactly - // came first. As a secondary arbitrary ordering, we can use the EditTime. It's unlikely to be - // enough but it can give us an edge to approach what really happened. - if oppSlice[i].EditTime != oppSlice[j].EditTime { - return oppSlice[i].EditTime < oppSlice[j].EditTime - } - // Well, what now? We still need a total ordering, the most stable possible. - // As a last resort, we can order based on a hash of the serialized Operations in the - // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse. - // This is a lexicographic ordering. 
- return oppSlice[i].Id < oppSlice[j].Id - }) - - // Now that we ordered the operationPacks, we have the order of the Operations - - ops := make([]Operation, 0, opsCount) - for _, pack := range oppSlice { - for _, operation := range pack.Operations { - ops = append(ops, operation) - } - } - - return &Entity{ - Definition: def, - ops: ops, - lastCommit: rootHash, - }, nil -} - -// Id return the Entity identifier -func (e *Entity) Id() Id { - // id is the id of the first operation - return e.FirstOp().Id() -} - -func (e *Entity) Validate() error { - // non-empty - if len(e.ops) == 0 && len(e.staging) == 0 { - return fmt.Errorf("entity has no operations") - } - - // check if each operations are valid - for _, op := range e.ops { - if err := op.Validate(); err != nil { - return err - } - } - - // check if staging is valid if needed - for _, op := range e.staging { - if err := op.Validate(); err != nil { - return err - } - } - - // Check that there is no colliding operation's ID - ids := make(map[Id]struct{}) - for _, op := range e.Operations() { - if _, ok := ids[op.Id()]; ok { - return fmt.Errorf("id collision: %s", op.Id()) - } - ids[op.Id()] = struct{}{} - } - - return nil -} - -// return the ordered operations -func (e *Entity) Operations() []Operation { - return append(e.ops, e.staging...) -} - -// Lookup for the very first operation of the Entity. 
-func (e *Entity) FirstOp() Operation { - for _, op := range e.ops { - return op - } - for _, op := range e.staging { - return op - } - return nil -} - -func (e *Entity) Append(op Operation) { - e.staging = append(e.staging, op) -} - -func (e *Entity) NeedCommit() bool { - return len(e.staging) > 0 -} - -func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error { - if e.NeedCommit() { - return e.Commit(repo) - } - return nil -} - -// TODO: support commit signature -func (e *Entity) Commit(repo repository.ClockedRepo) error { - if !e.NeedCommit() { - return fmt.Errorf("can't commit an entity with no pending operation") - } - - if err := e.Validate(); err != nil { - return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename) - } - - // increment the various clocks for this new operationPack - packTime, err := e.packClock.Increment() - if err != nil { - return err - } - editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace)) - if err != nil { - return err - } - var creationTime lamport.Time - if e.lastCommit == "" { - creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace)) - if err != nil { - return err - } - } - - opp := &operationPack{ - Operations: e.staging, - CreateTime: creationTime, - EditTime: editTime, - PackTime: packTime, - } - - treeHash, err := opp.write(e.Definition, repo) - if err != nil { - return err - } - - // Write a Git commit referencing the tree, with the previous commit as parent - var commitHash repository.Hash - if e.lastCommit != "" { - commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit) - } else { - commitHash, err = repo.StoreCommit(treeHash) - } - if err != nil { - return err - } - - e.lastCommit = commitHash - e.ops = append(e.ops, e.staging...) 
- e.staging = nil - - // Create or update the Git reference for this entity - // When pushing later, the remote will ensure that this ref update - // is fast-forward, that is no data has been overwritten. - ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String()) - return repo.UpdateRef(ref, commitHash) -} diff --git a/entity/entity_actions.go b/entity/entity_actions.go deleted file mode 100644 index 34e76a62..00000000 --- a/entity/entity_actions.go +++ /dev/null @@ -1,31 +0,0 @@ -package entity - -import ( - "fmt" - - "github.com/MichaelMure/git-bug/repository" -) - -func ListLocalIds(typename string, repo repository.RepoData) ([]Id, error) { - refs, err := repo.ListRefs(fmt.Sprintf("refs/%s/", typename)) - if err != nil { - return nil, err - } - return RefsToIds(refs), nil -} - -func Fetch() { - -} - -func Pull() { - -} - -func Push() { - -} - -func Remove() error { - panic("") -} diff --git a/entity/entity_test.go b/entity/entity_test.go deleted file mode 100644 index 92a53179..00000000 --- a/entity/entity_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package entity - -import ( - "encoding/json" - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/MichaelMure/git-bug/repository" -) - -// func TestFoo(t *testing.T) { -// repo, err := repository.OpenGoGitRepo("~/dev/git-bug", nil) -// require.NoError(t, err) -// -// b, err := ReadBug(repo, Id("8b22e548c93a6ed23c31fd4e337c6286c3d1e5c9cae5537bc8e5842e11bd1099")) -// require.NoError(t, err) -// -// fmt.Println(b) -// } - -type op1 struct { - OperationType int `json:"type"` - Field1 string `json:"field_1"` -} - -func newOp1(field1 string) *op1 { - return &op1{OperationType: 1, Field1: field1} -} - -func (o op1) Id() Id { - data, _ := json.Marshal(o) - return DeriveId(data) -} - -func (o op1) Validate() error { return nil } - -type op2 struct { - OperationType int `json:"type"` - Field2 string `json:"field_2"` -} - -func newOp2(field2 string) *op2 { - return &op2{OperationType: 2, Field2: 
field2} -} - -func (o op2) Id() Id { - data, _ := json.Marshal(o) - return DeriveId(data) -} - -func (o op2) Validate() error { return nil } - -var def = Definition{ - typename: "foo", - namespace: "foos", - operationUnmarshaler: unmarshaller, - formatVersion: 1, -} - -func unmarshaller(raw json.RawMessage) (Operation, error) { - var t struct { - OperationType int `json:"type"` - } - - if err := json.Unmarshal(raw, &t); err != nil { - return nil, err - } - - switch t.OperationType { - case 1: - op := &op1{} - err := json.Unmarshal(raw, &op) - return op, err - case 2: - op := &op2{} - err := json.Unmarshal(raw, &op) - return op, err - default: - return nil, fmt.Errorf("unknown operation type %v", t.OperationType) - } -} - -func TestWriteRead(t *testing.T) { - repo := repository.NewMockRepo() - - entity := New(def) - require.False(t, entity.NeedCommit()) - - entity.Append(newOp1("foo")) - entity.Append(newOp2("bar")) - - require.True(t, entity.NeedCommit()) - require.NoError(t, entity.CommitAdNeeded(repo)) - require.False(t, entity.NeedCommit()) - - entity.Append(newOp2("foobar")) - require.True(t, entity.NeedCommit()) - require.NoError(t, entity.CommitAdNeeded(repo)) - require.False(t, entity.NeedCommit()) - - read, err := Read(def, repo, entity.Id()) - require.NoError(t, err) - - fmt.Println(*read) -} diff --git a/entity/merge.go b/entity/merge.go index 3ce8edac..7d1f3f43 100644 --- a/entity/merge.go +++ b/entity/merge.go @@ -8,14 +8,15 @@ import ( type MergeStatus int const ( - _ MergeStatus = iota - MergeStatusNew - MergeStatusInvalid - MergeStatusUpdated - MergeStatusNothing - MergeStatusError + _ MergeStatus = iota + MergeStatusNew // a new Entity was created locally + MergeStatusInvalid // the remote data is invalid + MergeStatusUpdated // a local Entity has been updated + MergeStatusNothing // no changes were made to a local Entity (already up to date) + MergeStatusError // a terminal error happened ) +// MergeResult hold the result of a merge operation on an 
Entity. type MergeResult struct { // Err is set when a terminal error occur in the process Err error @@ -55,6 +56,7 @@ func NewMergeError(err error, id Id) MergeResult { } } +// TODO: Interface --> *Entity ? func NewMergeStatus(status MergeStatus, id Id, entity Interface) MergeResult { return MergeResult{ Id: id, diff --git a/entity/operation_pack.go b/entity/operation_pack.go deleted file mode 100644 index 0a16dd61..00000000 --- a/entity/operation_pack.go +++ /dev/null @@ -1,199 +0,0 @@ -package entity - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" - - "github.com/pkg/errors" - - "github.com/MichaelMure/git-bug/repository" - "github.com/MichaelMure/git-bug/util/lamport" -) - -// TODO: extra data tree -const extraEntryName = "extra" - -const opsEntryName = "ops" -const versionEntryPrefix = "version-" -const createClockEntryPrefix = "create-clock-" -const editClockEntryPrefix = "edit-clock-" -const packClockEntryPrefix = "pack-clock-" - -type operationPack struct { - Operations []Operation - // Encode the entity's logical time of creation across all entities of the same type. - // Only exist on the root operationPack - CreateTime lamport.Time - // Encode the entity's logical time of last edition across all entities of the same type. - // Exist on all operationPack - EditTime lamport.Time - // Encode the operationPack's logical time of creation withing this entity. - // Exist on all operationPack - PackTime lamport.Time -} - -func (opp operationPack) write(def Definition, repo repository.RepoData) (repository.Hash, error) { - // For different reason, we store the clocks and format version directly in the git tree. - // Version has to be accessible before any attempt to decode to return early with a unique error. - // Clocks could possibly be stored in the git blob but it's nice to separate data and metadata, and - // we are storing something directly in the tree already so why not. 
- // - // To have a valid Tree, we point the "fake" entries to always the same value, the empty blob. - emptyBlobHash, err := repo.StoreData([]byte{}) - if err != nil { - return "", err - } - - // Write the Ops as a Git blob containing the serialized array - data, err := json.Marshal(struct { - Operations []Operation `json:"ops"` - }{ - Operations: opp.Operations, - }) - if err != nil { - return "", err - } - hash, err := repo.StoreData(data) - if err != nil { - return "", err - } - - // Make a Git tree referencing this blob and encoding the other values: - // - format version - // - clocks - tree := []repository.TreeEntry{ - {ObjectType: repository.Blob, Hash: emptyBlobHash, - Name: fmt.Sprintf(versionEntryPrefix+"%d", def.formatVersion)}, - {ObjectType: repository.Blob, Hash: hash, - Name: opsEntryName}, - {ObjectType: repository.Blob, Hash: emptyBlobHash, - Name: fmt.Sprintf(editClockEntryPrefix+"%d", opp.EditTime)}, - {ObjectType: repository.Blob, Hash: emptyBlobHash, - Name: fmt.Sprintf(packClockEntryPrefix+"%d", opp.PackTime)}, - } - if opp.CreateTime > 0 { - tree = append(tree, repository.TreeEntry{ - ObjectType: repository.Blob, - Hash: emptyBlobHash, - Name: fmt.Sprintf(createClockEntryPrefix+"%d", opp.CreateTime), - }) - } - - // Store the tree - return repo.StoreTree(tree) -} - -// readOperationPack read the operationPack encoded in git at the given Tree hash. -// -// Validity of the Lamport clocks is left for the caller to decide. 
-func readOperationPack(def Definition, repo repository.RepoData, treeHash repository.Hash) (*operationPack, error) { - entries, err := repo.ReadTree(treeHash) - if err != nil { - return nil, err - } - - // check the format version first, fail early instead of trying to read something - var version uint - for _, entry := range entries { - if strings.HasPrefix(entry.Name, versionEntryPrefix) { - v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64) - if err != nil { - return nil, errors.Wrap(err, "can't read format version") - } - if v > 1<<12 { - return nil, fmt.Errorf("format version too big") - } - version = uint(v) - break - } - } - if version == 0 { - return nil, NewErrUnknowFormat(def.formatVersion) - } - if version != def.formatVersion { - return nil, NewErrInvalidFormat(version, def.formatVersion) - } - - var ops []Operation - var createTime lamport.Time - var editTime lamport.Time - var packTime lamport.Time - - for _, entry := range entries { - if entry.Name == opsEntryName { - data, err := repo.ReadData(entry.Hash) - if err != nil { - return nil, errors.Wrap(err, "failed to read git blob data") - } - - ops, err = unmarshallOperations(def, data) - if err != nil { - return nil, err - } - continue - } - - if strings.HasPrefix(entry.Name, createClockEntryPrefix) { - v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, createClockEntryPrefix), 10, 64) - if err != nil { - return nil, errors.Wrap(err, "can't read creation lamport time") - } - createTime = lamport.Time(v) - continue - } - - if strings.HasPrefix(entry.Name, editClockEntryPrefix) { - v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, editClockEntryPrefix), 10, 64) - if err != nil { - return nil, errors.Wrap(err, "can't read edit lamport time") - } - editTime = lamport.Time(v) - continue - } - - if strings.HasPrefix(entry.Name, packClockEntryPrefix) { - v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, packClockEntryPrefix), 10, 64) - if err 
!= nil { - return nil, errors.Wrap(err, "can't read pack lamport time") - } - packTime = lamport.Time(v) - continue - } - } - - return &operationPack{ - Operations: ops, - CreateTime: createTime, - EditTime: editTime, - PackTime: packTime, - }, nil -} - -// unmarshallOperations delegate the unmarshalling of the Operation's JSON to the decoding -// function provided by the concrete entity. This gives access to the concrete type of each -// Operation. -func unmarshallOperations(def Definition, data []byte) ([]Operation, error) { - aux := struct { - Operations []json.RawMessage `json:"ops"` - }{} - - if err := json.Unmarshal(data, &aux); err != nil { - return nil, err - } - - ops := make([]Operation, 0, len(aux.Operations)) - - for _, raw := range aux.Operations { - // delegate to specialized unmarshal function - op, err := def.operationUnmarshaler(raw) - if err != nil { - return nil, err - } - - ops = append(ops, op) - } - - return ops, nil -} diff --git a/entity/refs.go b/entity/refs.go index f505dbf0..070d4dba 100644 --- a/entity/refs.go +++ b/entity/refs.go @@ -2,6 +2,7 @@ package entity import "strings" +// RefsToIds parse a slice of git references and return the corresponding Entity's Id. func RefsToIds(refs []string) []Id { ids := make([]Id, len(refs)) @@ -12,6 +13,7 @@ func RefsToIds(refs []string) []Id { return ids } +// RefToId parse a git reference and return the corresponding Entity's Id. func RefToId(ref string) Id { split := strings.Split(ref, "/") return Id(split[len(split)-1]) -- cgit