// Package dag contains the base common code to define an entity stored // in a chain of git objects, supporting actions like Push, Pull and Merge. package dag import ( "encoding/json" "fmt" "sort" "github.com/pkg/errors" "github.com/MichaelMure/git-bug/entities/identity" "github.com/MichaelMure/git-bug/entity" "github.com/MichaelMure/git-bug/repository" "github.com/MichaelMure/git-bug/util/lamport" ) const refsPattern = "refs/%s/%s" const creationClockPattern = "%s-create" const editClockPattern = "%s-edit" type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error) // Definition hold the details defining one specialization of an Entity. type Definition struct { // the name of the entity (bug, pull-request, ...), for human consumption Typename string // the Namespace in git references (bugs, prs, ...) Namespace string // a function decoding a JSON message into an Operation OperationUnmarshaler OperationUnmarshaler // the expected format version number, that can be used for data migration/upgrade FormatVersion uint } // Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge. type Entity struct { // A Lamport clock is a logical clock that allow to order event // inside a distributed system. // It must be the first field in this struct due to https://github.com/golang/go/issues/36606 createTime lamport.Time editTime lamport.Time Definition // operations that are already stored in the repository ops []Operation // operations not yet stored in the repository staging []Operation lastCommit repository.Hash } // New create an empty Entity func New(definition Definition) *Entity { return &Entity{ Definition: definition, } } // Read will read and decode a stored local Entity from a repository func Read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) { if err := id.Validate(); err != nil { return *new(EntityT), errors.Wrap(err, "invalid id") } ref := fmt.Sprintf("refs/%s/%s", def.Namespace, id.String()) return read[EntityT](def, wrapper, repo, resolvers, ref) } // readRemote will read and decode a stored remote Entity from a repository func readRemote[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) { if err := id.Validate(); err != nil { return *new(EntityT), errors.Wrap(err, "invalid id") } ref := fmt.Sprintf("refs/remotes/%s/%s/%s", def.Namespace, remote, id.String()) return read[EntityT](def, wrapper, repo, resolvers, ref) } // read fetch from git and decode an Entity at an arbitrary git reference. func read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) { rootHash, err := repo.ResolveRef(ref) if err == repository.ErrNotFound { return *new(EntityT), entity.NewErrNotFound(def.Typename) } if err != nil { return *new(EntityT), err } // Perform a breadth-first search to get a topological order of the DAG where we discover the // parents commit and go back in time up to the chronological root queue := make([]repository.Hash, 0, 32) visited := make(map[repository.Hash]struct{}) BFSOrder := make([]repository.Commit, 0, 32) queue = append(queue, rootHash) visited[rootHash] = struct{}{} for len(queue) > 0 { // pop hash := queue[0] queue = queue[1:] commit, err := repo.ReadCommit(hash) if err != nil { return *new(EntityT), err } BFSOrder = append(BFSOrder, commit) for _, parent := range commit.Parents { if _, ok := visited[parent]; !ok { queue = append(queue, parent) // mark as visited visited[parent] = struct{}{} } } } // Now, we can reverse this topological order and read the commits in an order where // we are sure to have read all the chronological ancestors when we read a commit. // Next step is to: // 1) read the operationPacks // 2) make sure that clocks causality respect the DAG topology. oppMap := make(map[repository.Hash]*operationPack) var opsCount int for i := len(BFSOrder) - 1; i >= 0; i-- { commit := BFSOrder[i] isFirstCommit := i == len(BFSOrder)-1 isMerge := len(commit.Parents) > 1 // Verify DAG structure: single chronological root, so only the root // can have no parents. Said otherwise, the DAG need to have exactly // one leaf. if !isFirstCommit && len(commit.Parents) == 0 { return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG") } opp, err := readOperationPack(def, repo, resolvers, commit) if err != nil { return *new(EntityT), err } err = opp.Validate() if err != nil { return *new(EntityT), err } if isMerge && len(opp.Operations) > 0 { return *new(EntityT), fmt.Errorf("merge commit cannot have operations") } // Check that the create lamport clock is set (not checked in Validate() as it's optional) if isFirstCommit && opp.CreateTime <= 0 { return *new(EntityT), fmt.Errorf("creation lamport time not set") } // make sure that the lamport clocks causality match the DAG topology for _, parentHash := range commit.Parents { parentPack, ok := oppMap[parentHash] if !ok { panic("DFS failed") } if parentPack.EditTime >= opp.EditTime { return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG") } // to avoid an attack where clocks are pushed toward the uint64 rollover, make sure // that the clocks don't jump too far in the future // we ignore merge commits here to allow merging after a loooong time without breaking anything, // as long as there is one valid chain of small hops, it's fine. if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 { return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack") } } oppMap[commit.Hash] = opp opsCount += len(opp.Operations) } // The clocks are fine, we witness them for _, opp := range oppMap { err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime) if err != nil { return *new(EntityT), err } err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime) if err != nil { return *new(EntityT), err } } // Now that we know that the topological order and clocks are fine, we order the operationPacks // based on the logical clocks, entirely ignoring the DAG topology oppSlice := make([]*operationPack, 0, len(oppMap)) for _, pack := range oppMap { oppSlice = append(oppSlice, pack) } sort.Slice(oppSlice, func(i, j int) bool { // Primary ordering with the EditTime. if oppSlice[i].EditTime != oppSlice[j].EditTime { return oppSlice[i].EditTime < oppSlice[j].EditTime } // We have equal EditTime, which means we have concurrent edition over different machines, and we // can't tell which one came first. So, what now? We still need a total ordering and the most stable possible. // As a secondary ordering, we can order based on a hash of the serialized Operations in the // operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse. // This is a lexicographic ordering on the stringified ID. return oppSlice[i].Id() < oppSlice[j].Id() }) // Now that we ordered the operationPacks, we have the order of the Operations ops := make([]Operation, 0, opsCount) var createTime lamport.Time var editTime lamport.Time for _, pack := range oppSlice { for _, operation := range pack.Operations { ops = append(ops, operation) } if pack.CreateTime > createTime { createTime = pack.CreateTime } if pack.EditTime > editTime { editTime = pack.EditTime } } return wrapper(&Entity{ Definition: def, ops: ops, lastCommit: rootHash, createTime: createTime, editTime: editTime, }), nil } // readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference. // Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete // clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding // operation blobs can be implemented instead. func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error { rootHash, err := repo.ResolveRef(ref) if err == repository.ErrNotFound { return entity.NewErrNotFound(def.Typename) } if err != nil { return err } commit, err := repo.ReadCommit(rootHash) if err != nil { return err } createTime, editTime, err := readOperationPackClock(repo, commit) if err != nil { return err } // if we have more than one commit, we need to find the root to have the create time if len(commit.Parents) > 0 { for len(commit.Parents) > 0 { // The path to the root is irrelevant. commit, err = repo.ReadCommit(commit.Parents[0]) if err != nil { return err } } createTime, _, err = readOperationPackClock(repo, commit) if err != nil { return err } } if createTime <= 0 { return fmt.Errorf("creation lamport time not set") } if editTime <= 0 { return fmt.Errorf("creation lamport time not set") } err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime) if err != nil { return err } err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime) if err != nil { return err } return nil } // ReadAll read and parse all local Entity func ReadAll[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan entity.StreamedEntity[EntityT] { out := make(chan entity.StreamedEntity[EntityT]) go func() { defer close(out) refPrefix := fmt.Sprintf("refs/%s/", def.Namespace) refs, err := repo.ListRefs(refPrefix) if err != nil { out <- entity.StreamedEntity[EntityT]{Err: err} return } for _, ref := range refs { e, err := read[EntityT](def, wrapper, repo, resolvers, ref) if err != nil { out <- entity.StreamedEntity[EntityT]{Err: err} return } out <- entity.StreamedEntity[EntityT]{Entity: e} } }() return out } // ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the // repo end up with correct clocks for the next write. func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error { refPrefix := fmt.Sprintf("refs/%s/", def.Namespace) refs, err := repo.ListRefs(refPrefix) if err != nil { return err } for _, ref := range refs { err = readClockNoCheck(def, repo, ref) if err != nil { return err } } return nil } // Id return the Entity identifier func (e *Entity) Id() entity.Id { // id is the id of the first operation return e.FirstOp().Id() } // Validate check if the Entity data is valid func (e *Entity) Validate() error { // non-empty if len(e.ops) == 0 && len(e.staging) == 0 { return fmt.Errorf("entity has no operations") } // check if each operation are valid for _, op := range e.ops { if err := op.Validate(); err != nil { return err } } // check if staging is valid if needed for _, op := range e.staging { if err := op.Validate(); err != nil { return err } } // Check that there is no colliding operation's ID ids := make(map[entity.Id]struct{}) for _, op := range e.Operations() { if _, ok := ids[op.Id()]; ok { return fmt.Errorf("id collision: %s", op.Id()) } ids[op.Id()] = struct{}{} } return nil } // Operations return the ordered operations func (e *Entity) Operations() []Operation { return append(e.ops, e.staging...) } // FirstOp lookup for the very first operation of the Entity func (e *Entity) FirstOp() Operation { for _, op := range e.ops { return op } for _, op := range e.staging { return op } return nil } // LastOp lookup for the very last operation of the Entity func (e *Entity) LastOp() Operation { if len(e.staging) > 0 { return e.staging[len(e.staging)-1] } if len(e.ops) > 0 { return e.ops[len(e.ops)-1] } return nil } // Append add a new Operation to the Entity func (e *Entity) Append(op Operation) { e.staging = append(e.staging, op) } // NeedCommit indicate if the in-memory state changed and need to be commit in the repository func (e *Entity) NeedCommit() bool { return len(e.staging) > 0 } // CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity // is already in sync with the repository. func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error { if e.NeedCommit() { return e.Commit(repo) } return nil } // Commit write the appended operations in the repository func (e *Entity) Commit(repo repository.ClockedRepo) error { if !e.NeedCommit() { return fmt.Errorf("can't commit an entity with no pending operation") } err := e.Validate() if err != nil { return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename) } for len(e.staging) > 0 { var author identity.Interface var toCommit []Operation // Split into chunks with the same author for len(e.staging) > 0 { op := e.staging[0] if author != nil && op.Author().Id() != author.Id() { break } author = e.staging[0].Author() toCommit = append(toCommit, op) e.staging = e.staging[1:] } e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace)) if err != nil { return err } opp := &operationPack{ Author: author, Operations: toCommit, EditTime: e.editTime, } if e.lastCommit == "" { e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace)) if err != nil { return err } opp.CreateTime = e.createTime } var parentCommit []repository.Hash if e.lastCommit != "" { parentCommit = []repository.Hash{e.lastCommit} } commitHash, err := opp.Write(e.Definition, repo, parentCommit...) if err != nil { return err } e.lastCommit = commitHash e.ops = append(e.ops, toCommit...) } // not strictly necessary but make equality testing easier in tests e.staging = nil // Create or update the Git reference for this entity // When pushing later, the remote will ensure that this ref update // is fast-forward, that is no data has been overwritten. ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String()) return repo.UpdateRef(ref, e.lastCommit) } // CreateLamportTime return the Lamport time of creation func (e *Entity) CreateLamportTime() lamport.Time { return e.createTime } // EditLamportTime return the Lamport time of the last edition func (e *Entity) EditLamportTime() lamport.Time { return e.editTime }