// Package dag contains the base common code to define an entity stored
// in a chain of git objects, supporting actions like Push, Pull and Merge.
package dag
import (
"encoding/json"
"fmt"
"sort"
"github.com/pkg/errors"
"github.com/MichaelMure/git-bug/entities/identity"
"github.com/MichaelMure/git-bug/entity"
"github.com/MichaelMure/git-bug/repository"
"github.com/MichaelMure/git-bug/util/lamport"
)
// refsPattern is the git reference layout for a local entity: refs/<namespace>/<id>.
const refsPattern = "refs/%s/%s"

// creationClockPattern names the per-namespace lamport clock witnessing entity creations.
const creationClockPattern = "%s-create"

// editClockPattern names the per-namespace lamport clock witnessing entity editions.
const editClockPattern = "%s-edit"

// OperationUnmarshaler is a function decoding a JSON message into an Operation,
// using the given resolver to connect to other entities if needed.
type OperationUnmarshaler func(raw json.RawMessage, resolver entity.Resolvers) (Operation, error)
// Definition hold the details defining one specialization of an Entity.
type Definition struct {
	// the name of the entity (bug, pull-request, ...), for human consumption
	Typename string
	// the Namespace in git references (bugs, prs, ...)
	Namespace string
	// a function decoding a JSON message into an Operation
	OperationUnmarshaler OperationUnmarshaler
	// the expected format version number, that can be used for data migration/upgrade
	FormatVersion uint
}
// Actions bundles the generic entity operations (read, list, sync with remotes)
// specialized for a concrete entity type EntityT.
type Actions[EntityT entity.Interface] struct {
	// Wrap adapts a raw *Entity into the specialized EntityT.
	Wrap func(e *Entity) EntityT
	// New creates an empty EntityT.
	New func() EntityT
	// Read loads a local entity by id.
	Read func(repo repository.ClockedRepo, id entity.Id) (EntityT, error)
	// ReadWithResolver loads a local entity by id, resolving references with the given resolvers.
	ReadWithResolver func(repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error)
	// ReadAll streams every local entity.
	ReadAll func(repo repository.ClockedRepo) <-chan StreamedEntity[EntityT]
	// ListLocalIds lists the ids of all local entities.
	ListLocalIds func(repo repository.Repo) ([]entity.Id, error)
	// Fetch pulls refs from a remote; Push sends them (returned string is
	// presumably the git transport output — verify against implementations).
	Fetch func(repo repository.Repo, remote string) (string, error)
	Push  func(repo repository.Repo, remote string) (string, error)
	// Pull fetches then merges all entities from a remote.
	Pull func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) error
	// MergeAll merges every fetched remote entity, streaming one result per entity.
	MergeAll func(repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, mergeAuthor identity.Interface) <-chan entity.MergeResult
}
// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
type Entity struct {
	// A Lamport clock is a logical clock that allow to order event
	// inside a distributed system.
	// It must be the first field in this struct due to https://github.com/golang/go/issues/36606
	createTime lamport.Time
	editTime   lamport.Time

	// Definition describes the specialization (typename, namespace, format version, ...).
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// lastCommit is the hash of the last commit written for this entity,
	// or the empty string if nothing has been committed yet.
	lastCommit repository.Hash
}
// New create an empty Entity
func New(definition Definition) *Entity {
	e := &Entity{}
	e.Definition = definition
	return e
}
// Read will read and decode a stored local Entity from a repository
func Read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, id entity.Id) (EntityT, error) {
	if err := id.Validate(); err != nil {
		return *new(EntityT), errors.Wrap(err, "invalid id")
	}

	// Use the shared refsPattern constant (refs/<namespace>/<id>) instead of
	// repeating the literal, consistent with how Commit builds the same ref.
	ref := fmt.Sprintf(refsPattern, def.Namespace, id.String())

	return read[EntityT](def, wrapper, repo, resolvers, ref)
}
// readRemote will read and decode a stored remote Entity from a repository
func readRemote[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, remote string, id entity.Id) (EntityT, error) {
	if err := id.Validate(); err != nil {
		return *new(EntityT), errors.Wrap(err, "invalid id")
	}

	// Git lays out remote-tracking references under refs/remotes/<remote>/...,
	// so the remote name comes before the entity namespace:
	// refs/remotes/<remote>/<namespace>/<id>. The previous code had the remote
	// and namespace swapped, producing a ref that never matches fetched data.
	ref := fmt.Sprintf("refs/remotes/%s/%s/%s", remote, def.Namespace, id.String())

	return read[EntityT](def, wrapper, repo, resolvers, ref)
}
// read fetch from git and decode an Entity at an arbitrary git reference.
//
// It walks the whole commit DAG of the entity, verifies its structural and
// lamport-clock invariants, witnesses the clocks, then flattens the
// operationPacks into a single totally-ordered stream of operations.
func read[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers, ref string) (EntityT, error) {
	// rootHash is the commit at the head of the ref, that is the most recent commit.
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return *new(EntityT), err
	}

	// Perform a breadth-first search to get a topological order of the DAG where we discover the
	// parents commit and go back in time up to the chronological root
	queue := make([]repository.Hash, 0, 32)
	visited := make(map[repository.Hash]struct{})
	BFSOrder := make([]repository.Commit, 0, 32)

	queue = append(queue, rootHash)
	visited[rootHash] = struct{}{}

	for len(queue) > 0 {
		// pop
		hash := queue[0]
		queue = queue[1:]

		commit, err := repo.ReadCommit(hash)
		if err != nil {
			return *new(EntityT), err
		}

		BFSOrder = append(BFSOrder, commit)

		for _, parent := range commit.Parents {
			if _, ok := visited[parent]; !ok {
				queue = append(queue, parent)
				// mark as visited
				visited[parent] = struct{}{}
			}
		}
	}

	// Now, we can reverse this topological order and read the commits in an order where
	// we are sure to have read all the chronological ancestors when we read a commit.
	// Next step is to:
	// 1) read the operationPacks
	// 2) make sure that clocks causality respect the DAG topology.

	// oppMap maps each commit to its decoded operationPack, so a commit's parents
	// can be looked up when checking clock causality below.
	oppMap := make(map[repository.Hash]*operationPack)
	var opsCount int

	// iterate in reverse BFS order: ancestors before descendants
	for i := len(BFSOrder) - 1; i >= 0; i-- {
		commit := BFSOrder[i]
		// the last element of the BFS order is the chronological root commit
		isFirstCommit := i == len(BFSOrder)-1
		isMerge := len(commit.Parents) > 1

		// Verify DAG structure: single chronological root, so only the root
		// can have no parents. Said otherwise, the DAG need to have exactly
		// one leaf.
		if !isFirstCommit && len(commit.Parents) == 0 {
			return *new(EntityT), fmt.Errorf("multiple leafs in the entity DAG")
		}

		opp, err := readOperationPack(def, repo, resolvers, commit)
		if err != nil {
			return *new(EntityT), err
		}

		err = opp.Validate()
		if err != nil {
			return *new(EntityT), err
		}

		// merge commits only tie branches together; they must carry no payload
		if isMerge && len(opp.Operations) > 0 {
			return *new(EntityT), fmt.Errorf("merge commit cannot have operations")
		}

		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
		if isFirstCommit && opp.CreateTime <= 0 {
			return *new(EntityT), fmt.Errorf("creation lamport time not set")
		}

		// make sure that the lamport clocks causality match the DAG topology
		for _, parentHash := range commit.Parents {
			parentPack, ok := oppMap[parentHash]
			if !ok {
				// invariant: the reverse BFS order processed every parent already
				panic("DFS failed")
			}

			// a child edit must be strictly later than all its parents
			if parentPack.EditTime >= opp.EditTime {
				return *new(EntityT), fmt.Errorf("lamport clock ordering doesn't match the DAG")
			}

			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
			// that the clocks don't jump too far in the future
			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
			// as long as there is one valid chain of small hops, it's fine.
			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
				return *new(EntityT), fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
			}
		}

		oppMap[commit.Hash] = opp
		opsCount += len(opp.Operations)
	}

	// The clocks are fine, we witness them
	for _, opp := range oppMap {
		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), opp.CreateTime)
		if err != nil {
			return *new(EntityT), err
		}
		err = repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), opp.EditTime)
		if err != nil {
			return *new(EntityT), err
		}
	}

	// Now that we know that the topological order and clocks are fine, we order the operationPacks
	// based on the logical clocks, entirely ignoring the DAG topology
	oppSlice := make([]*operationPack, 0, len(oppMap))
	for _, pack := range oppMap {
		oppSlice = append(oppSlice, pack)
	}
	sort.Slice(oppSlice, func(i, j int) bool {
		// Primary ordering with the EditTime.
		if oppSlice[i].EditTime != oppSlice[j].EditTime {
			return oppSlice[i].EditTime < oppSlice[j].EditTime
		}
		// We have equal EditTime, which means we have concurrent edition over different machines, and we
		// can't tell which one came first. So, what now? We still need a total ordering and the most stable possible.
		// As a secondary ordering, we can order based on a hash of the serialized Operations in the
		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
		// This is a lexicographic ordering on the stringified ID.
		return oppSlice[i].Id() < oppSlice[j].Id()
	})

	// Now that we ordered the operationPacks, we have the order of the Operations
	ops := make([]Operation, 0, opsCount)
	var createTime lamport.Time
	var editTime lamport.Time
	for _, pack := range oppSlice {
		for _, operation := range pack.Operations {
			ops = append(ops, operation)
		}
		// track the highest clocks seen across all packs
		if pack.CreateTime > createTime {
			createTime = pack.CreateTime
		}
		if pack.EditTime > editTime {
			editTime = pack.EditTime
		}
	}

	return wrapper(&Entity{
		Definition: def,
		ops:        ops,
		lastCommit: rootHash,
		createTime: createTime,
		editTime:   editTime,
	}), nil
}
// readClockNoCheck fetch from git, read and witness the clocks of an Entity at an arbitrary git reference.
//
// Note: readClockNoCheck does not verify the integrity of the Entity and could witness incorrect or incomplete
// clocks if so. If data integrity check is a requirement, a flow similar to read without actually reading/decoding
// operation blobs can be implemented instead.
func readClockNoCheck(def Definition, repo repository.ClockedRepo, ref string) error {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return err
	}

	commit, err := repo.ReadCommit(rootHash)
	if err != nil {
		return err
	}

	// the head commit carries the most recent edit time
	createTime, editTime, err := readOperationPackClock(repo, commit)
	if err != nil {
		return err
	}

	// if we have more than one commit, we need to find the root to have the create time
	if len(commit.Parents) > 0 {
		for len(commit.Parents) > 0 {
			// The path to the root is irrelevant.
			commit, err = repo.ReadCommit(commit.Parents[0])
			if err != nil {
				return err
			}
		}
		createTime, _, err = readOperationPackClock(repo, commit)
		if err != nil {
			return err
		}
	}

	if createTime <= 0 {
		return fmt.Errorf("creation lamport time not set")
	}
	if editTime <= 0 {
		// bug fix: this branch previously reported the creation clock instead of the edit clock
		return fmt.Errorf("edition lamport time not set")
	}

	err = repo.Witness(fmt.Sprintf(creationClockPattern, def.Namespace), createTime)
	if err != nil {
		return err
	}

	return repo.Witness(fmt.Sprintf(editClockPattern, def.Namespace), editTime)
}
// StreamedEntity is one element of the stream produced by ReadAll: it holds
// either a successfully read entity or the error that ended the stream.
type StreamedEntity[EntityT entity.Interface] struct {
	Entity EntityT
	Err    error
}
// ReadAll read and parse all local Entity
func ReadAll[EntityT entity.Interface](def Definition, wrapper func(e *Entity) EntityT, repo repository.ClockedRepo, resolvers entity.Resolvers) <-chan StreamedEntity[EntityT] {
	out := make(chan StreamedEntity[EntityT])

	go func() {
		defer close(out)

		// list every local ref of this entity's namespace
		prefix := fmt.Sprintf("refs/%s/", def.Namespace)

		refs, err := repo.ListRefs(prefix)
		if err != nil {
			out <- StreamedEntity[EntityT]{Err: err}
			return
		}

		for _, r := range refs {
			parsed, err := read[EntityT](def, wrapper, repo, resolvers, r)
			if err != nil {
				// a single failure aborts the whole stream
				out <- StreamedEntity[EntityT]{Err: err}
				return
			}
			out <- StreamedEntity[EntityT]{Entity: parsed}
		}
	}()

	return out
}
// ReadAllClocksNoCheck goes over all entities matching Definition and read/witness the corresponding clocks so that the
// repo end up with correct clocks for the next write.
func ReadAllClocksNoCheck(def Definition, repo repository.ClockedRepo) error {
	prefix := fmt.Sprintf("refs/%s/", def.Namespace)

	refs, err := repo.ListRefs(prefix)
	if err != nil {
		return err
	}

	for _, r := range refs {
		if err := readClockNoCheck(def, repo, r); err != nil {
			return err
		}
	}
	return nil
}
// Id return the Entity identifier
func (e *Entity) Id() entity.Id {
	// by construction, the identifier of an Entity is the identifier
	// of its very first operation
	first := e.FirstOp()
	return first.Id()
}
// Validate check if the Entity data is valid
func (e *Entity) Validate() error {
	// an Entity must hold at least one operation, stored or staged
	if len(e.ops)+len(e.staging) == 0 {
		return fmt.Errorf("entity has no operations")
	}

	// every operation must be valid on its own, stored ones first, then staged ones
	for _, group := range [][]Operation{e.ops, e.staging} {
		for _, op := range group {
			if err := op.Validate(); err != nil {
				return err
			}
		}
	}

	// operation identifiers must be unique across the whole entity
	seen := make(map[entity.Id]struct{})
	for _, op := range e.Operations() {
		opId := op.Id()
		if _, duplicated := seen[opId]; duplicated {
			return fmt.Errorf("id collision: %s", opId)
		}
		seen[opId] = struct{}{}
	}

	return nil
}
// Operations return the ordered operations
func (e *Entity) Operations() []Operation {
	// Build a fresh slice instead of append(e.ops, e.staging...): if e.ops has
	// spare capacity, that append would write the staged operations into
	// e.ops' backing array, and the returned slice would alias internal state,
	// letting callers (or a later Commit) corrupt each other.
	ops := make([]Operation, 0, len(e.ops)+len(e.staging))
	ops = append(ops, e.ops...)
	ops = append(ops, e.staging...)
	return ops
}
// FirstOp lookup for the very first operation of the Entity
func (e *Entity) FirstOp() Operation {
	// stored operations come before staged ones
	if len(e.ops) > 0 {
		return e.ops[0]
	}
	if len(e.staging) > 0 {
		return e.staging[0]
	}
	// no operation at all
	return nil
}
// LastOp lookup for the very last operation of the Entity
func (e *Entity) LastOp() Operation {
	switch {
	case len(e.staging) > 0:
		// staged operations are the most recent
		return e.staging[len(e.staging)-1]
	case len(e.ops) > 0:
		return e.ops[len(e.ops)-1]
	default:
		// no operation at all
		return nil
	}
}
// Append add a new Operation to the Entity
//
// The operation is only staged in memory; call Commit to persist it.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}
// NeedCommit indicate if the in-memory state changed and need to be commit in the repository
func (e *Entity) NeedCommit() bool {
	// any staged operation means there is un-persisted state
	return len(e.staging) > 0
}
// CommitAsNeeded execute a Commit only if necessary. This function is useful to avoid getting an error if the Entity
// is already in sync with the repository.
func (e *Entity) CommitAsNeeded(repo repository.ClockedRepo) error {
	// nothing staged: nothing to do, and no error either
	if !e.NeedCommit() {
		return nil
	}
	return e.Commit(repo)
}
// Commit write the appended operations in the repository
//
// Staged operations are written as one or more operationPacks: a pack holds a
// single author, so the staging area is split into consecutive runs sharing
// the same author, each becoming one commit chained onto the previous one.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.Typename)
	}

	for len(e.staging) > 0 {
		var author identity.Interface
		var toCommit []Operation

		// Split into chunks with the same author
		for len(e.staging) > 0 {
			op := e.staging[0]
			// stop at the first operation with a different author
			if author != nil && op.Author().Id() != author.Id() {
				break
			}
			author = e.staging[0].Author()
			toCommit = append(toCommit, op)
			e.staging = e.staging[1:]
		}

		// tick the edit clock once per pack written
		e.editTime, err = repo.Increment(fmt.Sprintf(editClockPattern, e.Namespace))
		if err != nil {
			return err
		}

		opp := &operationPack{
			Author:     author,
			Operations: toCommit,
			EditTime:   e.editTime,
		}

		// very first commit of this entity: also tick and record the creation clock
		if e.lastCommit == "" {
			e.createTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.Namespace))
			if err != nil {
				return err
			}
			opp.CreateTime = e.createTime
		}

		// chain the new commit onto the previous head, if any
		var parentCommit []repository.Hash
		if e.lastCommit != "" {
			parentCommit = []repository.Hash{e.lastCommit}
		}

		commitHash, err := opp.Write(e.Definition, repo, parentCommit...)
		if err != nil {
			return err
		}

		// the written pack becomes part of the stored operations
		e.lastCommit = commitHash
		e.ops = append(e.ops, toCommit...)
	}

	// not strictly necessary but make equality testing easier in tests
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	ref := fmt.Sprintf(refsPattern, e.Namespace, e.Id().String())
	return repo.UpdateRef(ref, e.lastCommit)
}
// CreateLamportTime return the Lamport time of creation
func (e *Entity) CreateLamportTime() lamport.Time {
	return e.createTime
}
// EditLamportTime return the Lamport time of the last edition
func (e *Entity) EditLamportTime() lamport.Time {
	return e.editTime
}