diff options
Diffstat (limited to 'entity/dag/entity.go')
-rw-r--r-- | entity/dag/entity.go | 439 |
1 files changed, 439 insertions, 0 deletions
diff --git a/entity/dag/entity.go b/entity/dag/entity.go new file mode 100644 index 00000000..c4368514 --- /dev/null +++ b/entity/dag/entity.go @@ -0,0 +1,439 @@ +// Package dag contains the base common code to define an entity stored +// in a chain of git objects, supporting actions like Push, Pull and Merge. +package dag + +import ( + "encoding/json" + "fmt" + "sort" + + "github.com/pkg/errors" + + "github.com/MichaelMure/git-bug/entity" + "github.com/MichaelMure/git-bug/identity" + "github.com/MichaelMure/git-bug/repository" + "github.com/MichaelMure/git-bug/util/lamport" +) + +const refsPattern = "refs/%s/%s" +const creationClockPattern = "%s-create" +const editClockPattern = "%s-edit" + +// Definition hold the details defining one specialization of an Entity. +type Definition struct { + // the name of the entity (bug, pull-request, ...) + Typename string + // the Namespace in git (bugs, prs, ...) + Namespace string + // a function decoding a JSON message into an Operation + OperationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error) + // the expected format version number, that can be used for data migration/upgrade + FormatVersion uint +} + +// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge. +type Entity struct { + // A Lamport clock is a logical clock that allow to order event + // inside a distributed system. + // It must be the first field in this struct due to https://github.com/golang/go/issues/36606 + createTime lamport.Time + editTime lamport.Time + + Definition + + // operations that are already stored in the repository + ops []Operation + // operations not yet stored in the repository + staging []Operation + + lastCommit repository.Hash +} + +// New create an empty Entity +func New(definition Definition) *Entity { + return &Entity{ + Definition: definition, + } +} + +// Read will read and decode a stored local Entity from a repository +func Read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, id entity.Id) (*Entity, error) { + if err := id.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid id") + } + + ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) + + return read(def, repo, resolver, ref) +} + +// readRemote will read and decode a stored remote Entity from a repository +func readRemote(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, remote string, id entity.Id) (*Entity, error) { + if err := id.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid id") + } + + ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String()) + + return read(def, repo, resolver, ref) +} + +// read fetch from git and decode an Entity at an arbitrary git reference. +func read(def Definition, repo repository.ClockedRepo, resolver identity.Resolver, ref string) (*Entity, error) { + rootHash, err := repo.ResolveRef(ref) + if err != nil { + return nil, err + } + + // Perform a breadth-first search to get a topological order of the DAG where we discover the + // parents commit and go back in time up to the chronological root + + queue := make([]repository.Hash, 0, 32) + visited := make(map[repository.Hash]struct{}) + BFSOrder := make([]repository.Commit, 0, 32) + + queue = append(queue, rootHash) + visited[rootHash] = struct{}{} + + for len(queue) > 0 { + // pop + hash := queue[0] + queue = queue[1:] + + commit, err := repo.ReadCommit(hash) + if err != nil { + return nil, err + } + + BFSOrder = append(BFSOrder, commit) + + for _, parent := range commit.Parents { + if _, ok := visited[parent]; !ok { + queue = append(queue, parent) + // mark as visited + visited[parent] = struct{}{} + } + } + } + + // Now, we can reverse this topological order and read the commits in an order where + // we are sure to have read all the chronological ancestors when we read a commit. + + // Next step is to: + // 1) read the operationPacks + // 2) make sure that the clocks causality respect the DAG topology. + + oppMap := make(map[repository.Hash]*operationPack) + var opsCount int + + for i := len(BFSOrder) - 1; i >= 0; i-- { + commit := BFSOrder[i] + isFirstCommit := i == len(BFSOrder)-1 + isMerge := len(commit.Parents) > 1 + + // Verify DAG structure: single chronological root, so only the root + // can have no parents. Said otherwise, the DAG need to have exactly + // one leaf. + if !isFirstCommit && len(commit.Parents) == 0 { + return nil, fmt.Errorf("multiple leafs in the entity DAG") + } + + opp, err := readOperationPack(def, repo, resolver, commit) + if err != nil { + return nil, err + } + + err = opp.Validate() + if err != nil { + return nil, err + } + + if isMerge && len(opp.Operations) > 0 { + return nil, fmt.Errorf("merge commit cannot have operations") + } + + // Check that the create lamport clock is set (not checked in Validate() as it's optional) + if isFirstCommit && opp.CreateTime <= 0 { + return nil, fmt.Errorf("creation lamport time not set") + } + + // make sure that the lamport clocks causality match the DAG topology + for _, parentHash := range commit.Parents { + parentPack, ok := oppMap[parentHash] + if !ok { + panic("DFS failed") + } + + if parentPack.EditTime >= opp.EditTime { + return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG") + } + + // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure + // that the clocks don't jump too far in the future + // we ignore merge commits here to allow merging after a loooong time without breaking anything, + // as long as there is one valid chain of small hops, it's fine. + if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 { + return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack") + } + } + + oppMap[commit.Hash] = opp + opsCount += len(opp.Operations) + } + + // The clocks are fine, we witness them + for _, opp := range oppMap { + err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime) + if err != nil { + return nil, err + } + err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime) + if err != nil { + return nil, err + } + } + + // Now that we know that the topological order and clocks are fine, we order the operationPacks + // based on the logical clocks, entirely ignoring the DAG topology + + oppSlice := make([]*operationPack, 0, len(oppMap)) + for _, pack := range oppMap { + oppSlice = append(oppSlice, pack) + } + sort.Slice(oppSlice, func(i, j int) bool { + // Primary ordering with the EditTime. + if oppSlice[i].EditTime != oppSlice[j].EditTime { + return oppSlice[i].EditTime < oppSlice[j].EditTime + } + // We have equal EditTime, which means we have concurrent edition over different machines and we + // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible. + // As a secondary ordering, we can order based on a hash of the serialized Operations in the + // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse. + // This is a lexicographic ordering on the stringified ID. + return oppSlice[i].Id() < oppSlice[j].Id() + }) + + // Now that we ordered the operationPacks, we have the order of the Operations + + ops := make([]Operation, 0, opsCount) + var createTime lamport.Time + var editTime lamport.Time + for _, pack := range oppSlice { + for _, operation := range pack.Operations { + ops = append(ops, operation) + } + if pack.CreateTime > createTime { + createTime = pack.CreateTime + } + if pack.EditTime > editTime { + editTime = pack.EditTime + } + } + + return &Entity{ + Definition: def, + ops: ops, + lastCommit: rootHash, + createTime: createTime, + editTime: editTime, + }, nil +} + +type StreamedEntity struct { + Entity *Entity + Err error +} + +// ReadAll read and parse all local Entity +func ReadAll(def Definition, repo repository.ClockedRepo, resolver identity.Resolver) <-chan StreamedEntity { + out := make(chan StreamedEntity) + + go func() { + defer close(out) + + refPrefix := fmt.Sprintf("refs/%s/", def.Namespace) + + refs, err := repo.ListRefs(refPrefix) + if err != nil { + out <- StreamedEntity{Err: err} + return + } + + for _, ref := range refs { + e, err := read(def, repo, resolver, ref) + + if err != nil { + out <- StreamedEntity{Err: err} + return + } + + out <- StreamedEntity{Entity: e} + } + }() + + return out +} + +// Id return the Entity identifier +func (e *Entity) Id() entity.Id { + // id is the id of the first operation + return e.FirstOp().Id() +} + +// Validate check if the Entity data is valid +func (e *Entity) Validate() error { + // non-empty + if len(e.ops) == 0 && len(e.staging) == 0 { + return fmt.Errorf("entity has no operations") + } + + // check if each operations are valid + for _, op := range e.ops { + if err := op.Validate(); err != nil { + return err + } + } + + // check if staging is valid if needed + for _, op := range e.staging { + if err := op.Validate(); err != nil { + return err + } + } + + // Check that there is no colliding operation's ID + ids := make(map[entity.Id]struct{}) + for _, op := range e.Operations() { + if _, ok := ids[op.Id()]; ok { + return fmt.Errorf("id collision: %s", op.Id()) + } + ids[op.Id()] = struct{}{} + } + + return nil +} + +// Operations return the ordered operations +func (e *Entity) Operations() []Operation { + return append(e.ops, e.staging...) +} + +// FirstOp lookup for the very first operation of the Entity +func (e *Entity) FirstOp() Operation { + for _, op := range e.ops { + return op + } + for _, op := range e.staging { + return op + } + return nil +} + +// LastOp lookup for the very last operation of the Entity +func (e *Entity) LastOp() Operation { + if len(e.staging) > 0 { + return e.staging[len(e.staging)-1] + } + if len(e.ops) > 0 { + return e.ops[len(e.ops)-1] + } + return nil +} + +// Append add a new Operation to the Entity +func (e *Entity) Append(op Operation) { + e.staging = append(e.staging, op) +} + +// NeedCommit indicate if the in-memory state changed and need to be commit in the repository +func (e *Entity) NeedCommit() bool { + return len(e.staging) > 0 +} + +// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity +// is already in sync with the repository. +func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error { + if e.NeedCommit() { + return e.Commit(repo) + } + return nil +} + +// Commit write the appended operations in the repository +func (e *Entity) Commit(repo repository.ClockedRepo) error { + if !e.NeedCommit() { + return fmt.Errorf("can't commit an entity with no pending operation") + } + + err := e.Validate() + if err != nil { + return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename) + } + + for len(e.staging) > 0 { + var author identity.Interface + var toCommit []Operation + + // Split into chunks with the same author + for len(e.staging) > 0 { + op := e.staging[0] + if author != nil && op.Author().Id() != author.Id() { + break + } + author = e.staging[0].Author() + toCommit = append(toCommit, op) + e.staging = e.staging[1:] + } + + e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace)) + if err != nil { + return err + } + + opp := &operationPack{ + Author: author, + Operations: toCommit, + EditTime: e.editTime, + } + + if e.lastCommit == "" { + e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace)) + if err != nil { + return err + } + opp.CreateTime = e.createTime + } + + var parentCommit []repository.Hash + if e.lastCommit != "" { + parentCommit = []repository.Hash{e.lastCommit} + } + + commitHash, err := opp.Write(e.Definition, repo, parentCommit...) + if err != nil { + return err + } + + e.lastCommit = commitHash + e.ops = append(e.ops, toCommit...) + } + + // not strictly necessary but make equality testing easier in tests + e.staging = nil + + // Create or update the Git reference for this entity + // When pushing later, the remote will ensure that this ref update + // is fast-forward, that is no data has been overwritten. + ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String()) + return repo.UpdateRef(ref, e.lastCommit) +} + +// CreateLamportTime return the Lamport time of creation +func (e *Entity) CreateLamportTime() lamport.Time { + return e.createTime +} + +// EditLamportTime return the Lamport time of the last edition +func (e *Entity) EditLamportTime() lamport.Time { + return e.editTime +} |