diff options
-rw-r--r-- | util/lamport.go | 85 | ||||
-rw-r--r-- | util/lamport_test.go | 66 | ||||
-rw-r--r-- | util/persisted_lamport.go | 81 |
3 files changed, 232 insertions, 0 deletions
diff --git a/util/lamport.go b/util/lamport.go new file mode 100644 index 00000000..c75e1d15 --- /dev/null +++ b/util/lamport.go @@ -0,0 +1,85 @@ +/* + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this file, + You can obtain one at http://mozilla.org/MPL/2.0/. + + Copyright (c) 2013, Armon Dadgar armon.dadgar@gmail.com + Copyright (c) 2013, Mitchell Hashimoto mitchell.hashimoto@gmail.com + + Alternatively, the contents of this file may be used under the terms + of the GNU General Public License Version 3 or later, as described below: + + This file is free software: you may copy, redistribute and/or modify + it under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + This file is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/. + +*/ + +package util + +import ( + "sync/atomic" +) + +// LamportClock is a thread safe implementation of a lamport clock. It +// uses efficient atomic operations for all of its functions, falling back +// to a heavy lock only if there are enough CAS failures. +type LamportClock struct { + counter uint64 +} + +// LamportTime is the value of a LamportClock. +type LamportTime uint64 + +func NewLamportClock() LamportClock { + return LamportClock{ + counter: 1, + } +} + +func NewLamportClockWithTime(time uint64) LamportClock { + return LamportClock{ + counter: time, + } +} + +// Time is used to return the current value of the lamport clock +func (l *LamportClock) Time() LamportTime { + return LamportTime(atomic.LoadUint64(&l.counter)) +} + +// Increment is used to increment and return the value of the lamport clock +func (l *LamportClock) Increment() LamportTime { + return LamportTime(atomic.AddUint64(&l.counter, 1)) +} + +// Witness is called to update our local clock if necessary after +// witnessing a clock value received from another process +func (l *LamportClock) Witness(v LamportTime) { +WITNESS: + // If the other value is old, we do not need to do anything + cur := atomic.LoadUint64(&l.counter) + other := uint64(v) + if other < cur { + return + } + + // Ensure that our local clock is at least one ahead. + if !atomic.CompareAndSwapUint64(&l.counter, cur, other+1) { + // CAS: CompareAndSwap + // The CAS failed, so we just retry. Eventually our CAS should + // succeed or a future witness will pass us by and our witness + // will end. + goto WITNESS + } +} diff --git a/util/lamport_test.go b/util/lamport_test.go new file mode 100644 index 00000000..0bac7d51 --- /dev/null +++ b/util/lamport_test.go @@ -0,0 +1,66 @@ +/* + + This Source Code Form is subject to the terms of the Mozilla Public + License, v. 2.0. If a copy of the MPL was not distributed with this file, + You can obtain one at http://mozilla.org/MPL/2.0/. + + Copyright (c) 2013, Armon Dadgar armon.dadgar@gmail.com + Copyright (c) 2013, Mitchell Hashimoto mitchell.hashimoto@gmail.com + + Alternatively, the contents of this file may be used under the terms + of the GNU General Public License Version 3 or later, as described below: + + This file is free software: you may copy, redistribute and/or modify + it under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + This file is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see http://www.gnu.org/licenses/. + +*/ + +package util + +import ( + "testing" +) + +func TestLamportClock(t *testing.T) { + l := &LamportClock{} + + if l.Time() != 0 { + t.Fatalf("bad time value") + } + + if l.Increment() != 1 { + t.Fatalf("bad time value") + } + + if l.Time() != 1 { + t.Fatalf("bad time value") + } + + l.Witness(41) + + if l.Time() != 42 { + t.Fatalf("bad time value") + } + + l.Witness(41) + + if l.Time() != 42 { + t.Fatalf("bad time value") + } + + l.Witness(30) + + if l.Time() != 42 { + t.Fatalf("bad time value") + } +} diff --git a/util/persisted_lamport.go b/util/persisted_lamport.go new file mode 100644 index 00000000..43a2863c --- /dev/null +++ b/util/persisted_lamport.go @@ -0,0 +1,81 @@ +package util + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" +) + +type PersistedLamport struct { + LamportClock + filePath string +} + +func NewPersistedLamport(filePath string) *PersistedLamport { + clock := &PersistedLamport{ + filePath: filePath, + } + return clock +} + +func LoadPersistedLamport(filePath string) (*PersistedLamport, error) { + clock := &PersistedLamport{ + filePath: filePath, + } + + err := clock.read() + if err != nil { + return nil, err + } + + return clock, nil +} + +func (c *PersistedLamport) Witness(time LamportTime) error { + c.LamportClock.Witness(time) + return c.Write() +} + +func (c *PersistedLamport) Time() LamportTime { + // Equivalent to: + // + // res = c.LamportClock.Time() + // bugClock.Increment() + // + // ... but thread safe + return c.Increment() - 1 +} + +func (c *PersistedLamport) read() error { + content, err := ioutil.ReadFile(c.filePath) + if err != nil { + return err + } + + var value uint64 + n, err := fmt.Sscanf(string(content), "%d", &value) + + if err != nil { + return err + } + + if n != 1 { + return fmt.Errorf("could not read the clock") + } + + c.LamportClock = NewLamportClockWithTime(value) + + return nil +} + +func (c *PersistedLamport) Write() error { + dir := filepath.Dir(c.filePath) + err := os.MkdirAll(dir, 0777) + if err != nil { + return err + } + + data := []byte(fmt.Sprintf("%d", c.LamportClock.Time())) + return ioutil.WriteFile(c.filePath, data, 0644) +} |