author    Máximo Cuadros <mcuadros@gmail.com>    2018-08-17 08:46:19 +0200
committer GitHub <noreply@github.com>            2018-08-17 08:46:19 +0200
commit    ba0f659cbf9982846de731cca426ce2498601130 (patch)
tree      d4816a29f6460de9d481861cb36c4248212c6270 /plumbing
parent    a28c2ce44695f13ddf28748958f236afd8e0b544 (diff)
parent    eb2aa9b2c3bf7af93fd261228be1b96e61c52bcf (diff)
download  go-git-ba0f659cbf9982846de731cca426ce2498601130.tar.gz
Merge pull request #916 from jfontan/improvement/memory-consumption-new-packfile-parser
Improvement/memory consumption new packfile parser
Diffstat (limited to 'plumbing')
-rw-r--r--  plumbing/cache/buffer_lru.go          98
-rw-r--r--  plumbing/cache/buffer_test.go        128
-rw-r--r--  plumbing/cache/common.go              13
-rw-r--r--  plumbing/format/packfile/parser.go    90
4 files changed, 276 insertions(+), 53 deletions(-)
diff --git a/plumbing/cache/buffer_lru.go b/plumbing/cache/buffer_lru.go
new file mode 100644
index 0000000..f2c0f90
--- /dev/null
+++ b/plumbing/cache/buffer_lru.go
@@ -0,0 +1,98 @@
+package cache
+
+import (
+ "container/list"
+ "sync"
+)
+
+// BufferLRU implements an object cache with an LRU eviction policy and a
+// maximum size (measured in object size).
+type BufferLRU struct {
+ MaxSize FileSize
+
+ actualSize FileSize
+ ll *list.List
+ cache map[int64]*list.Element
+ mut sync.Mutex
+}
+
+// NewBufferLRU creates a new BufferLRU with the given maximum size. The maximum
+// size will never be exceeded.
+func NewBufferLRU(maxSize FileSize) *BufferLRU {
+ return &BufferLRU{MaxSize: maxSize}
+}
+
+// NewBufferLRUDefault creates a new BufferLRU with the default cache size.
+func NewBufferLRUDefault() *BufferLRU {
+ return &BufferLRU{MaxSize: DefaultMaxSize}
+}
+
+type buffer struct {
+ Key int64
+ Slice []byte
+}
+
+// Put puts a buffer into the cache. If the buffer is already in the cache, it
+// will be marked as used. Otherwise, it will be inserted. Buffers might
+// be evicted to make room for the new one.
+func (c *BufferLRU) Put(key int64, slice []byte) {
+ c.mut.Lock()
+ defer c.mut.Unlock()
+
+ if c.cache == nil {
+ c.actualSize = 0
+ c.cache = make(map[int64]*list.Element, 1000)
+ c.ll = list.New()
+ }
+
+ if ee, ok := c.cache[key]; ok {
+ c.ll.MoveToFront(ee)
+ ee.Value = buffer{key, slice}
+ return
+ }
+
+ objSize := FileSize(len(slice))
+
+ if objSize > c.MaxSize {
+ return
+ }
+
+ for c.actualSize+objSize > c.MaxSize {
+ last := c.ll.Back()
+ lastObj := last.Value.(buffer)
+ lastSize := FileSize(len(lastObj.Slice))
+
+ c.ll.Remove(last)
+ delete(c.cache, lastObj.Key)
+ c.actualSize -= lastSize
+ }
+
+ ee := c.ll.PushFront(buffer{key, slice})
+ c.cache[key] = ee
+ c.actualSize += objSize
+}
+
+// Get returns a buffer by its key. It marks the buffer as used. If the buffer
+// is not in the cache, (nil, false) will be returned.
+func (c *BufferLRU) Get(key int64) ([]byte, bool) {
+ c.mut.Lock()
+ defer c.mut.Unlock()
+
+ ee, ok := c.cache[key]
+ if !ok {
+ return nil, false
+ }
+
+ c.ll.MoveToFront(ee)
+ return ee.Value.(buffer).Slice, true
+}
+
+// Clear clears the content of this buffer cache.
+func (c *BufferLRU) Clear() {
+ c.mut.Lock()
+ defer c.mut.Unlock()
+
+ c.ll = nil
+ c.cache = nil
+ c.actualSize = 0
+}
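
As a standalone illustration of the eviction semantics implemented above, here is a minimal sketch (assuming the gopkg.in/src-d/go-git.v4 import path used by go-git at the time of this merge):

	package main

	import (
		"fmt"

		"gopkg.in/src-d/go-git.v4/plumbing/cache"
	)

	func main() {
		// A cache bounded at two bytes of total buffer size.
		c := cache.NewBufferLRU(2 * cache.Byte)

		c.Put(1, []byte("a"))  // 1 byte resident
		c.Put(2, []byte("b"))  // 2 bytes resident, cache full
		c.Put(3, []byte("cc")) // needs 2 bytes, so both older buffers are evicted

		_, ok := c.Get(1)
		fmt.Println(ok) // false: evicted least-recently-used first
		b, _ := c.Get(3)
		fmt.Println(string(b)) // cc
	}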
diff --git a/plumbing/cache/buffer_test.go b/plumbing/cache/buffer_test.go
new file mode 100644
index 0000000..262138a
--- /dev/null
+++ b/plumbing/cache/buffer_test.go
@@ -0,0 +1,128 @@
+package cache
+
+import (
+ "sync"
+
+ . "gopkg.in/check.v1"
+)
+
+type BufferSuite struct {
+ c map[string]Buffer
+ aBuffer []byte
+ bBuffer []byte
+ cBuffer []byte
+ dBuffer []byte
+ eBuffer []byte
+}
+
+var _ = Suite(&BufferSuite{})
+
+func (s *BufferSuite) SetUpTest(c *C) {
+ s.aBuffer = []byte("a")
+ s.bBuffer = []byte("bbb")
+ s.cBuffer = []byte("c")
+ s.dBuffer = []byte("d")
+ s.eBuffer = []byte("ee")
+
+ s.c = make(map[string]Buffer)
+ s.c["two_bytes"] = NewBufferLRU(2 * Byte)
+ s.c["default_lru"] = NewBufferLRUDefault()
+}
+
+func (s *BufferSuite) TestPutSameBuffer(c *C) {
+ for _, o := range s.c {
+ o.Put(1, s.aBuffer)
+ o.Put(1, s.aBuffer)
+ _, ok := o.Get(1)
+ c.Assert(ok, Equals, true)
+ }
+}
+
+func (s *BufferSuite) TestPutBigBuffer(c *C) {
+ for _, o := range s.c {
+ o.Put(1, s.bBuffer)
+ _, ok := o.Get(2)
+ c.Assert(ok, Equals, false)
+ }
+}
+
+func (s *BufferSuite) TestPutCacheOverflow(c *C) {
+ // this test only works with a specific size
+ o := s.c["two_bytes"]
+
+ o.Put(1, s.aBuffer)
+ o.Put(2, s.cBuffer)
+ o.Put(3, s.dBuffer)
+
+ obj, ok := o.Get(1)
+ c.Assert(ok, Equals, false)
+ c.Assert(obj, IsNil)
+ obj, ok = o.Get(2)
+ c.Assert(ok, Equals, true)
+ c.Assert(obj, NotNil)
+ obj, ok = o.Get(3)
+ c.Assert(ok, Equals, true)
+ c.Assert(obj, NotNil)
+}
+
+func (s *BufferSuite) TestEvictMultipleBuffers(c *C) {
+ o := s.c["two_bytes"]
+
+ o.Put(1, s.cBuffer)
+ o.Put(2, s.dBuffer) // now cache is full with two objects
+ o.Put(3, s.eBuffer) // this put should evict all previous objects
+
+ obj, ok := o.Get(1)
+ c.Assert(ok, Equals, false)
+ c.Assert(obj, IsNil)
+ obj, ok = o.Get(2)
+ c.Assert(ok, Equals, false)
+ c.Assert(obj, IsNil)
+ obj, ok = o.Get(3)
+ c.Assert(ok, Equals, true)
+ c.Assert(obj, NotNil)
+}
+
+func (s *BufferSuite) TestClear(c *C) {
+ for _, o := range s.c {
+ o.Put(1, s.aBuffer)
+ o.Clear()
+ obj, ok := o.Get(1)
+ c.Assert(ok, Equals, false)
+ c.Assert(obj, IsNil)
+ }
+}
+
+func (s *BufferSuite) TestConcurrentAccess(c *C) {
+ for _, o := range s.c {
+ var wg sync.WaitGroup
+
+ for i := 0; i < 1000; i++ {
+ wg.Add(3)
+ go func(i int) {
+ o.Put(int64(i), []byte{00})
+ wg.Done()
+ }(i)
+
+ go func(i int) {
+ if i%30 == 0 {
+ o.Clear()
+ }
+ wg.Done()
+ }(i)
+
+ go func(i int) {
+ o.Get(int64(i))
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+ }
+}
+
+func (s *BufferSuite) TestDefaultLRU(c *C) {
+ defaultLRU := s.c["default_lru"].(*BufferLRU)
+
+ c.Assert(defaultLRU.MaxSize, Equals, DefaultMaxSize)
+}
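
TestConcurrentAccess above interleaves Put, Clear and Get across three goroutines per iteration, so it is most useful under the race detector, which exercises the sync.Mutex usage in BufferLRU; from the repository root:

	go test -race ./plumbing/cache/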
diff --git a/plumbing/cache/common.go b/plumbing/cache/common.go
index e77baf0..2b7f36a 100644
--- a/plumbing/cache/common.go
+++ b/plumbing/cache/common.go
@@ -24,3 +24,16 @@ type Object interface {
// Clear clears every object from the cache.
Clear()
}
+
+// Buffer is an interface to a buffer cache.
+type Buffer interface {
+ // Put puts a buffer into the cache. If the buffer is already in the cache,
+ // it will be marked as used. Otherwise, it will be inserted. Buffers might
+ // be evicted to make room for the new one.
+ Put(key int64, slice []byte)
+ // Get returns a buffer by its key. It marks the buffer as used. If the
+ // buffer is not in the cache, (nil, false) will be returned.
+ Get(key int64) ([]byte, bool)
+ // Clear clears every object from the cache.
+ Clear()
+}
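
The Buffer interface added above mirrors the existing Object interface, swapping plumbing.Hash keys for int64 pack offsets. A compile-time assertion (not part of this patch; shown only as a sketch) is the idiomatic way to pin BufferLRU to it:

	// In package cache: compilation fails if *BufferLRU stops satisfying Buffer.
	var _ Buffer = (*BufferLRU)(nil)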
diff --git a/plumbing/format/packfile/parser.go b/plumbing/format/packfile/parser.go
index 581c334..28582b5 100644
--- a/plumbing/format/packfile/parser.go
+++ b/plumbing/format/packfile/parser.go
@@ -48,7 +48,7 @@ type Parser struct {
pendingRefDeltas map[plumbing.Hash][]*objectInfo
checksum plumbing.Hash
- cache *cache.ObjectLRU
+ cache *cache.BufferLRU
// delta content by offset, only used if source is not seekable
deltas map[int64][]byte
@@ -82,7 +82,7 @@ func NewParserWithStorage(
scanner: scanner,
ob: ob,
count: 0,
- cache: cache.NewObjectLRUDefault(),
+ cache: cache.NewBufferLRUDefault(),
pendingRefDeltas: make(map[plumbing.Hash][]*objectInfo),
deltas: deltas,
}, nil
@@ -221,21 +221,22 @@ func (p *Parser) indexObjects() error {
ota = newBaseObject(oh.Offset, oh.Length, t)
}
- size, crc, err := p.scanner.NextObject(buf)
+ _, crc, err := p.scanner.NextObject(buf)
if err != nil {
return err
}
ota.Crc32 = crc
- ota.PackSize = size
ota.Length = oh.Length
data := buf.Bytes()
if !delta {
- if _, err := ota.Write(data); err != nil {
+ sha1, err := getSHA1(ota.Type, data)
+ if err != nil {
return err
}
- ota.SHA1 = ota.Sum()
+
+ ota.SHA1 = sha1
p.oiByHash[ota.SHA1] = ota
}
@@ -291,41 +292,35 @@ func (p *Parser) resolveDeltas() error {
delete(p.deltas, obj.Offset)
}
}
-
- obj.Content = nil
}
return nil
}
func (p *Parser) get(o *objectInfo) ([]byte, error) {
- if len(o.Content) > 0 {
- return o.Content, nil
- }
-
- e, ok := p.cache.Get(o.SHA1)
+ b, ok := p.cache.Get(o.Offset)
// If it's not on the cache and is not a delta we can try to find it in the
// storage, if there's one.
if !ok && p.storage != nil && !o.Type.IsDelta() {
var err error
- e, err = p.storage.EncodedObject(plumbing.AnyObject, o.SHA1)
+ e, err := p.storage.EncodedObject(plumbing.AnyObject, o.SHA1)
if err != nil {
return nil, err
}
- }
- if e != nil {
r, err := e.Reader()
if err != nil {
return nil, err
}
- buf := make([]byte, e.Size())
- if _, err = r.Read(buf); err != nil {
+ b = make([]byte, e.Size())
+ if _, err = r.Read(b); err != nil {
return nil, err
}
+ }
- return buf, nil
+ if b != nil {
+ return b, nil
}
var data []byte
@@ -348,11 +343,7 @@ func (p *Parser) get(o *objectInfo) ([]byte, error) {
}
if len(o.Children) > 0 {
- m := &plumbing.MemoryObject{}
- m.Write(data)
- m.SetType(o.Type)
- m.SetSize(o.Size())
- p.cache.Put(m)
+ p.cache.Put(o.Offset, data)
}
return data, nil
@@ -410,8 +401,6 @@ func (p *Parser) readData(o *objectInfo) ([]byte, error) {
return data, nil
}
- // TODO: skip header. Header size can be calculated with the offset of the
- // next offset in the first pass.
if _, err := p.scanner.SeekFromStart(o.Offset); err != nil {
return nil, err
}
@@ -434,34 +423,40 @@ func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) {
return nil, err
}
- ota.Type = ota.Parent.Type
- ota.Hasher = plumbing.NewHasher(ota.Type, int64(len(patched)))
- if _, err := ota.Write(patched); err != nil {
- return nil, err
+ if ota.SHA1 == plumbing.ZeroHash {
+ ota.Type = ota.Parent.Type
+ sha1, err := getSHA1(ota.Type, patched)
+ if err != nil {
+ return nil, err
+ }
+
+ ota.SHA1 = sha1
+ ota.Length = int64(len(patched))
}
- ota.SHA1 = ota.Sum()
- ota.Length = int64(len(patched))
return patched, nil
}
-type objectInfo struct {
- plumbing.Hasher
+func getSHA1(t plumbing.ObjectType, data []byte) (plumbing.Hash, error) {
+ hasher := plumbing.NewHasher(t, int64(len(data)))
+ if _, err := hasher.Write(data); err != nil {
+ return plumbing.ZeroHash, err
+ }
+
+ return hasher.Sum(), nil
+}
- Offset int64
- Length int64
- HeaderLength int64
- PackSize int64
- Type plumbing.ObjectType
- DiskType plumbing.ObjectType
+type objectInfo struct {
+ Offset int64
+ Length int64
+ Type plumbing.ObjectType
+ DiskType plumbing.ObjectType
Crc32 uint32
Parent *objectInfo
Children []*objectInfo
SHA1 plumbing.Hash
-
- Content []byte
}
func newBaseObject(offset, length int64, t plumbing.ObjectType) *objectInfo {
@@ -473,29 +468,18 @@ func newDeltaObject(
t plumbing.ObjectType,
parent *objectInfo,
) *objectInfo {
- children := make([]*objectInfo, 0)
-
obj := &objectInfo{
- Hasher: plumbing.NewHasher(t, length),
Offset: offset,
Length: length,
- PackSize: 0,
Type: t,
DiskType: t,
Crc32: 0,
Parent: parent,
- Children: children,
}
return obj
}
-func (o *objectInfo) Write(b []byte) (int, error) {
- o.Content = make([]byte, len(b))
- copy(o.Content, b)
- return o.Hasher.Write(b)
-}
-
func (o *objectInfo) IsDelta() bool {
return o.Type.IsDelta()
}
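
The getSHA1 helper introduced above reproduces git's object hashing: plumbing.NewHasher seeds a SHA-1 with the "<type> <size>\x00" header before the payload is written. A standard-library-only sketch of the same computation (gitSHA1 is a hypothetical name, for illustration):

	package main

	import (
		"crypto/sha1"
		"fmt"
	)

	// gitSHA1 hashes a git object the way getSHA1 does: the
	// "<type> <size>\x00" header first, then the raw payload.
	func gitSHA1(objType string, data []byte) []byte {
		h := sha1.New()
		fmt.Fprintf(h, "%s %d\x00", objType, len(data))
		h.Write(data)
		return h.Sum(nil)
	}

	func main() {
		// Matches `echo hello | git hash-object --stdin`:
		// ce013625030ba8dba906f756967f9e9ca394464a
		fmt.Printf("%x\n", gitSHA1("blob", []byte("hello\n")))
	}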