diff options
Diffstat (limited to 'util/lamport')
-rw-r--r-- | util/lamport/clock.go | 15 | ||||
-rw-r--r-- | util/lamport/clock_testing.go | 28 | ||||
-rw-r--r-- | util/lamport/lamport_test.go | 66 | ||||
-rw-r--r-- | util/lamport/mem_clock.go (renamed from util/lamport/lamport.go) | 39 | ||||
-rw-r--r-- | util/lamport/mem_clock_test.go | 8 | ||||
-rw-r--r-- | util/lamport/persisted_clock.go | 100 | ||||
-rw-r--r-- | util/lamport/persisted_clock_test.go | 19 | ||||
-rw-r--r-- | util/lamport/persisted_lamport.go | 84 |
8 files changed, 190 insertions, 169 deletions
diff --git a/util/lamport/clock.go b/util/lamport/clock.go new file mode 100644 index 00000000..53b0ac7a --- /dev/null +++ b/util/lamport/clock.go @@ -0,0 +1,15 @@ +package lamport + +// Time is the value of a Clock. +type Time uint64 + +// Clock is a Lamport logical clock +type Clock interface { + // Time is used to return the current value of the lamport clock + Time() Time + // Increment is used to return the value of the lamport clock and increment it afterwards + Increment() (Time, error) + // Witness is called to update our local clock if necessary after + // witnessing a clock value received from another process + Witness(time Time) error +} diff --git a/util/lamport/clock_testing.go b/util/lamport/clock_testing.go new file mode 100644 index 00000000..fc59afb2 --- /dev/null +++ b/util/lamport/clock_testing.go @@ -0,0 +1,28 @@ +package lamport + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func testClock(t *testing.T, c Clock) { + assert.Equal(t, Time(1), c.Time()) + + val, err := c.Increment() + assert.NoError(t, err) + assert.Equal(t, Time(1), val) + assert.Equal(t, Time(2), c.Time()) + + err = c.Witness(41) + assert.NoError(t, err) + assert.Equal(t, Time(42), c.Time()) + + err = c.Witness(41) + assert.NoError(t, err) + assert.Equal(t, Time(42), c.Time()) + + err = c.Witness(30) + assert.NoError(t, err) + assert.Equal(t, Time(42), c.Time()) +} diff --git a/util/lamport/lamport_test.go b/util/lamport/lamport_test.go deleted file mode 100644 index c650fe6a..00000000 --- a/util/lamport/lamport_test.go +++ /dev/null @@ -1,66 +0,0 @@ -/* - - This Source Code Form is subject to the terms of the Mozilla Public - License, v. 2.0. If a copy of the MPL was not distributed with this file, - You can obtain one at http://mozilla.org/MPL/2.0/. - - Copyright (c) 2013, Armon Dadgar armon.dadgar@gmail.com - Copyright (c) 2013, Mitchell Hashimoto mitchell.hashimoto@gmail.com - - Alternatively, the contents of this file may be used under the terms - of the GNU General Public License Version 3 or later, as described below: - - This file is free software: you may copy, redistribute and/or modify - it under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - This file is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General - Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see http://www.gnu.org/licenses/. - -*/ - -package lamport - -import ( - "testing" -) - -func TestClock(t *testing.T) { - l := &Clock{} - - if l.Time() != 0 { - t.Fatalf("bad time value") - } - - if l.Increment() != 0 { - t.Fatalf("bad time value") - } - - if l.Time() != 1 { - t.Fatalf("bad time value") - } - - l.Witness(41) - - if l.Time() != 42 { - t.Fatalf("bad time value") - } - - l.Witness(41) - - if l.Time() != 42 { - t.Fatalf("bad time value") - } - - l.Witness(30) - - if l.Time() != 42 { - t.Fatalf("bad time value") - } -} diff --git a/util/lamport/lamport.go b/util/lamport/mem_clock.go index 0372bb6f..ce6f2d4d 100644 --- a/util/lamport/lamport.go +++ b/util/lamport/mem_clock.go @@ -31,58 +31,59 @@ import ( "sync/atomic" ) -// Clock is a thread safe implementation of a lamport clock. It +var _ Clock = &MemClock{} + +// MemClock is a thread safe implementation of a lamport clock. It // uses efficient atomic operations for all of its functions, falling back // to a heavy lock only if there are enough CAS failures. -type Clock struct { +type MemClock struct { counter uint64 } -// Time is the value of a Clock. -type Time uint64 - -// NewClock create a new clock with the value 1. +// NewMemClock create a new clock with the value 1. // Value 0 is considered as invalid. -func NewClock() Clock { - return Clock{ +func NewMemClock() *MemClock { + return &MemClock{ counter: 1, } } -// NewClockWithTime create a new clock with a value. -func NewClockWithTime(time uint64) Clock { - return Clock{ +// NewMemClockWithTime create a new clock with a value. +func NewMemClockWithTime(time uint64) *MemClock { + return &MemClock{ counter: time, } } // Time is used to return the current value of the lamport clock -func (l *Clock) Time() Time { - return Time(atomic.LoadUint64(&l.counter)) +func (mc *MemClock) Time() Time { + return Time(atomic.LoadUint64(&mc.counter)) } // Increment is used to return the value of the lamport clock and increment it afterwards -func (l *Clock) Increment() Time { - return Time(atomic.AddUint64(&l.counter, 1) - 1) +func (mc *MemClock) Increment() (Time, error) { + return Time(atomic.AddUint64(&mc.counter, 1) - 1), nil } // Witness is called to update our local clock if necessary after // witnessing a clock value received from another process -func (l *Clock) Witness(v Time) { +func (mc *MemClock) Witness(v Time) error { WITNESS: // If the other value is old, we do not need to do anything - cur := atomic.LoadUint64(&l.counter) + cur := atomic.LoadUint64(&mc.counter) other := uint64(v) if other < cur { - return + return nil } // Ensure that our local clock is at least one ahead. - if !atomic.CompareAndSwapUint64(&l.counter, cur, other+1) { + if !atomic.CompareAndSwapUint64(&mc.counter, cur, other+1) { // CAS: CompareAndSwap // The CAS failed, so we just retry. Eventually our CAS should // succeed or a future witness will pass us by and our witness // will end. goto WITNESS } + + return nil } diff --git a/util/lamport/mem_clock_test.go b/util/lamport/mem_clock_test.go new file mode 100644 index 00000000..e01d2ec0 --- /dev/null +++ b/util/lamport/mem_clock_test.go @@ -0,0 +1,8 @@ +package lamport + +import "testing" + +func TestMemClock(t *testing.T) { + c := NewMemClock() + testClock(t, c) +} diff --git a/util/lamport/persisted_clock.go b/util/lamport/persisted_clock.go new file mode 100644 index 00000000..e70b01ef --- /dev/null +++ b/util/lamport/persisted_clock.go @@ -0,0 +1,100 @@ +package lamport + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" +) + +var ErrClockNotExist = errors.New("clock doesn't exist") + +type PersistedClock struct { + *MemClock + filePath string +} + +// NewPersistedClock create a new persisted Lamport clock +func NewPersistedClock(filePath string) (*PersistedClock, error) { + clock := &PersistedClock{ + MemClock: NewMemClock(), + filePath: filePath, + } + + dir := filepath.Dir(filePath) + err := os.MkdirAll(dir, 0777) + if err != nil { + return nil, err + } + + err = clock.Write() + if err != nil { + return nil, err + } + + return clock, nil +} + +// LoadPersistedClock load a persisted Lamport clock from a file +func LoadPersistedClock(filePath string) (*PersistedClock, error) { + clock := &PersistedClock{ + filePath: filePath, + } + + err := clock.read() + if err != nil { + return nil, err + } + + return clock, nil +} + +// Increment is used to return the value of the lamport clock and increment it afterwards +func (pc *PersistedClock) Increment() (Time, error) { + time, err := pc.MemClock.Increment() + if err != nil { + return 0, err + } + return time, pc.Write() +} + +// Witness is called to update our local clock if necessary after +// witnessing a clock value received from another process +func (pc *PersistedClock) Witness(time Time) error { + // TODO: rework so that we write only when the clock was actually updated + err := pc.MemClock.Witness(time) + if err != nil { + return err + } + return pc.Write() +} + +func (pc *PersistedClock) read() error { + content, err := ioutil.ReadFile(pc.filePath) + if os.IsNotExist(err) { + return ErrClockNotExist + } + if err != nil { + return err + } + + var value uint64 + n, err := fmt.Sscanf(string(content), "%d", &value) + if err != nil { + return err + } + + if n != 1 { + return fmt.Errorf("could not read the clock") + } + + pc.MemClock = NewMemClockWithTime(value) + + return nil +} + +func (pc *PersistedClock) Write() error { + data := []byte(fmt.Sprintf("%d", pc.counter)) + return ioutil.WriteFile(pc.filePath, data, 0644) +} diff --git a/util/lamport/persisted_clock_test.go b/util/lamport/persisted_clock_test.go new file mode 100644 index 00000000..aacec3bf --- /dev/null +++ b/util/lamport/persisted_clock_test.go @@ -0,0 +1,19 @@ +package lamport + +import ( + "io/ioutil" + "path" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPersistedClock(t *testing.T) { + dir, err := ioutil.TempDir("", "") + require.NoError(t, err) + + c, err := NewPersistedClock(path.Join(dir, "test-clock")) + require.NoError(t, err) + + testClock(t, c) +} diff --git a/util/lamport/persisted_lamport.go b/util/lamport/persisted_lamport.go deleted file mode 100644 index ab4b93b1..00000000 --- a/util/lamport/persisted_lamport.go +++ /dev/null @@ -1,84 +0,0 @@ -package lamport - -import ( - "fmt" - "io/ioutil" - "os" - "path/filepath" -) - -type Persisted struct { - Clock - filePath string -} - -// NewPersisted create a new persisted Lamport clock -func NewPersisted(filePath string) (*Persisted, error) { - clock := &Persisted{ - Clock: NewClock(), - filePath: filePath, - } - - dir := filepath.Dir(filePath) - err := os.MkdirAll(dir, 0777) - if err != nil { - return nil, err - } - - return clock, nil -} - -// LoadPersisted load a persisted Lamport clock from a file -func LoadPersisted(filePath string) (*Persisted, error) { - clock := &Persisted{ - filePath: filePath, - } - - err := clock.read() - if err != nil { - return nil, err - } - - return clock, nil -} - -// Increment is used to return the value of the lamport clock and increment it afterwards -func (c *Persisted) Increment() (Time, error) { - time := c.Clock.Increment() - return time, c.Write() -} - -// Witness is called to update our local clock if necessary after -// witnessing a clock value received from another process -func (c *Persisted) Witness(time Time) error { - // TODO: rework so that we write only when the clock was actually updated - c.Clock.Witness(time) - return c.Write() -} - -func (c *Persisted) read() error { - content, err := ioutil.ReadFile(c.filePath) - if err != nil { - return err - } - - var value uint64 - n, err := fmt.Sscanf(string(content), "%d", &value) - - if err != nil { - return err - } - - if n != 1 { - return fmt.Errorf("could not read the clock") - } - - c.Clock = NewClockWithTime(value) - - return nil -} - -func (c *Persisted) Write() error { - data := []byte(fmt.Sprintf("%d", c.counter)) - return ioutil.WriteFile(c.filePath, data, 0644) -} |