aboutsummaryrefslogtreecommitdiffstats
path: root/entity/dag/entity.go
blob: 273e6ad12e29890f98808fe14e74f09d83931c07 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
// Package dag contains the base common code to define an entity stored
// in a chain of git objects, supporting actions like Push, Pull and Merge.
package dag

import (
	"encoding/json"
	"fmt"
	"sort"

	"github.com/pkg/errors"

	"github.com/MichaelMure/git-bug/entity"
	"github.com/MichaelMure/git-bug/identity"
	"github.com/MichaelMure/git-bug/repository"
	"github.com/MichaelMure/git-bug/util/lamport"
)

// Formatting patterns used to build the names of the git references and of
// the lamport clocks of an entity.
const (
	// refsPattern builds a local git reference from a namespace and an entity id
	refsPattern = "refs/%s/%s"
	// creationClockPattern builds the name of the creation clock from a namespace
	creationClockPattern = "%s-create"
	// editClockPattern builds the name of the edit clock from a namespace
	editClockPattern = "%s-edit"
)

// Definition holds the details defining one specialization of an Entity.
// It is embedded in Entity, so its fields are directly reachable from an
// Entity value.
type Definition struct {
	// the name of the entity (bug, pull-request, ...); used when building
	// error messages
	typename string
	// the namespace in git (bugs, prs, ...); used to build the git references
	// of the entities and the names of their lamport clocks
	namespace string
	// a function decoding a JSON message into an Operation
	operationUnmarshaler func(author identity.Interface, raw json.RawMessage) (Operation, error)
	// a function loading an identity.Identity from its Id
	identityResolver identity.Resolver
	// the expected format version number, that can be used for data migration/upgrade
	formatVersion uint
}

// Entity is a data structure stored in a chain of git objects, supporting actions like Push, Pull and Merge.
//
// Its operations are split between those already written in the repository
// (ops) and those only held in memory until the next Commit (staging).
type Entity struct {
	// the Definition of the specialization this Entity belongs to
	Definition

	// operations that are already stored in the repository
	ops []Operation
	// operations not yet stored in the repository
	staging []Operation

	// TODO: add here createTime and editTime

	// // TODO: doesn't seem to actually be useful over the topological sort? Timestamp can be generated from graph depth
	// // TODO: maybe EditTime is better because it could spread ops in consecutive groups on the logical timeline --> avoid interleaving
	// packClock  lamport.Clock

	// hash of the last commit read from or written in the repository for this
	// Entity; empty as long as nothing has been committed
	lastCommit repository.Hash
}

// New creates an empty Entity for the given Definition: it holds no
// operation and no commit has been recorded for it yet.
func New(definition Definition) *Entity {
	created := new(Entity)
	created.Definition = definition
	// created.packClock = lamport.NewMemClock()
	return created
}

// Read will read and decode a stored local Entity from a repository.
//
// The Entity is looked up at the local git reference built from the namespace
// of def and from id. An error is returned if id is invalid, or if the
// reference can't be resolved, read or decoded.
func Read(def Definition, repo repository.ClockedRepo, id entity.Id) (*Entity, error) {
	if err := id.Validate(); err != nil {
		return nil, errors.Wrap(err, "invalid id")
	}

	// Use the shared pattern rather than a copy of the literal, so that Read
	// always looks at the reference that Commit writes.
	ref := fmt.Sprintf(refsPattern, def.namespace, id.String())

	return read(def, repo, ref)
}

// readRemote will read and decode a stored remote Entity from a repository.
//
// The Entity is looked up under "refs/remotes/", at the reference built from
// the namespace of def, the remote name and id, in that order. An error is
// returned if id is invalid or if the reference can't be read.
//
// NOTE(review): the namespace is placed before the remote name in the
// reference; confirm that this matches where fetched references are stored.
func readRemote(def Definition, repo repository.ClockedRepo, remote string, id entity.Id) (*Entity, error) {
	err := id.Validate()
	if err != nil {
		return nil, errors.Wrap(err, "invalid id")
	}

	remoteRef := fmt.Sprintf("refs/remotes/%s/%s/%s", def.namespace, remote, id.String())
	return read(def, repo, remoteRef)
}

