diff options
author | zeripath <art27@cantab.net> | 2021-06-30 09:25:19 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-30 10:25:19 +0200 |
commit | b4368b2a2ca4103b1ff4e37c34a963127342747e (patch) | |
tree | 5a3616045c4be8e7d64706017cf4380f6937ad32 | |
parent | da810275bf682d29a530ed819aff175f47bd7634 (diff) | |
download | go-git-b4368b2a2ca4103b1ff4e37c34a963127342747e.tar.gz |
plumbing: format/packfile, prevent large objects from being read into memory completely (#330)
This PR adds code to prevent large objects from being read into memory
from packfiles or the filesystem.
Objects larger than 1 MB are no longer stored directly in the cache
or read completely into memory.
This PR differs from and improves on the previous, broken #323 by fixing
several bugs in the reader and transparently wrapping ReaderAt as a Reader.
Signed-off-by: Andrew Thornton <art27@cantab.net>
-rw-r--r-- | plumbing/format/packfile/delta_test.go | 51 | ||||
-rw-r--r-- | plumbing/format/packfile/encoder_advanced_test.go | 2 | ||||
-rw-r--r-- | plumbing/format/packfile/encoder_test.go | 2 | ||||
-rw-r--r-- | plumbing/format/packfile/fsobject.go | 54 | ||||
-rw-r--r-- | plumbing/format/packfile/packfile.go | 92 | ||||
-rw-r--r-- | plumbing/format/packfile/packfile_test.go | 12 | ||||
-rw-r--r-- | plumbing/format/packfile/patch_delta.go | 210 | ||||
-rw-r--r-- | plumbing/format/packfile/scanner.go | 15 | ||||
-rw-r--r-- | storage/filesystem/dotgit/reader.go | 79 | ||||
-rw-r--r-- | storage/filesystem/object.go | 21 | ||||
-rw-r--r-- | storage/filesystem/object_test.go | 63 | ||||
-rw-r--r-- | storage/filesystem/storage.go | 3 | ||||
-rw-r--r-- | utils/ioutil/common.go | 40 |
13 files changed, 601 insertions, 43 deletions
diff --git a/plumbing/format/packfile/delta_test.go b/plumbing/format/packfile/delta_test.go index 98f53f6..137e485 100644 --- a/plumbing/format/packfile/delta_test.go +++ b/plumbing/format/packfile/delta_test.go @@ -1,8 +1,11 @@ package packfile import ( + "bytes" + "io/ioutil" "math/rand" + "github.com/go-git/go-git/v5/plumbing" . "gopkg.in/check.v1" ) @@ -97,6 +100,32 @@ func (s *DeltaSuite) TestAddDelta(c *C) { } } +func (s *DeltaSuite) TestAddDeltaReader(c *C) { + for _, t := range s.testCases { + baseBuf := genBytes(t.base) + baseObj := &plumbing.MemoryObject{} + baseObj.Write(baseBuf) + + targetBuf := genBytes(t.target) + + delta := DiffDelta(baseBuf, targetBuf) + deltaRC := ioutil.NopCloser(bytes.NewReader(delta)) + + c.Log("Executing test case:", t.description) + + resultRC, err := ReaderFromDelta(baseObj, deltaRC) + c.Assert(err, IsNil) + + result, err := ioutil.ReadAll(resultRC) + c.Assert(err, IsNil) + + err = resultRC.Close() + c.Assert(err, IsNil) + + c.Assert(result, DeepEquals, targetBuf) + } +} + func (s *DeltaSuite) TestIncompleteDelta(c *C) { for _, t := range s.testCases { c.Log("Incomplete delta on:", t.description) @@ -125,3 +154,25 @@ func (s *DeltaSuite) TestMaxCopySizeDelta(c *C) { c.Assert(err, IsNil) c.Assert(result, DeepEquals, targetBuf) } + +func (s *DeltaSuite) TestMaxCopySizeDeltaReader(c *C) { + baseBuf := randBytes(maxCopySize) + baseObj := &plumbing.MemoryObject{} + baseObj.Write(baseBuf) + + targetBuf := baseBuf[0:] + targetBuf = append(targetBuf, byte(1)) + + delta := DiffDelta(baseBuf, targetBuf) + deltaRC := ioutil.NopCloser(bytes.NewReader(delta)) + + resultRC, err := ReaderFromDelta(baseObj, deltaRC) + c.Assert(err, IsNil) + + result, err := ioutil.ReadAll(resultRC) + c.Assert(err, IsNil) + + err = resultRC.Close() + c.Assert(err, IsNil) + c.Assert(result, DeepEquals, targetBuf) +} diff --git a/plumbing/format/packfile/encoder_advanced_test.go b/plumbing/format/packfile/encoder_advanced_test.go index 95db5c0..15c0fba 100644 
--- a/plumbing/format/packfile/encoder_advanced_test.go +++ b/plumbing/format/packfile/encoder_advanced_test.go @@ -105,7 +105,7 @@ func (s *EncoderAdvancedSuite) testEncodeDecode( _, err = f.Seek(0, io.SeekStart) c.Assert(err, IsNil) - p := NewPackfile(index, fs, f) + p := NewPackfile(index, fs, f, 0) decodeHash, err := p.ID() c.Assert(err, IsNil) diff --git a/plumbing/format/packfile/encoder_test.go b/plumbing/format/packfile/encoder_test.go index d2db892..c9d49c3 100644 --- a/plumbing/format/packfile/encoder_test.go +++ b/plumbing/format/packfile/encoder_test.go @@ -318,7 +318,7 @@ func packfileFromReader(c *C, buf *bytes.Buffer) (*Packfile, func()) { index, err := w.Index() c.Assert(err, IsNil) - return NewPackfile(index, fs, file), func() { + return NewPackfile(index, fs, file, 0), func() { c.Assert(file.Close(), IsNil) } } diff --git a/plumbing/format/packfile/fsobject.go b/plumbing/format/packfile/fsobject.go index c5edaf5..a395d17 100644 --- a/plumbing/format/packfile/fsobject.go +++ b/plumbing/format/packfile/fsobject.go @@ -7,19 +7,21 @@ import ( "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/cache" "github.com/go-git/go-git/v5/plumbing/format/idxfile" + "github.com/go-git/go-git/v5/utils/ioutil" ) // FSObject is an object from the packfile on the filesystem. type FSObject struct { - hash plumbing.Hash - h *ObjectHeader - offset int64 - size int64 - typ plumbing.ObjectType - index idxfile.Index - fs billy.Filesystem - path string - cache cache.Object + hash plumbing.Hash + h *ObjectHeader + offset int64 + size int64 + typ plumbing.ObjectType + index idxfile.Index + fs billy.Filesystem + path string + cache cache.Object + largeObjectThreshold int64 } // NewFSObject creates a new filesystem object. 
@@ -32,16 +34,18 @@ func NewFSObject( fs billy.Filesystem, path string, cache cache.Object, + largeObjectThreshold int64, ) *FSObject { return &FSObject{ - hash: hash, - offset: offset, - size: contentSize, - typ: finalType, - index: index, - fs: fs, - path: path, - cache: cache, + hash: hash, + offset: offset, + size: contentSize, + typ: finalType, + index: index, + fs: fs, + path: path, + cache: cache, + largeObjectThreshold: largeObjectThreshold, } } @@ -62,7 +66,21 @@ func (o *FSObject) Reader() (io.ReadCloser, error) { return nil, err } - p := NewPackfileWithCache(o.index, nil, f, o.cache) + p := NewPackfileWithCache(o.index, nil, f, o.cache, o.largeObjectThreshold) + if o.largeObjectThreshold > 0 && o.size > o.largeObjectThreshold { + // We have a big object + h, err := p.objectHeaderAtOffset(o.offset) + if err != nil { + return nil, err + } + + r, err := p.getReaderDirect(h) + if err != nil { + _ = f.Close() + return nil, err + } + return ioutil.NewReadCloserWithCloser(r, f.Close), nil + } r, err := p.getObjectContent(o.offset) if err != nil { _ = f.Close() diff --git a/plumbing/format/packfile/packfile.go b/plumbing/format/packfile/packfile.go index ddd7f62..8dd6041 100644 --- a/plumbing/format/packfile/packfile.go +++ b/plumbing/format/packfile/packfile.go @@ -2,6 +2,8 @@ package packfile import ( "bytes" + "compress/zlib" + "fmt" "io" "os" @@ -35,11 +37,12 @@ const smallObjectThreshold = 16 * 1024 // Packfile allows retrieving information from inside a packfile. type Packfile struct { idxfile.Index - fs billy.Filesystem - file billy.File - s *Scanner - deltaBaseCache cache.Object - offsetToType map[int64]plumbing.ObjectType + fs billy.Filesystem + file billy.File + s *Scanner + deltaBaseCache cache.Object + offsetToType map[int64]plumbing.ObjectType + largeObjectThreshold int64 } // NewPackfileWithCache creates a new Packfile with the given object cache. 
@@ -50,6 +53,7 @@ func NewPackfileWithCache( fs billy.Filesystem, file billy.File, cache cache.Object, + largeObjectThreshold int64, ) *Packfile { s := NewScanner(file) return &Packfile{ @@ -59,6 +63,7 @@ func NewPackfileWithCache( s, cache, make(map[int64]plumbing.ObjectType), + largeObjectThreshold, } } @@ -66,8 +71,8 @@ func NewPackfileWithCache( // and packfile idx. // If the filesystem is provided, the packfile will return FSObjects, otherwise // it will return MemoryObjects. -func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File) *Packfile { - return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault()) +func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File, largeObjectThreshold int64) *Packfile { + return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault(), largeObjectThreshold) } // Get retrieves the encoded object in the packfile with the given hash. @@ -263,6 +268,7 @@ func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing. 
p.fs, p.file.Name(), p.deltaBaseCache, + p.largeObjectThreshold, ), nil } @@ -282,6 +288,50 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) { return obj.Reader() } +func asyncReader(p *Packfile) (io.ReadCloser, error) { + reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset) + zr := zlibReaderPool.Get().(io.ReadCloser) + + if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil { + return nil, fmt.Errorf("zlib reset error: %s", err) + } + + return ioutil.NewReadCloserWithCloser(zr, func() error { + zlibReaderPool.Put(zr) + return nil + }), nil + +} + +func (p *Packfile) getReaderDirect(h *ObjectHeader) (io.ReadCloser, error) { + switch h.Type { + case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject: + return asyncReader(p) + case plumbing.REFDeltaObject: + deltaRc, err := asyncReader(p) + if err != nil { + return nil, err + } + r, err := p.readREFDeltaObjectContent(h, deltaRc) + if err != nil { + return nil, err + } + return r, nil + case plumbing.OFSDeltaObject: + deltaRc, err := asyncReader(p) + if err != nil { + return nil, err + } + r, err := p.readOFSDeltaObjectContent(h, deltaRc) + if err != nil { + return nil, err + } + return r, nil + default: + return nil, ErrInvalidObject.AddDetails("type %q", h.Type) + } +} + func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) { var obj = new(plumbing.MemoryObject) obj.SetSize(h.Length) @@ -334,6 +384,20 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf) } +func (p *Packfile) readREFDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) { + var err error + + base, ok := p.cacheGet(h.Reference) + if !ok { + base, err = p.Get(h.Reference) + if err != nil { + return nil, err + } + } + + return ReaderFromDelta(base, deltaRC) +} + func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj 
plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error { var err error @@ -364,6 +428,20 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf) } +func (p *Packfile) readOFSDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) { + hash, err := p.FindHash(h.OffsetReference) + if err != nil { + return nil, err + } + + base, err := p.objectAtOffset(h.OffsetReference, hash) + if err != nil { + return nil, err + } + + return ReaderFromDelta(base, deltaRC) +} + func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error { hash, err := p.FindHash(offset) if err != nil { diff --git a/plumbing/format/packfile/packfile_test.go b/plumbing/format/packfile/packfile_test.go index 60c7c73..6af8817 100644 --- a/plumbing/format/packfile/packfile_test.go +++ b/plumbing/format/packfile/packfile_test.go @@ -111,7 +111,7 @@ func (s *PackfileSuite) SetUpTest(c *C) { s.idx = idxfile.NewMemoryIndex() c.Assert(idxfile.NewDecoder(s.f.Idx()).Decode(s.idx), IsNil) - s.p = packfile.NewPackfile(s.idx, fixtures.Filesystem, s.f.Packfile()) + s.p = packfile.NewPackfile(s.idx, fixtures.Filesystem, s.f.Packfile(), 0) } func (s *PackfileSuite) TearDownTest(c *C) { @@ -122,7 +122,7 @@ func (s *PackfileSuite) TestDecode(c *C) { fixtures.Basic().ByTag("packfile").Test(c, func(f *fixtures.Fixture) { index := getIndexFromIdxFile(f.Idx()) - p := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile()) + p := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0) defer p.Close() for _, h := range expectedHashes { @@ -138,7 +138,7 @@ func (s *PackfileSuite) TestDecodeByTypeRefDelta(c *C) { index := getIndexFromIdxFile(f.Idx()) - packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile()) + packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0) defer packfile.Close() 
iter, err := packfile.GetByType(plumbing.CommitObject) @@ -171,7 +171,7 @@ func (s *PackfileSuite) TestDecodeByType(c *C) { for _, t := range ts { index := getIndexFromIdxFile(f.Idx()) - packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile()) + packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0) defer packfile.Close() iter, err := packfile.GetByType(t) @@ -189,7 +189,7 @@ func (s *PackfileSuite) TestDecodeByTypeConstructor(c *C) { f := fixtures.Basic().ByTag("packfile").One() index := getIndexFromIdxFile(f.Idx()) - packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile()) + packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0) defer packfile.Close() _, err := packfile.GetByType(plumbing.OFSDeltaObject) @@ -266,7 +266,7 @@ func (s *PackfileSuite) TestSize(c *C) { index := getIndexFromIdxFile(f.Idx()) - packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile()) + packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0) defer packfile.Close() // Get the size of binary.jpg, which is not delta-encoded. 
diff --git a/plumbing/format/packfile/patch_delta.go b/plumbing/format/packfile/patch_delta.go index 9e90f30..17da11e 100644 --- a/plumbing/format/packfile/patch_delta.go +++ b/plumbing/format/packfile/patch_delta.go @@ -1,9 +1,11 @@ package packfile import ( + "bufio" "bytes" "errors" "io" + "math" "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/utils/ioutil" @@ -73,6 +75,131 @@ func PatchDelta(src, delta []byte) ([]byte, error) { return b.Bytes(), nil } +func ReaderFromDelta(base plumbing.EncodedObject, deltaRC io.Reader) (io.ReadCloser, error) { + deltaBuf := bufio.NewReaderSize(deltaRC, 1024) + srcSz, err := decodeLEB128ByteReader(deltaBuf) + if err != nil { + if err == io.EOF { + return nil, ErrInvalidDelta + } + return nil, err + } + if srcSz != uint(base.Size()) { + return nil, ErrInvalidDelta + } + + targetSz, err := decodeLEB128ByteReader(deltaBuf) + if err != nil { + if err == io.EOF { + return nil, ErrInvalidDelta + } + return nil, err + } + remainingTargetSz := targetSz + + dstRd, dstWr := io.Pipe() + + go func() { + baseRd, err := base.Reader() + if err != nil { + _ = dstWr.CloseWithError(ErrInvalidDelta) + return + } + defer baseRd.Close() + + baseBuf := bufio.NewReader(baseRd) + basePos := uint(0) + + for { + cmd, err := deltaBuf.ReadByte() + if err == io.EOF { + _ = dstWr.CloseWithError(ErrInvalidDelta) + return + } + if err != nil { + _ = dstWr.CloseWithError(err) + return + } + + if isCopyFromSrc(cmd) { + offset, err := decodeOffsetByteReader(cmd, deltaBuf) + if err != nil { + _ = dstWr.CloseWithError(err) + return + } + sz, err := decodeSizeByteReader(cmd, deltaBuf) + if err != nil { + _ = dstWr.CloseWithError(err) + return + } + + if invalidSize(sz, targetSz) || + invalidOffsetSize(offset, sz, srcSz) { + _ = dstWr.Close() + return + } + + discard := offset - basePos + if basePos > offset { + _ = baseRd.Close() + baseRd, err = base.Reader() + if err != nil { + _ = dstWr.CloseWithError(ErrInvalidDelta) + return + } + 
baseBuf.Reset(baseRd) + discard = offset + } + for discard > math.MaxInt32 { + n, err := baseBuf.Discard(math.MaxInt32) + if err != nil { + _ = dstWr.CloseWithError(err) + return + } + basePos += uint(n) + discard -= uint(n) + } + for discard > 0 { + n, err := baseBuf.Discard(int(discard)) + if err != nil { + _ = dstWr.CloseWithError(err) + return + } + basePos += uint(n) + discard -= uint(n) + } + if _, err := io.Copy(dstWr, io.LimitReader(baseBuf, int64(sz))); err != nil { + _ = dstWr.CloseWithError(err) + return + } + remainingTargetSz -= sz + basePos += sz + } else if isCopyFromDelta(cmd) { + sz := uint(cmd) // cmd is the size itself + if invalidSize(sz, targetSz) { + _ = dstWr.CloseWithError(ErrInvalidDelta) + return + } + if _, err := io.Copy(dstWr, io.LimitReader(deltaBuf, int64(sz))); err != nil { + _ = dstWr.CloseWithError(err) + return + } + + remainingTargetSz -= sz + } else { + _ = dstWr.CloseWithError(ErrDeltaCmd) + return + } + if remainingTargetSz <= 0 { + _ = dstWr.Close() + return + } + } + }() + + return dstRd, nil +} + func patchDelta(dst *bytes.Buffer, src, delta []byte) error { if len(delta) < deltaSizeMin { return ErrInvalidDelta @@ -161,6 +288,25 @@ func decodeLEB128(input []byte) (uint, []byte) { return num, input[sz:] } +func decodeLEB128ByteReader(input io.ByteReader) (uint, error) { + var num, sz uint + for { + b, err := input.ReadByte() + if err != nil { + return 0, err + } + + num |= (uint(b) & payload) << (sz * 7) // concats 7 bits chunks + sz++ + + if uint(b)&continuation == 0 { + break + } + } + + return num, nil +} + const ( payload = 0x7f // 0111 1111 continuation = 0x80 // 1000 0000 @@ -174,6 +320,40 @@ func isCopyFromDelta(cmd byte) bool { return (cmd&0x80) == 0 && cmd != 0 } +func decodeOffsetByteReader(cmd byte, delta io.ByteReader) (uint, error) { + var offset uint + if (cmd & 0x01) != 0 { + next, err := delta.ReadByte() + if err != nil { + return 0, err + } + offset = uint(next) + } + if (cmd & 0x02) != 0 { + next, err := 
delta.ReadByte() + if err != nil { + return 0, err + } + offset |= uint(next) << 8 + } + if (cmd & 0x04) != 0 { + next, err := delta.ReadByte() + if err != nil { + return 0, err + } + offset |= uint(next) << 16 + } + if (cmd & 0x08) != 0 { + next, err := delta.ReadByte() + if err != nil { + return 0, err + } + offset |= uint(next) << 24 + } + + return offset, nil +} + func decodeOffset(cmd byte, delta []byte) (uint, []byte, error) { var offset uint if (cmd & 0x01) != 0 { @@ -208,6 +388,36 @@ func decodeOffset(cmd byte, delta []byte) (uint, []byte, error) { return offset, delta, nil } +func decodeSizeByteReader(cmd byte, delta io.ByteReader) (uint, error) { + var sz uint + if (cmd & 0x10) != 0 { + next, err := delta.ReadByte() + if err != nil { + return 0, err + } + sz = uint(next) + } + if (cmd & 0x20) != 0 { + next, err := delta.ReadByte() + if err != nil { + return 0, err + } + sz |= uint(next) << 8 + } + if (cmd & 0x40) != 0 { + next, err := delta.ReadByte() + if err != nil { + return 0, err + } + sz |= uint(next) << 16 + } + if sz == 0 { + sz = 0x10000 + } + + return sz, nil +} + func decodeSize(cmd byte, delta []byte) (uint, []byte, error) { var sz uint if (cmd & 0x10) != 0 { diff --git a/plumbing/format/packfile/scanner.go b/plumbing/format/packfile/scanner.go index 6e6a687..5d9e8fb 100644 --- a/plumbing/format/packfile/scanner.go +++ b/plumbing/format/packfile/scanner.go @@ -320,6 +320,21 @@ func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err erro return } +// ReadObject returns a reader for the object content and an error +func (s *Scanner) ReadObject() (io.ReadCloser, error) { + s.pendingObject = nil + zr := zlibReaderPool.Get().(io.ReadCloser) + + if err := zr.(zlib.Resetter).Reset(s.r, nil); err != nil { + return nil, fmt.Errorf("zlib reset error: %s", err) + } + + return ioutil.NewReadCloserWithCloser(zr, func() error { + zlibReaderPool.Put(zr) + return nil + }), nil +} + // ReadRegularObject reads and write a non-deltified 
object // from it zlib stream in an object entry in the packfile. func (s *Scanner) copyObject(w io.Writer) (n int64, err error) { diff --git a/storage/filesystem/dotgit/reader.go b/storage/filesystem/dotgit/reader.go new file mode 100644 index 0000000..a82ac94 --- /dev/null +++ b/storage/filesystem/dotgit/reader.go @@ -0,0 +1,79 @@ +package dotgit + +import ( + "fmt" + "io" + "os" + + "github.com/go-git/go-git/v5/plumbing" + "github.com/go-git/go-git/v5/plumbing/format/objfile" + "github.com/go-git/go-git/v5/utils/ioutil" +) + +var _ (plumbing.EncodedObject) = &EncodedObject{} + +type EncodedObject struct { + dir *DotGit + h plumbing.Hash + t plumbing.ObjectType + sz int64 +} + +func (e *EncodedObject) Hash() plumbing.Hash { + return e.h +} + +func (e *EncodedObject) Reader() (io.ReadCloser, error) { + f, err := e.dir.Object(e.h) + if err != nil { + if os.IsNotExist(err) { + return nil, plumbing.ErrObjectNotFound + } + + return nil, err + } + r, err := objfile.NewReader(f) + if err != nil { + return nil, err + } + + t, size, err := r.Header() + if err != nil { + _ = r.Close() + return nil, err + } + if t != e.t { + _ = r.Close() + return nil, objfile.ErrHeader + } + if size != e.sz { + _ = r.Close() + return nil, objfile.ErrHeader + } + return ioutil.NewReadCloserWithCloser(r, f.Close), nil +} + +func (e *EncodedObject) SetType(plumbing.ObjectType) {} + +func (e *EncodedObject) Type() plumbing.ObjectType { + return e.t +} + +func (e *EncodedObject) Size() int64 { + return e.sz +} + +func (e *EncodedObject) SetSize(int64) {} + +func (e *EncodedObject) Writer() (io.WriteCloser, error) { + return nil, fmt.Errorf("Not supported") +} + +func NewEncodedObject(dir *DotGit, h plumbing.Hash, t plumbing.ObjectType, size int64) *EncodedObject { + return &EncodedObject{ + dir: dir, + h: h, + t: t, + sz: size, + } +} diff --git a/storage/filesystem/object.go b/storage/filesystem/object.go index 0c25dad..5c91bcd 100644 --- a/storage/filesystem/object.go +++ 
b/storage/filesystem/object.go @@ -204,9 +204,9 @@ func (s *ObjectStorage) packfile(idx idxfile.Index, pack plumbing.Hash) (*packfi var p *packfile.Packfile if s.objectCache != nil { - p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.objectCache) + p = packfile.NewPackfileWithCache(idx, s.dir.Fs(), f, s.objectCache, s.options.LargeObjectThreshold) } else { - p = packfile.NewPackfile(idx, s.dir.Fs(), f) + p = packfile.NewPackfile(idx, s.dir.Fs(), f, s.options.LargeObjectThreshold) } return p, s.storePackfileInCache(pack, p) @@ -389,7 +389,6 @@ func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedOb return cacheObj, nil } - obj = s.NewEncodedObject() r, err := objfile.NewReader(f) if err != nil { return nil, err @@ -402,6 +401,13 @@ func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedOb return nil, err } + if s.options.LargeObjectThreshold > 0 && size > s.options.LargeObjectThreshold { + obj = dotgit.NewEncodedObject(s.dir, h, t, size) + return obj, nil + } + + obj = s.NewEncodedObject() + obj.SetType(t) obj.SetSize(size) w, err := obj.Writer() @@ -595,6 +601,7 @@ func (s *ObjectStorage) buildPackfileIters( return newPackfileIter( s.dir.Fs(), pack, t, seen, s.index[h], s.objectCache, s.options.KeepDescriptors, + s.options.LargeObjectThreshold, ) }, }, nil @@ -684,6 +691,7 @@ func NewPackfileIter( idxFile billy.File, t plumbing.ObjectType, keepPack bool, + largeObjectThreshold int64, ) (storer.EncodedObjectIter, error) { idx := idxfile.NewMemoryIndex() if err := idxfile.NewDecoder(idxFile).Decode(idx); err != nil { @@ -695,7 +703,7 @@ func NewPackfileIter( } seen := make(map[plumbing.Hash]struct{}) - return newPackfileIter(fs, f, t, seen, idx, nil, keepPack) + return newPackfileIter(fs, f, t, seen, idx, nil, keepPack, largeObjectThreshold) } func newPackfileIter( @@ -706,12 +714,13 @@ func newPackfileIter( index idxfile.Index, cache cache.Object, keepPack bool, + largeObjectThreshold int64, ) 
(storer.EncodedObjectIter, error) { var p *packfile.Packfile if cache != nil { - p = packfile.NewPackfileWithCache(index, fs, f, cache) + p = packfile.NewPackfileWithCache(index, fs, f, cache, largeObjectThreshold) } else { - p = packfile.NewPackfile(index, fs, f) + p = packfile.NewPackfile(index, fs, f, largeObjectThreshold) } iter, err := p.GetByType(t) diff --git a/storage/filesystem/object_test.go b/storage/filesystem/object_test.go index 22f5b0c..59b40d3 100644 --- a/storage/filesystem/object_test.go +++ b/storage/filesystem/object_test.go @@ -107,6 +107,27 @@ func (s *FsSuite) TestGetFromPackfileMaxOpenDescriptors(c *C) { c.Assert(err, IsNil) } +func (s *FsSuite) TestGetFromPackfileMaxOpenDescriptorsLargeObjectThreshold(c *C) { + fs := fixtures.ByTag(".git").ByTag("multi-packfile").One().DotGit() + o := NewObjectStorageWithOptions(dotgit.New(fs), cache.NewObjectLRUDefault(), Options{ + MaxOpenDescriptors: 1, + LargeObjectThreshold: 1, + }) + + expected := plumbing.NewHash("8d45a34641d73851e01d3754320b33bb5be3c4d3") + obj, err := o.getFromPackfile(expected, false) + c.Assert(err, IsNil) + c.Assert(obj.Hash(), Equals, expected) + + expected = plumbing.NewHash("e9cfa4c9ca160546efd7e8582ec77952a27b17db") + obj, err = o.getFromPackfile(expected, false) + c.Assert(err, IsNil) + c.Assert(obj.Hash(), Equals, expected) + + err = o.Close() + c.Assert(err, IsNil) +} + func (s *FsSuite) TestGetSizeOfObjectFile(c *C) { fs := fixtures.ByTag(".git").ByTag("unpacked").One().DotGit() o := NewObjectStorage(dotgit.New(fs), cache.NewObjectLRUDefault()) @@ -160,6 +181,21 @@ func (s *FsSuite) TestGetFromPackfileMultiplePackfiles(c *C) { c.Assert(obj.Hash(), Equals, expected) } +func (s *FsSuite) TestGetFromPackfileMultiplePackfilesLargeObjectThreshold(c *C) { + fs := fixtures.ByTag(".git").ByTag("multi-packfile").One().DotGit() + o := NewObjectStorageWithOptions(dotgit.New(fs), cache.NewObjectLRUDefault(), Options{LargeObjectThreshold: 1}) + + expected := 
plumbing.NewHash("8d45a34641d73851e01d3754320b33bb5be3c4d3") + obj, err := o.getFromPackfile(expected, false) + c.Assert(err, IsNil) + c.Assert(obj.Hash(), Equals, expected) + + expected = plumbing.NewHash("e9cfa4c9ca160546efd7e8582ec77952a27b17db") + obj, err = o.getFromPackfile(expected, false) + c.Assert(err, IsNil) + c.Assert(obj.Hash(), Equals, expected) +} + func (s *FsSuite) TestIter(c *C) { fixtures.ByTag(".git").ByTag("packfile").Test(c, func(f *fixtures.Fixture) { fs := f.DotGit() @@ -179,6 +215,25 @@ func (s *FsSuite) TestIter(c *C) { }) } +func (s *FsSuite) TestIterLargeObjectThreshold(c *C) { + fixtures.ByTag(".git").ByTag("packfile").Test(c, func(f *fixtures.Fixture) { + fs := f.DotGit() + o := NewObjectStorageWithOptions(dotgit.New(fs), cache.NewObjectLRUDefault(), Options{LargeObjectThreshold: 1}) + + iter, err := o.IterEncodedObjects(plumbing.AnyObject) + c.Assert(err, IsNil) + + var count int32 + err = iter.ForEach(func(o plumbing.EncodedObject) error { + count++ + return nil + }) + + c.Assert(err, IsNil) + c.Assert(count, Equals, f.ObjectsCount) + }) +} + func (s *FsSuite) TestIterWithType(c *C) { fixtures.ByTag(".git").Test(c, func(f *fixtures.Fixture) { for _, t := range objectTypes { @@ -215,7 +270,7 @@ func (s *FsSuite) TestPackfileIter(c *C) { idxf, err := dg.ObjectPackIdx(h) c.Assert(err, IsNil) - iter, err := NewPackfileIter(fs, f, idxf, t, false) + iter, err := NewPackfileIter(fs, f, idxf, t, false, 0) c.Assert(err, IsNil) err = iter.ForEach(func(o plumbing.EncodedObject) error { @@ -298,7 +353,7 @@ func (s *FsSuite) TestPackfileIterKeepDescriptors(c *C) { idxf, err := dg.ObjectPackIdx(h) c.Assert(err, IsNil) - iter, err := NewPackfileIter(fs, f, idxf, t, true) + iter, err := NewPackfileIter(fs, f, idxf, t, true, 0) c.Assert(err, IsNil) err = iter.ForEach(func(o plumbing.EncodedObject) error { @@ -377,7 +432,7 @@ func BenchmarkPackfileIter(b *testing.B) { b.Fatal(err) } - iter, err := NewPackfileIter(fs, f, idxf, t, false) + iter, err := 
NewPackfileIter(fs, f, idxf, t, false, 0) if err != nil { b.Fatal(err) } @@ -425,7 +480,7 @@ func BenchmarkPackfileIterReadContent(b *testing.B) { b.Fatal(err) } - iter, err := NewPackfileIter(fs, f, idxf, t, false) + iter, err := NewPackfileIter(fs, f, idxf, t, false, 0) if err != nil { b.Fatal(err) } diff --git a/storage/filesystem/storage.go b/storage/filesystem/storage.go index 8b69b27..7e7a2c5 100644 --- a/storage/filesystem/storage.go +++ b/storage/filesystem/storage.go @@ -34,6 +34,9 @@ type Options struct { // MaxOpenDescriptors is the max number of file descriptors to keep // open. If KeepDescriptors is true, all file descriptors will remain open. MaxOpenDescriptors int + // LargeObjectThreshold maximum object size (in bytes) that will be read in to memory. + // If left unset or set to 0 there is no limit + LargeObjectThreshold int64 } // NewStorage returns a new Storage backed by a given `fs.Filesystem` and cache. diff --git a/utils/ioutil/common.go b/utils/ioutil/common.go index b52e85a..b0ace4e 100644 --- a/utils/ioutil/common.go +++ b/utils/ioutil/common.go @@ -55,6 +55,28 @@ func NewReadCloser(r io.Reader, c io.Closer) io.ReadCloser { return &readCloser{Reader: r, closer: c} } +type readCloserCloser struct { + io.ReadCloser + closer func() error +} + +func (r *readCloserCloser) Close() (err error) { + defer func() { + if err == nil { + err = r.closer() + return + } + _ = r.closer() + }() + return r.ReadCloser.Close() +} + +// NewReadCloserWithCloser creates an `io.ReadCloser` with the given `io.ReaderCloser` and +// `io.Closer` that ensures that the closer is closed on close +func NewReadCloserWithCloser(r io.ReadCloser, c func() error) io.ReadCloser { + return &readCloserCloser{ReadCloser: r, closer: c} +} + type writeCloser struct { io.Writer closer io.Closer @@ -82,6 +104,24 @@ func WriteNopCloser(w io.Writer) io.WriteCloser { return writeNopCloser{w} } +type readerAtAsReader struct { + io.ReaderAt + offset int64 +} + +func (r *readerAtAsReader) 
Read(bs []byte) (int, error) { + n, err := r.ReaderAt.ReadAt(bs, r.offset) + r.offset += int64(n) + return n, err +} + +func NewReaderUsingReaderAt(r io.ReaderAt, offset int64) io.Reader { + return &readerAtAsReader{ + ReaderAt: r, + offset: offset, + } +} + // CheckClose calls Close on the given io.Closer. If the given *error points to // nil, it will be assigned the error returned by Close. Otherwise, any error // returned by Close will be ignored. CheckClose is usually called with defer. |