From ccd0fa0bc17f0680038529b00f5c5a44f8e77b41 Mon Sep 17 00:00:00 2001 From: Miguel Molina Date: Fri, 27 Jul 2018 15:07:25 +0200 Subject: plumbing: packfile, lazy object reads with DiskObjects Signed-off-by: Miguel Molina --- plumbing/format/idxfile/idxfile.go | 25 ++-- plumbing/format/packfile/decoder.go | 2 +- plumbing/format/packfile/disk_object.go | 64 +++++++++ plumbing/format/packfile/packfile.go | 208 ++++++++++++++++++++++++++---- plumbing/format/packfile/packfile_test.go | 46 +++++++ storage/memory/storage.go | 10 ++ 6 files changed, 314 insertions(+), 41 deletions(-) create mode 100644 plumbing/format/packfile/disk_object.go diff --git a/plumbing/format/idxfile/idxfile.go b/plumbing/format/idxfile/idxfile.go index f8debb1..d4a9365 100644 --- a/plumbing/format/idxfile/idxfile.go +++ b/plumbing/format/idxfile/idxfile.go @@ -87,7 +87,7 @@ func (idx *MemoryIndex) findHashIndex(h plumbing.Hash) int { low = mid + 1 } - if low > high { + if low >= high { break } } @@ -157,9 +157,8 @@ func (idx *MemoryIndex) getCrc32(firstLevel, secondLevel int) (uint32, error) { func (idx *MemoryIndex) FindHash(o int64) (plumbing.Hash, error) { // Lazily generate the reverse offset/hash map if required. if idx.offsetHash == nil { - err := idx.genOffsetHash() - if err != nil { - return plumbing.ZeroHash, nil + if err := idx.genOffsetHash(); err != nil { + return plumbing.ZeroHash, err } } @@ -185,19 +184,17 @@ func (idx *MemoryIndex) genOffsetHash() error { return err } - var entry *Entry - for err != nil { - entry, err = iter.Next() - if err == nil { - idx.offsetHash[int64(entry.Offset)] = entry.Hash + for { + entry, err := iter.Next() + if err != nil { + if err == io.EOF { + return nil + } + return err } - } - if err == io.EOF { - return nil + idx.offsetHash[int64(entry.Offset)] = entry.Hash } - - return err } // Count implements the Index interface. diff --git a/plumbing/format/packfile/decoder.go b/plumbing/format/packfile/decoder.go index b1a0a26..edf386b 100644 --- a/plumbing/format/packfile/decoder.go +++ b/plumbing/format/packfile/decoder.go @@ -478,7 +478,7 @@ func (d *Decoder) recallByOffset(o int64) (plumbing.EncodedObject, error) { func (d *Decoder) recallByHash(h plumbing.Hash) (plumbing.EncodedObject, error) { if d.s.IsSeekable { - if offset, err := d.idx.FindOffset(h); err != nil { + if offset, err := d.idx.FindOffset(h); err == nil { return d.DecodeObjectAt(offset) } } diff --git a/plumbing/format/packfile/disk_object.go b/plumbing/format/packfile/disk_object.go new file mode 100644 index 0000000..d3e8520 --- /dev/null +++ b/plumbing/format/packfile/disk_object.go @@ -0,0 +1,64 @@ +package packfile + +import ( + "io" + + "gopkg.in/src-d/go-git.v4/plumbing" +) + +// DiskObject is an object from the packfile on disk. +type DiskObject struct { + hash plumbing.Hash + h *ObjectHeader + offset int64 + size int64 + typ plumbing.ObjectType + packfile *Packfile +} + +// NewDiskObject creates a new disk object. +func NewDiskObject( + hash plumbing.Hash, + finalType plumbing.ObjectType, + offset int64, + contentSize int64, + packfile *Packfile, +) *DiskObject { + return &DiskObject{ + hash: hash, + offset: offset, + size: contentSize, + typ: finalType, + packfile: packfile, + } +} + +// Reader implements the plumbing.EncodedObject interface. +func (o *DiskObject) Reader() (io.ReadCloser, error) { + return o.packfile.getObjectContent(o.offset) +} + +// SetSize implements the plumbing.EncodedObject interface. This method +// is a noop. 
+func (o *DiskObject) SetSize(int64) {} + +// SetType implements the plumbing.EncodedObject interface. This method is +// a noop. +func (o *DiskObject) SetType(plumbing.ObjectType) {} + +// Hash implements the plumbing.EncodedObject interface. +func (o *DiskObject) Hash() plumbing.Hash { return o.hash } + +// Size implements the plumbing.EncodedObject interface. +func (o *DiskObject) Size() int64 { return o.size } + +// Type implements the plumbing.EncodedObject interface. +func (o *DiskObject) Type() plumbing.ObjectType { + return o.typ +} + +// Writer implements the plumbing.EncodedObject interface. This method always +// returns a nil writer. +func (o *DiskObject) Writer() (io.WriteCloser, error) { + return nil, nil +} diff --git a/plumbing/format/packfile/packfile.go b/plumbing/format/packfile/packfile.go index cee6031..00014f6 100644 --- a/plumbing/format/packfile/packfile.go +++ b/plumbing/format/packfile/packfile.go @@ -17,7 +17,7 @@ type Packfile struct { billy.File s *Scanner deltaBaseCache cache.Object - offsetToHash map[int64]plumbing.Hash + offsetToType map[int64]plumbing.ObjectType } // NewPackfile returns a packfile representation for the given packfile file @@ -30,7 +30,7 @@ func NewPackfile(index idxfile.Index, file billy.File) *Packfile { file, s, cache.NewObjectLRUDefault(), - make(map[int64]plumbing.Hash), + make(map[int64]plumbing.ObjectType), } } @@ -47,8 +47,9 @@ func (p *Packfile) Get(h plumbing.Hash) (plumbing.EncodedObject, error) { // GetByOffset retrieves the encoded object from the packfile with the given // offset. func (p *Packfile) GetByOffset(o int64) (plumbing.EncodedObject, error) { - if h, ok := p.offsetToHash[o]; ok { - if obj, ok := p.deltaBaseCache.Get(h); ok { + hash, err := p.FindHash(o) + if err == nil { + if obj, ok := p.deltaBaseCache.Get(hash); ok { return obj, nil } } @@ -60,13 +61,166 @@ func (p *Packfile) GetByOffset(o int64) (plumbing.EncodedObject, error) { return p.nextObject() } -func (p *Packfile) nextObject() (plumbing.EncodedObject, error) { +func (p *Packfile) nextObjectHeader() (*ObjectHeader, error) { h, err := p.s.NextObjectHeader() + p.s.pendingObject = nil + return h, err +} + +func (p *Packfile) getObjectData( + h *ObjectHeader, +) (typ plumbing.ObjectType, size int64, err error) { + switch h.Type { + case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject: + typ = h.Type + size = h.Length + case plumbing.REFDeltaObject, plumbing.OFSDeltaObject: + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufPool.Put(buf) + + _, _, err = p.s.NextObject(buf) + if err != nil { + return + } + + delta := buf.Bytes() + _, delta = decodeLEB128(delta) // skip src size + sz, _ := decodeLEB128(delta) + size = int64(sz) + + var offset int64 + if h.Type == plumbing.REFDeltaObject { + offset, err = p.FindOffset(h.Reference) + if err != nil { + return + } + } else { + offset = h.OffsetReference + } + + if baseType, ok := p.offsetToType[offset]; ok { + typ = baseType + } else { + if _, err = p.s.SeekFromStart(offset); err != nil { + return + } + + h, err = p.nextObjectHeader() + if err != nil { + return + } + + typ, _, err = p.getObjectData(h) + if err != nil { + return + } + } + default: + err = ErrInvalidObject.AddDetails("type %q", h.Type) + } + + return +} + +func (p *Packfile) getObjectSize(h *ObjectHeader) (int64, error) { + switch h.Type { + case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject: + return h.Length, nil + case plumbing.REFDeltaObject, 
plumbing.OFSDeltaObject: + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufPool.Put(buf) + + if _, _, err := p.s.NextObject(buf); err != nil { + return 0, err + } + + delta := buf.Bytes() + _, delta = decodeLEB128(delta) // skip src size + sz, _ := decodeLEB128(delta) + return int64(sz), nil + default: + return 0, ErrInvalidObject.AddDetails("type %q", h.Type) + } +} + +func (p *Packfile) getObjectType(h *ObjectHeader) (typ plumbing.ObjectType, err error) { + switch h.Type { + case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject: + return h.Type, nil + case plumbing.REFDeltaObject, plumbing.OFSDeltaObject: + var offset int64 + if h.Type == plumbing.REFDeltaObject { + offset, err = p.FindOffset(h.Reference) + if err != nil { + return + } + } else { + offset = h.OffsetReference + } + + if baseType, ok := p.offsetToType[offset]; ok { + typ = baseType + } else { + if _, err = p.s.SeekFromStart(offset); err != nil { + return + } + + h, err = p.nextObjectHeader() + if err != nil { + return + } + + typ, err = p.getObjectType(h) + if err != nil { + return + } + } + default: + err = ErrInvalidObject.AddDetails("type %q", h.Type) + } + + return +} + +func (p *Packfile) nextObject() (plumbing.EncodedObject, error) { + h, err := p.nextObjectHeader() + if err != nil { + return nil, err + } + + hash, err := p.FindHash(h.Offset) + if err != nil { + return nil, err + } + + size, err := p.getObjectSize(h) if err != nil { return nil, err } - obj := new(plumbing.MemoryObject) + typ, err := p.getObjectType(h) + if err != nil { + return nil, err + } + + p.offsetToType[h.Offset] = typ + + return NewDiskObject(hash, typ, h.Offset, size, p), nil +} + +func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) { + if _, err := p.s.SeekFromStart(offset); err != nil { + return nil, err + } + + h, err := p.nextObjectHeader() + if err != nil { + return nil, err + } + + var obj = new(plumbing.MemoryObject) obj.SetSize(h.Length) obj.SetType(h.Type) @@ -82,12 +236,10 @@ func (p *Packfile) nextObject() (plumbing.EncodedObject, error) { } if err != nil { - return obj, err + return nil, err } - p.offsetToHash[h.Offset] = obj.Hash() - - return obj, nil + return obj.Reader() } func (p *Packfile) fillRegularObjectContent(obj plumbing.EncodedObject) error { @@ -132,9 +284,10 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset } var base plumbing.EncodedObject - h, ok := p.offsetToHash[offset] - if ok { - base, ok = p.cacheGet(h) + var ok bool + hash, err := p.FindHash(offset) + if err == nil { + base, ok = p.cacheGet(hash) } if !ok { @@ -173,9 +326,7 @@ func (p *Packfile) cachePut(obj plumbing.EncodedObject) { // The iterator returned is not thread-safe, it should be used in the same // thread as the Packfile instance. func (p *Packfile) GetAll() (storer.EncodedObjectIter, error) { - s := NewScanner(p.File) - - _, count, err := s.Header() + entries, err := p.Entries() if err != nil { return nil, err } @@ -185,8 +336,14 @@ func (p *Packfile) GetAll() (storer.EncodedObjectIter, error) { // instance. To not mess with the seeks, it's a new instance with a // different scanner but the same cache and offset to hash map for // reusing as much cache as possible. 
- d: &Packfile{p.Index, nil, s, p.deltaBaseCache, p.offsetToHash}, - count: int(count), + p: &Packfile{ + p.Index, + p.File, + NewScanner(p.File), + p.deltaBaseCache, + p.offsetToType, + }, + iter: entries, }, nil } @@ -214,18 +371,17 @@ type objectDecoder interface { } type objectIter struct { - d objectDecoder - count int - pos int + p *Packfile + iter idxfile.EntryIter } func (i *objectIter) Next() (plumbing.EncodedObject, error) { - if i.pos >= i.count { - return nil, io.EOF + e, err := i.iter.Next() + if err != nil { + return nil, err } - i.pos++ - return i.d.nextObject() + return i.p.GetByOffset(int64(e.Offset)) } func (i *objectIter) ForEach(f func(plumbing.EncodedObject) error) error { @@ -245,5 +401,5 @@ func (i *objectIter) ForEach(f func(plumbing.EncodedObject) error) error { } func (i *objectIter) Close() { - i.pos = i.count + i.iter.Close() } diff --git a/plumbing/format/packfile/packfile_test.go b/plumbing/format/packfile/packfile_test.go index 10e4080..0d7a806 100644 --- a/plumbing/format/packfile/packfile_test.go +++ b/plumbing/format/packfile/packfile_test.go @@ -1,14 +1,18 @@ package packfile import ( + "bytes" "io" "math" + "io/ioutil" + . "gopkg.in/check.v1" "gopkg.in/src-d/go-billy.v4/osfs" fixtures "gopkg.in/src-d/go-git-fixtures.v3" "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/plumbing/format/idxfile" + "gopkg.in/src-d/go-git.v4/storage/memory" ) type PackfileSuite struct { @@ -104,6 +108,48 @@ var expectedEntries = map[plumbing.Hash]int64{ plumbing.NewHash("fb72698cab7617ac416264415f13224dfd7a165e"): 84671, } +func (s *PackfileSuite) TestContent(c *C) { + storer := memory.NewObjectStorage() + decoder, err := NewDecoder(NewScanner(s.f.Packfile()), storer) + c.Assert(err, IsNil) + + _, err = decoder.Decode() + c.Assert(err, IsNil) + + iter, err := s.p.GetAll() + c.Assert(err, IsNil) + + for { + o, err := iter.Next() + if err == io.EOF { + break + } + c.Assert(err, IsNil) + + o2, err := storer.EncodedObject(plumbing.AnyObject, o.Hash()) + c.Assert(err, IsNil) + + c.Assert(o.Type(), Equals, o2.Type()) + c.Assert(o.Size(), Equals, o2.Size()) + + r, err := o.Reader() + c.Assert(err, IsNil) + + c1, err := ioutil.ReadAll(r) + c.Assert(err, IsNil) + c.Assert(r.Close(), IsNil) + + r, err = o2.Reader() + c.Assert(err, IsNil) + + c2, err := ioutil.ReadAll(r) + c.Assert(err, IsNil) + c.Assert(r.Close(), IsNil) + + c.Assert(bytes.Compare(c1, c2), Equals, 0) + } +} + func (s *PackfileSuite) SetUpTest(c *C) { s.f = fixtures.Basic().One() diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 2e32509..a950a62 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -91,6 +91,16 @@ type ObjectStorage struct { Tags map[plumbing.Hash]plumbing.EncodedObject } +func NewObjectStorage() *ObjectStorage { + return &ObjectStorage{ + Objects: make(map[plumbing.Hash]plumbing.EncodedObject), + Commits: make(map[plumbing.Hash]plumbing.EncodedObject), + Trees: make(map[plumbing.Hash]plumbing.EncodedObject), + Blobs: make(map[plumbing.Hash]plumbing.EncodedObject), + Tags: make(map[plumbing.Hash]plumbing.EncodedObject), + } +} + func (o *ObjectStorage) NewEncodedObject() plumbing.EncodedObject { return &plumbing.MemoryObject{} } -- cgit
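
Usage note: a minimal sketch, assuming the .idx and .pack files are opened straight from disk, of how the lazy-read API added by this patch can be driven from outside the package. The pack directory and the "pack-XXXX.*" file names are placeholders, and the error handling is illustrative only; nothing here is prescribed by the change itself. Every object yielded by GetAll is a DiskObject, so no content is inflated until Reader is called.

package main

import (
	"fmt"
	"io"
	"io/ioutil"

	"gopkg.in/src-d/go-billy.v4/osfs"
	"gopkg.in/src-d/go-git.v4/plumbing/format/idxfile"
	"gopkg.in/src-d/go-git.v4/plumbing/format/packfile"
)

func main() {
	// Assumed layout: the .idx and .pack files live in a regular pack directory.
	fs := osfs.New(".git/objects/pack")

	// Decode the index up front; Packfile.FindHash/FindOffset rely on it.
	idxFile, err := fs.Open("pack-XXXX.idx") // hypothetical file name
	if err != nil {
		panic(err)
	}
	idx := idxfile.NewMemoryIndex()
	if err := idxfile.NewDecoder(idxFile).Decode(idx); err != nil {
		panic(err)
	}
	idxFile.Close()

	packFile, err := fs.Open("pack-XXXX.pack") // hypothetical file name
	if err != nil {
		panic(err)
	}

	p := packfile.NewPackfile(idx, packFile)
	defer p.Close()

	// GetAll walks the idxfile entries; each object it yields is a DiskObject
	// carrying only hash, type, size and offset.
	iter, err := p.GetAll()
	if err != nil {
		panic(err)
	}
	defer iter.Close()

	for {
		obj, err := iter.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}

		// Header data is available without touching the object content.
		fmt.Println(obj.Hash(), obj.Type(), obj.Size())

		// Reader is the lazy part: it seeks back into the pack and inflates
		// the object (resolving deltas if needed) only at this point.
		r, err := obj.Reader()
		if err != nil {
			panic(err)
		}
		if _, err := ioutil.ReadAll(r); err != nil {
			panic(err)
		}
		r.Close()
	}
}

The point of the change shows up in the loop: the Println costs only index lookups and header parsing, while the decompression work is deferred to Reader, so callers that only need hashes, types or sizes never pay for inflating the contents.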