// read fetches from git and decodes an Entity at an arbitrary git reference.
//
// The steps are:
//  1. list the commits reachable from ref, parents before children
//  2. read and validate the operationPack of each commit, while checking that
//     the DAG has a single chronological root and that the lamport clocks
//     follow the DAG topology
//  3. witness the clocks of all the packs in the repository clocks
//  4. order the packs by edit time, then by Id, and flatten their operations
//
// An error is returned if the reference can't be resolved, if a commit or an
// operationPack can't be read, or if any of the checks above fails.
func read(def Definition, repo repository.ClockedRepo, ref string) (*Entity, error) {
	rootHash, err := repo.ResolveRef(ref)
	if err != nil {
		return nil, err
	}

	// Get the commits in an order where we are sure to have read all the
	// chronological ancestors when we read a commit.
	commits, err := readCommitsParentsFirst(repo, rootHash)
	if err != nil {
		return nil, err
	}

	// Next step is to:
	// 1) read the operationPacks
	// 2) make sure that the clocks causality respect the DAG topology.

	oppMap := make(map[repository.Hash]*operationPack, len(commits))
	var opsCount int

	for i, commit := range commits {
		// the first commit of a parents-first order never has a parent: it is
		// the chronological root
		isFirstCommit := i == 0
		isMerge := len(commit.Parents) > 1

		// Verify DAG structure: single chronological root, so only the root
		// can have no parents. Said otherwise, the DAG need to have exactly
		// one leaf.
		if !isFirstCommit && len(commit.Parents) == 0 {
			return nil, fmt.Errorf("multiple leafs in the entity DAG")
		}

		opp, err := readOperationPack(def, repo, commit)
		if err != nil {
			return nil, err
		}

		err = opp.Validate()
		if err != nil {
			return nil, err
		}

		// Check that the create lamport clock is set (not checked in Validate() as it's optional)
		if isFirstCommit && opp.CreateTime <= 0 {
			return nil, fmt.Errorf("creation lamport time not set")
		}

		// make sure that the lamport clocks causality match the DAG topology
		for _, parentHash := range commit.Parents {
			parentPack, ok := oppMap[parentHash]
			if !ok {
				// can't happen with a parents-first order; report it instead
				// of crashing the caller if the invariant is ever broken
				return nil, fmt.Errorf("inconsistent entity DAG: parent %s of commit %s has not been read", parentHash, commit.Hash)
			}

			if parentPack.EditTime >= opp.EditTime {
				return nil, fmt.Errorf("lamport clock ordering doesn't match the DAG")
			}

			// to avoid an attack where clocks are pushed toward the uint64 rollover, make sure
			// that the clocks don't jump too far in the future
			// we ignore merge commits here to allow merging after a loooong time without breaking anything,
			// as long as there is one valid chain of small hops, it's fine.
			// The subtraction can't underflow: the parent time is strictly smaller (checked above).
			if !isMerge && opp.EditTime-parentPack.EditTime > 1_000_000 {
				return nil, fmt.Errorf("lamport clock jumping too far in the future, likely an attack")
			}

			// TODO: PackTime is not checked
		}

		oppMap[commit.Hash] = opp
		opsCount += len(opp.Operations)
	}

	// The clocks are fine, we witness them
	for _, opp := range oppMap {
		err = repo.Witness(fmt.Sprintf(creationClockPattern, def.namespace), opp.CreateTime)
		if err != nil {
			return nil, err
		}
		err = repo.Witness(fmt.Sprintf(editClockPattern, def.namespace), opp.EditTime)
		if err != nil {
			return nil, err
		}
	}

	// Now that we know that the topological order and clocks are fine, we order the operationPacks
	// based on the logical clocks, entirely ignoring the DAG topology

	oppSlice := make([]*operationPack, 0, len(oppMap))
	for _, pack := range oppMap {
		oppSlice = append(oppSlice, pack)
	}
	sort.Slice(oppSlice, func(i, j int) bool {
		// Primary ordering on the EditTime. Equal times mean a concurrent edition:
		// we can't tell which exactly came first.
		if oppSlice[i].EditTime != oppSlice[j].EditTime {
			return oppSlice[i].EditTime < oppSlice[j].EditTime
		}
		// Well, what now? We still need a total ordering and the most stable possible.
		// As a last resort, we can order based on a hash of the serialized Operations in the
		// operationPack. It doesn't carry much meaning but it's unbiased and hard to abuse.
		// This is a lexicographic ordering on the stringified ID.
		return oppSlice[i].Id() < oppSlice[j].Id()
	})

	// Now that we ordered the operationPacks, we have the order of the Operations

	ops := make([]Operation, 0, opsCount)
	for _, pack := range oppSlice {
		ops = append(ops, pack.Operations...)
	}

	return &Entity{
		Definition: def,
		ops:        ops,
		lastCommit: rootHash,
	}, nil
}

