From 51ece149089f9075d3d6ba1bb09fda726efde8ad Mon Sep 17 00:00:00 2001 From: Michael Muré Date: Mon, 21 Dec 2020 18:42:04 +0100 Subject: entity: clocks and write --- entity/entity.go | 196 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 173 insertions(+), 23 deletions(-) (limited to 'entity/entity.go') diff --git a/entity/entity.go b/entity/entity.go index c12dc2c4..9ba536d6 100644 --- a/entity/entity.go +++ b/entity/entity.go @@ -8,34 +8,47 @@ import ( "github.com/pkg/errors" "github.com/MichaelMure/git-bug/repository" + "github.com/MichaelMure/git-bug/util/lamport" ) +const refsPattern = "refs/%s/%s" +const creationClockPattern = "%s-create" +const editClockPattern = "%s-edit" + type Operation interface { - // Id() Id + Id() Id // MarshalJSON() ([]byte, error) Validate() error - - base() *OpBase } type OperationIterator struct { } type Definition struct { - namespace string + // the name of the entity (bug, pull-request, ...) + typename string + // the namespace in git (bugs, prs, ...) + namespace string + // a function decoding a JSON message into an Operation operationUnmarshaler func(raw json.RawMessage) (Operation, error) - formatVersion uint + // the expected format version number + formatVersion uint } type Entity struct { Definition - ops []Operation + ops []Operation + staging []Operation + + packClock lamport.Clock + lastCommit repository.Hash } func New(definition Definition) *Entity { return &Entity{ Definition: definition, + packClock: lamport.NewMemClock(), } } @@ -93,19 +106,15 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { oppMap := make(map[repository.Hash]*operationPack) var opsCount int + var packClock = lamport.NewMemClock() - rootCommit := DFSOrder[len(DFSOrder)-1] - rootOpp, err := readOperationPack(def, repo, rootCommit.TreeHash) - if err != nil { - return nil, err - } - oppMap[rootCommit.Hash] = rootOpp - - for i := len(DFSOrder) - 2; i >= 0; i-- { + for i := len(DFSOrder) - 1; i >= 0; i-- { commit := DFSOrder[i] + firstCommit := i == len(DFSOrder)-1 - // Verify DAG structure: single chronological root - if len(commit.Parents) == 0 { + // Verify DAG structure: single chronological root, so only the root + // can have no parents + if !firstCommit && len(commit.Parents) == 0 { return nil, fmt.Errorf("multiple root in the entity DAG") } @@ -114,6 +123,17 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { return nil, err } + // Check that the lamport clocks are set + if firstCommit && opp.CreateTime <= 0 { + return nil, fmt.Errorf("creation lamport time not set") + } + if opp.EditTime <= 0 { + return nil, fmt.Errorf("edition lamport time not set") + } + if opp.PackTime <= 0 { + return nil, fmt.Errorf("pack lamport time not set") + } + // make sure that the lamport clocks causality match the DAG topology for _, parentHash := range commit.Parents { parentPack, ok := oppMap[parentHash] @@ -136,6 +156,22 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { opsCount += len(opp.Operations) } + // The clocks are fine, we witness them + for _, opp := range oppMap { + err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime) + if err != nil { + return nil, err + } + err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime) + if err != nil { + return nil, err + } + err = packClock.Witness(opp.PackTime) + if err != nil { + return nil, err + } + } + // Now that we know that the topological order and clocks are fine, we order the operationPacks // based on the logical clocks, entirely ignoring the DAG topology @@ -145,7 +181,8 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { } sort.Slice(oppSlice, func(i, j int) bool { // TODO: no secondary ordering? - return oppSlice[i].EditTime < oppSlice[i].EditTime + // might be useful for stable ordering + return oppSlice[i].PackTime < oppSlice[i].PackTime }) // Now that we ordered the operationPacks, we have the order of the Operations @@ -160,22 +197,135 @@ func Read(def Definition, repo repository.ClockedRepo, id Id) (*Entity, error) { return &Entity{ Definition: def, ops: ops, + lastCommit: rootHash, }, nil } -func Remove() error { - panic("") +// Id return the Entity identifier +func (e *Entity) Id() Id { + // id is the id of the first operation + return e.FirstOp().Id() } -func (e *Entity) Id() { +func (e *Entity) Validate() error { + // non-empty + if len(e.ops) == 0 && len(e.staging) == 0 { + return fmt.Errorf("entity has no operations") + } + + // check if each operations are valid + for _, op := range e.ops { + if err := op.Validate(); err != nil { + return err + } + } + + // check if staging is valid if needed + for _, op := range e.staging { + if err := op.Validate(); err != nil { + return err + } + } + + // Check that there is no colliding operation's ID + ids := make(map[Id]struct{}) + for _, op := range e.Operations() { + if _, ok := ids[op.Id()]; ok { + return fmt.Errorf("id collision: %s", op.Id()) + } + ids[op.Id()] = struct{}{} + } + return nil } // return the ordered operations func (e *Entity) Operations() []Operation { - return e.ops + return append(e.ops, e.staging...) } -func (e *Entity) Commit() error { - panic("") +// Lookup for the very first operation of the Entity. +func (e *Entity) FirstOp() Operation { + for _, op := range e.ops { + return op + } + for _, op := range e.staging { + return op + } + return nil +} + +func (e *Entity) Append(op Operation) { + e.staging = append(e.staging, op) +} + +func (e *Entity) NeedCommit() bool { + return len(e.staging) > 0 +} + +func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error { + if e.NeedCommit() { + return e.Commit(repo) + } + return nil +} + +func (e *Entity) Commit(repo repository.ClockedRepo) error { + if !e.NeedCommit() { + return fmt.Errorf("can't commit an entity with no pending operation") + } + + if err := e.Validate(); err != nil { + return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename) + } + + // increment the various clocks for this new operationPack + packTime, err := e.packClock.Increment() + if err != nil { + return err + } + editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace)) + if err != nil { + return err + } + var creationTime lamport.Time + if e.lastCommit == "" { + creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace)) + if err != nil { + return err + } + } + + opp := &operationPack{ + Operations: e.staging, + CreateTime: creationTime, + EditTime: editTime, + PackTime: packTime, + } + + treeHash, err := opp.write(e.Definition, repo) + if err != nil { + return err + } + + // Write a Git commit referencing the tree, with the previous commit as parent + var commitHash repository.Hash + if e.lastCommit != "" { + commitHash, err = repo.StoreCommitWithParent(treeHash, e.lastCommit) + } else { + commitHash, err = repo.StoreCommit(treeHash) + } + if err != nil { + return err + } + + e.lastCommit = commitHash + e.ops = append(e.ops, e.staging...) + e.staging = nil + + // Create or update the Git reference for this entity + // When pushing later, the remote will ensure that this ref update + // is fast-forward, that is no data has been overwritten. + ref := fmt.Sprintf(refsPattern, e.namespace, e.Id().String()) + return repo.UpdateRef(ref, commitHash) } -- cgit