diff options
Diffstat (limited to 'entity')
-rw-r--r-- | entity/entity.go | 196 | ||||
-rw-r--r-- | entity/entity_actions.go | 4 | ||||
-rw-r--r-- | entity/entity_test.go | 107 | ||||
-rw-r--r-- | entity/operation_pack.go | 132 |
4 files changed, 387 insertions, 52 deletions
diff --git a/entity/entity.go b/entity/entity.go index c12dc2c4..9ba536d6 100644 --- a/entity/entity.go +++ b/entity/entity.go @@ -8,34 +8,47 @@ import ( "github.com/pkg/errors" "github.com/MichaelMure/git-bug/repository" + "github.com/MichaelMure/git-bug/util/lamport" ) +const refsPattern = "refs/%s/%s" +const creationClockPattern = "%s-create" +const editClockPattern = "%s-edit" + type Operation interface { - // Id() Id + Id() Id // MarshalJSON() ([]byte, error) Validate() error - - base() *OpBase } type OperationIterator struct { } type Definition struct { - namespace string + // the name of the entity (bug, pull-request, ...) + typename string + // the namespace in git (bugs, prs, ...) + namespace string + // a function decoding a JSON message into an Operation operationUnmarshaler func(raw json.RawMessage) (Operation, error) - formatVersion uint + // the expected format version number + formatVersion uint } type Entity struct { Definition - ops []Operation + ops []Operation + staging []Operation + + packClock lamport.Clock + lastCommit repository.Hash } func New(definition Definition) *Entity { return &Entity{ Definition: definition, + packClock: lamport.NewMemClock(), } } @@ -93,19 +106,15 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { oppMap := make(map[repository.Hash]*operationPack) var opsCount int + var packClock = lamport.NewMemClock() - rootCommit := DFSOrder[len(DFSOrder)-1] - rootOpp, err := readOperationPack(def, repo, rootCommit.TreeHash) - if err != nil { - return nil, err - } - oppMap[rootCommit.Hash] = rootOpp - - for i := len(DFSOrder) - 2; i >= 0; i-- { + for i := len(DFSOrder) - 1; i >= 0; i-- { commit := DFSOrder[i] + firstCommit := i == len(DFSOrder)-1 - // Verify DAG structure: single chronological root - if len(commit.Parents) == 0 { + // Verify DAG structure: single chronological root, so only the root + // can have no parents + if !firstCommit && len(commit.Parents) == 0 { return nil, fmt.Errorf("multiple root in the entity DAG") } @@ -114,6 +123,17 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { return nil, err } + // Check that the lamport clocks are set + if firstCommit && opp.CreateTime <= 0 { + return nil, fmt.Errorf("creation lamport time not set") + } + if opp.EditTime <= 0 { + return nil, fmt.Errorf("edition lamport time not set") + } + if opp.PackTime <= 0 { + return nil, fmt.Errorf("pack lamport time not set") + } + // make sure that the lamport clocks causality match the DAG topology for _, parentHash := range commit.Parents { parentPack, ok := oppMap[parentHash] @@ -136,6 +156,22 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { opsCount += len(opp.Operations) } + // The clocks are fine, we witness them + for _, opp := range oppMap { + err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime) + if err != nil { + return nil, err + } + err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime) + if err != nil { + return nil, err + } + err = packClock.Witness(opp.PackTime) + if err != nil { + return nil, err + } + } + // Now that we know that the topological order and clocks are fine, we order the operationPacks // based on the logical clocks, entirely ignoring the DAG topology @@ -145,7 +181,8 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { } sort.Slice(oppSlice, func(i, j int) bool { // TODO: no secondary ordering? - return oppSlice[i].EditTime < oppSlice[i].EditTime + // might be useful for stable ordering + return oppSlice[i].PackTime < oppSlice[i].PackTime }) // Now that we ordered the operationPacks, we have the order of the Operations @@ -160,22 +197,135 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { return &Entity{ Definition: def, ops: ops, + lastCommit: rootHash, }, nil } -func Remove() error { - panic("") +// Id return the Entity identifier +func (e *Entity) Id() Id { + // id is the id of the first operation + return e.FirstOp().Id() } -func (e *Entity) Id() { +func (e *Entity) Validate() error { + // non-empty + if len(e.ops) == 0 && len(e.staging) == 0 { + return fmt.Errorf("entity has no operations") + } + + // check if each operations are valid + for _, op := range e.ops { + if err := op.Validate(); err != nil { + return err + } + } + + // check if staging is valid if needed + for _, op := range e.staging { + if err := op.Validate(); err != nil { + return err + } + } + + // Check that there is no colliding operation's ID + ids := make(map[Id]struct{}) + for _, op := range e.Operations() { + if _, ok := ids[op.Id()]; ok { + return fmt.Errorf("id collision: %s", op.Id()) + } + ids[op.Id()] = struct{}{} + } + return nil } // return the ordered operations func (e *Entity) Operations() []Operation { - return e.ops + return append(e.ops, e.staging...) } -func (e *Entity) Commit() error { - panic("") +// Lookup for the very first operation of the Entity. +func (e *Entity) FirstOp() Operation { + for _, op := range e.ops { + return op + } + for _, op := range e.staging { + return op + } + return nil +} + +func (e *Entity) Append(op Operation) { + e.staging = append(e.staging, op) +} + +func (e *Entity) NeedCommit() bool { + return len(e.staging) > 0 +} + +func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error { + if e.NeedCommit() { + return e.Commit(repo) + } + return nil +} + +func (e *Entity) Commit(repo repository.ClockedRepo) error { + if !e.NeedCommit() { + return fmt.Errorf("can't commit an entity with no pending operation") + } + + if err := e.Validate(); err != nil { + return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename) + } + + // increment the various clocks for this new operationPack + packTime, err := e.packClock.Increment() + if err != nil { + return err + } + editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace)) + if err != nil { + return err + } + var creationTime lamport.Time + if e.lastCommit == "" { + creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace)) + if err != nil { + return err + } + } + + opp := &operationPack{ + Operations: e.staging, + CreateTime: creationTime, + EditTime: editTime, + PackTime: packTime, + } + + treeHash, err := opp.write(e.Definition, repo) + if err != nil { + return err + } + + // Write a Git commit referencing the tree, with the previous commit as parent + var commitHash repository.Hash + if e.lastCommit != "" { + commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit) + } else { + commitHash, err = repo.StoreCommit(treeHash) + } + if err != nil { + return err + } + + e.lastCommit = commitHash + e.ops = append(e.ops, e.staging...) + e.staging = nil + + // Create or update the Git reference for this entity + // When pushing later, the remote will ensure that this ref update + // is fast-forward, that is no data has been overwritten. + ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String()) + return repo.UpdateRef(ref, commitHash) } diff --git a/entity/entity_actions.go b/entity/entity_actions.go index 02e76487..34e76a62 100644 --- a/entity/entity_actions.go +++ b/entity/entity_actions.go @@ -25,3 +25,7 @@ func Pull() { func Push() { } + +func Remove() error { + panic("") +} diff --git a/entity/entity_test.go b/entity/entity_test.go new file mode 100644 index 00000000..92a53179 --- /dev/null +++ b/entity/entity_test.go @@ -0,0 +1,107 @@ +package entity + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/MichaelMure/git-bug/repository" +) + +// func TestFoo(t *testing.T) { +// repo, err := repository.OpenGoGitRepo("~/dev/git-bug", nil) +// require.NoError(t, err) +// +// b, err := ReadBug(repo, Id("8b22e548c93a6ed23c31fd4e337c6286c3d1e5c9cae5537bc8e5842e11bd1099")) +// require.NoError(t, err) +// +// fmt.Println(b) +// } + +type op1 struct { + OperationType int `json:"type"` + Field1 string `json:"field_1"` +} + +func newOp1(field1 string) *op1 { + return &op1{OperationType: 1, Field1: field1} +} + +func (o op1) Id() Id { + data, _ := json.Marshal(o) + return DeriveId(data) +} + +func (o op1) Validate() error { return nil } + +type op2 struct { + OperationType int `json:"type"` + Field2 string `json:"field_2"` +} + +func newOp2(field2 string) *op2 { + return &op2{OperationType: 2, Field2: field2} +} + +func (o op2) Id() Id { + data, _ := json.Marshal(o) + return DeriveId(data) +} + +func (o op2) Validate() error { return nil } + +var def = Definition{ + typename: "foo", + namespace: "foos", + operationUnmarshaler: unmarshaller, + formatVersion: 1, +} + +func unmarshaller(raw json.RawMessage) (Operation, error) { + var t struct { + OperationType int `json:"type"` + } + + if err := json.Unmarshal(raw, &t); err != nil { + return nil, err + } + + switch t.OperationType { + case 1: + op := &op1{} + err := json.Unmarshal(raw, &op) + return op, err + case 2: + op := &op2{} + err := json.Unmarshal(raw, &op) + return op, err + default: + return nil, fmt.Errorf("unknown operation type %v", t.OperationType) + } +} + +func TestWriteRead(t *testing.T) { + repo := repository.NewMockRepo() + + entity := New(def) + require.False(t, entity.NeedCommit()) + + entity.Append(newOp1("foo")) + entity.Append(newOp2("bar")) + + require.True(t, entity.NeedCommit()) + require.NoError(t, entity.CommitAdNeeded(repo)) + require.False(t, entity.NeedCommit()) + + entity.Append(newOp2("foobar")) + require.True(t, entity.NeedCommit()) + require.NoError(t, entity.CommitAdNeeded(repo)) + require.False(t, entity.NeedCommit()) + + read, err := Read(def, repo, entity.Id()) + require.NoError(t, err) + + fmt.Println(*read) +} diff --git a/entity/operation_pack.go b/entity/operation_pack.go index 2377167f..0a16dd61 100644 --- a/entity/operation_pack.go +++ b/entity/operation_pack.go @@ -2,6 +2,7 @@ package entity import ( "encoding/json" + "fmt" "strconv" "strings" @@ -11,25 +12,82 @@ import ( "github.com/MichaelMure/git-bug/util/lamport" ) +// TODO: extra data tree +const extraEntryName = "extra" + const opsEntryName = "ops" const versionEntryPrefix = "version-" const createClockEntryPrefix = "create-clock-" const editClockEntryPrefix = "edit-clock-" +const packClockEntryPrefix = "pack-clock-" type operationPack struct { Operations []Operation + // Encode the entity's logical time of creation across all entities of the same type. + // Only exist on the root operationPack CreateTime lamport.Time - EditTime lamport.Time + // Encode the entity's logical time of last edition across all entities of the same type. + // Exist on all operationPack + EditTime lamport.Time + // Encode the operationPack's logical time of creation withing this entity. + // Exist on all operationPack + PackTime lamport.Time } -// func (opp *operationPack) MarshalJSON() ([]byte, error) { -// return json.Marshal(struct { -// Operations []Operation `json:"ops"` -// }{ -// Operations: opp.Operations, -// }) -// } +func (opp operationPack) write(def Definition, repo repository.RepoData) (repository.Hash, error) { + // For different reason, we store the clocks and format version directly in the git tree. + // Version has to be accessible before any attempt to decode to return early with a unique error. + // Clocks could possibly be stored in the git blob but it's nice to separate data and metadata, and + // we are storing something directly in the tree already so why not. + // + // To have a valid Tree, we point the "fake" entries to always the same value, the empty blob. + emptyBlobHash, err := repo.StoreData([]byte{}) + if err != nil { + return "", err + } + + // Write the Ops as a Git blob containing the serialized array + data, err := json.Marshal(struct { + Operations []Operation `json:"ops"` + }{ + Operations: opp.Operations, + }) + if err != nil { + return "", err + } + hash, err := repo.StoreData(data) + if err != nil { + return "", err + } + + // Make a Git tree referencing this blob and encoding the other values: + // - format version + // - clocks + tree := []repository.TreeEntry{ + {ObjectType: repository.Blob, Hash: emptyBlobHash, + Name: fmt.Sprintf(versionEntryPrefix+"%d", def.formatVersion)}, + {ObjectType: repository.Blob, Hash: hash, + Name: opsEntryName}, + {ObjectType: repository.Blob, Hash: emptyBlobHash, + Name: fmt.Sprintf(editClockEntryPrefix+"%d", opp.EditTime)}, + {ObjectType: repository.Blob, Hash: emptyBlobHash, + Name: fmt.Sprintf(packClockEntryPrefix+"%d", opp.PackTime)}, + } + if opp.CreateTime > 0 { + tree = append(tree, repository.TreeEntry{ + ObjectType: repository.Blob, + Hash: emptyBlobHash, + Name: fmt.Sprintf(createClockEntryPrefix+"%d", opp.CreateTime), + }) + } + // Store the tree + return repo.StoreTree(tree) +} + +// readOperationPack read the operationPack encoded in git at the given Tree hash. +// +// Validity of the Lamport clocks is left for the caller to decide. func readOperationPack(def Definition, repo repository.RepoData, treeHash repository.Hash) (*operationPack, error) { entries, err := repo.ReadTree(treeHash) if err != nil { @@ -37,30 +95,31 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi } // check the format version first, fail early instead of trying to read something - // var version uint - // for _, entry := range entries { - // if strings.HasPrefix(entry.Name, versionEntryPrefix) { - // v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64) - // if err != nil { - // return nil, errors.Wrap(err, "can't read format version") - // } - // if v > 1<<12 { - // return nil, fmt.Errorf("format version too big") - // } - // version = uint(v) - // break - // } - // } - // if version == 0 { - // return nil, NewErrUnknowFormat(def.formatVersion) - // } - // if version != def.formatVersion { - // return nil, NewErrInvalidFormat(version, def.formatVersion) - // } + var version uint + for _, entry := range entries { + if strings.HasPrefix(entry.Name, versionEntryPrefix) { + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, versionEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read format version") + } + if v > 1<<12 { + return nil, fmt.Errorf("format version too big") + } + version = uint(v) + break + } + } + if version == 0 { + return nil, NewErrUnknowFormat(def.formatVersion) + } + if version != def.formatVersion { + return nil, NewErrInvalidFormat(version, def.formatVersion) + } var ops []Operation var createTime lamport.Time var editTime lamport.Time + var packTime lamport.Time for _, entry := range entries { if entry.Name == opsEntryName { @@ -73,7 +132,7 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi if err != nil { return nil, err } - break + continue } if strings.HasPrefix(entry.Name, createClockEntryPrefix) { @@ -82,6 +141,7 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi return nil, errors.Wrap(err, "can't read creation lamport time") } createTime = lamport.Time(v) + continue } if strings.HasPrefix(entry.Name, editClockEntryPrefix) { @@ -90,6 +150,16 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi return nil, errors.Wrap(err, "can't read edit lamport time") } editTime = lamport.Time(v) + continue + } + + if strings.HasPrefix(entry.Name, packClockEntryPrefix) { + v, err := strconv.ParseUint(strings.TrimPrefix(entry.Name, packClockEntryPrefix), 10, 64) + if err != nil { + return nil, errors.Wrap(err, "can't read pack lamport time") + } + packTime = lamport.Time(v) + continue } } @@ -97,9 +167,13 @@ func readOperationPack(def Definition, repo repository.RepoData, treeHash reposi Operations: ops, CreateTime: createTime, EditTime: editTime, + PackTime: packTime, }, nil } +// unmarshallOperations delegate the unmarshalling of the Operation's JSON to the decoding +// function provided by the concrete entity. This gives access to the concrete type of each +// Operation. func unmarshallOperations(def Definition, data []byte) ([]Operation, error) { aux := struct { Operations []json.RawMessage `json:"ops"` |