// readCommitsParentsFirst reads every commit reachable from head and returns
// them in a topological order where a commit always comes after all of its
// parents.
//
// It is a depth-first traversal emitting a commit once all of its parents have
// been emitted (post-order). Reversing a breadth-first traversal is not enough:
// when the two sides of a merge have a different length, a commit of the longer
// side can end up before its own parent.
func readCommitsParentsFirst(repo repository.ClockedRepo, head repository.Hash) ([]repository.Commit, error) {
	// frame is a commit being explored, with the position of the next parent to visit
	type frame struct {
		commit repository.Commit
		next   int
	}

	headCommit, err := repo.ReadCommit(head)
	if err != nil {
		return nil, err
	}

	visited := map[repository.Hash]struct{}{head: {}}
	ordered := make([]repository.Commit, 0, 32)
	stack := make([]frame, 0, 32)
	stack = append(stack, frame{commit: headCommit})

	for len(stack) > 0 {
		top := &stack[len(stack)-1]

		if top.next == len(top.commit.Parents) {
			// all the parents are emitted, this commit can be emitted as well
			ordered = append(ordered, top.commit)
			stack = stack[:len(stack)-1]
			continue
		}

		parent := top.commit.Parents[top.next]
		top.next++

		if _, ok := visited[parent]; ok {
			continue
		}
		visited[parent] = struct{}{}

		commit, err := repo.ReadCommit(parent)
		if err != nil {
			return nil, err
		}
		// top must not be used after this append, the stack may be reallocated
		stack = append(stack, frame{commit: commit})
	}

	return ordered, nil
}

// StreamedEntity is the element sent on the channel returned by ReadAll.
// ReadAll sets either Entity, when an Entity has been read, or Err, when the
// reading failed.
type StreamedEntity struct {
	Entity *Entity
	Err    error
}

// ReadAll reads and parses all the local Entity found under the namespace of
// def.
//
// The entities are read in a separate goroutine and sent one by one on the
// returned unbuffered channel. On the first error, a StreamedEntity carrying
// that error is sent and the reading stops. The channel is closed when the
// goroutine returns.
func ReadAll(def Definition, repo repository.ClockedRepo) <-chan StreamedEntity {
	stream := make(chan StreamedEntity)

	go func() {
		defer close(stream)

		refs, err := repo.ListRefs(fmt.Sprintf("refs/%s/", def.namespace))
		if err != nil {
			stream <- StreamedEntity{Err: err}
			return
		}

		for i := range refs {
			loaded, err := read(def, repo, refs[i])
			if err != nil {
				stream <- StreamedEntity{Err: err}
				return
			}

			stream <- StreamedEntity{Entity: loaded}
		}
	}()

	return stream
}

// Id returns the Entity identifier, which is the Id of its first operation.
// It must not be called on an Entity holding no operation, as there is no
// first operation to ask.
func (e *Entity) Id() entity.Id {
	first := e.FirstOp()
	return first.Id()
}

// Validate checks if the Entity data is valid: it needs at least one
// operation, each operation needs to be valid and no two operations can share
// the same Id. The first problem found is returned.
func (e *Entity) Validate() error {
	// non-empty
	if len(e.ops)+len(e.staging) == 0 {
		return fmt.Errorf("entity has no operations")
	}

	// check each operation, the stored ones first and then the staged ones
	for _, group := range [][]Operation{e.ops, e.staging} {
		for _, op := range group {
			if err := op.Validate(); err != nil {
				return err
			}
		}
	}

	// Check that there is no colliding operation's ID
	seen := make(map[entity.Id]struct{})
	for _, op := range e.Operations() {
		id := op.Id()
		if _, dup := seen[id]; dup {
			return fmt.Errorf("id collision: %s", id)
		}
		seen[id] = struct{}{}
	}

	return nil
}

// Operations returns the ordered operations: the ones stored in the
// repository, followed by the staged ones.
//
// The returned slice is freshly allocated, so the caller can modify it
// without altering the Entity. It is nil when the Entity has no operation.
func (e *Entity) Operations() []Operation {
	total := len(e.ops) + len(e.staging)
	if total == 0 {
		return nil
	}

	// Appending directly to e.ops could write the staged operations in the
	// spare capacity of its backing array and hand out a slice sharing it.
	all := make([]Operation, 0, total)
	all = append(all, e.ops...)
	all = append(all, e.staging...)
	return all
}

// FirstOp looks up the very first operation of the Entity: the first stored
// operation if there is one, otherwise the first staged operation.
// It returns nil when the Entity has no operation.
func (e *Entity) FirstOp() Operation {
	// plain length checks, mirroring LastOp, instead of loops that always
	// return on their first iteration
	if len(e.ops) > 0 {
		return e.ops[0]
	}
	if len(e.staging) > 0 {
		return e.staging[0]
	}
	return nil
}

// LastOp looks up the very last operation of the Entity: the last staged
// operation if there is one, otherwise the last stored operation.
// It returns nil when the Entity has no operation.
func (e *Entity) LastOp() Operation {
	switch {
	case len(e.staging) != 0:
		return e.staging[len(e.staging)-1]
	case len(e.ops) != 0:
		return e.ops[len(e.ops)-1]
	default:
		return nil
	}
}

// Append adds a new Operation to the Entity. The operation is only staged in
// memory: it is written in the repository by the next Commit.
func (e *Entity) Append(op Operation) {
	e.staging = append(e.staging, op)
}

// NeedCommit indicates if the in-memory state changed and needs to be
// committed in the repository, that is if at least one operation is staged.
func (e *Entity) NeedCommit() bool {
	return len(e.staging) != 0
}

// CommitAdNeeded executes a Commit only if necessary. This function is useful
// to avoid getting an error if the Entity is already in sync with the
// repository: in that case nothing is done and nil is returned.
func (e *Entity) CommitAdNeeded(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return nil
	}
	return e.Commit(repo)
}

// Commit writes the staged operations in the repository, as one new
// operationPack chained after the last commit of the Entity.
//
// It fails if there is no staged operation, if the Entity is not valid or if
// the staged operations don't all have the same author. On success the staged
// operations become stored operations and the git reference of the Entity is
// created or updated to the new commit.
func (e *Entity) Commit(repo repository.ClockedRepo) error {
	if !e.NeedCommit() {
		return fmt.Errorf("can't commit an entity with no pending operation")
	}

	err := e.Validate()
	if err != nil {
		return errors.Wrapf(err, "can't commit a %s with invalid data", e.Definition.typename)
	}

	// an operationPack has a single author, shared by all its operations
	var author identity.Interface
	for _, op := range e.staging {
		opAuthor := op.Author()
		if author != nil && opAuthor != author {
			return fmt.Errorf("operations with different author")
		}
		author = opAuthor
	}

	// increment the various clocks for this new operationPack
	editTime, err := repo.Increment(fmt.Sprintf(editClockPattern, e.namespace))
	if err != nil {
		return err
	}

	// without a previous commit, this pack is the one creating the entity
	isFirstCommit := e.lastCommit == ""

	// the creation clock only ticks for the first pack, it stays at zero otherwise
	var creationTime lamport.Time
	if isFirstCommit {
		creationTime, err = repo.Increment(fmt.Sprintf(creationClockPattern, e.namespace))
		if err != nil {
			return err
		}
	}

	pack := &operationPack{
		Author:     author,
		Operations: e.staging,
		CreateTime: creationTime,
		EditTime:   editTime,
	}

	// the new commit gets the previous one as parent, if any
	var commitHash repository.Hash
	if isFirstCommit {
		commitHash, err = pack.Write(e.Definition, repo)
	} else {
		commitHash, err = pack.Write(e.Definition, repo, e.lastCommit)
	}
	if err != nil {
		return err
	}

	// the staged operations are now stored
	e.lastCommit = commitHash
	e.ops = append(e.ops, e.staging...)
	e.staging = nil

	// Create or update the Git reference for this entity
	// When pushing later, the remote will ensure that this ref update
	// is fast-forward, that is no data has been overwritten.
	return repo.UpdateRef(fmt.Sprintf(refsPattern, e.namespace, e.Id().String()), commitHash)
}