diff options
author | Paulo Gomes <pjbgf@linux.com> | 2023-11-06 17:18:19 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-06 17:18:19 +0000 |
commit | 48bec23593e78f55f1470e10d30b22f3bb7a2c1f (patch) | |
tree | 581a5c41f78346aaa05ed8bff276c6df40fb3e41 | |
parent | 8c1e3e2eb458677af127fc286a43ad262c3cca1e (diff) | |
parent | 1c361adbc1f4b0e3d0743d11f187fb0b3ac4cb4d (diff) | |
download | go-git-48bec23593e78f55f1470e10d30b22f3bb7a2c1f.tar.gz |
Merge pull request #799 from pjbgf/perf2
plumbing: Optimise memory consumption for filesystem storage
-rw-r--r-- | plumbing/format/packfile/parser.go | 195 | ||||
-rw-r--r-- | plumbing/format/packfile/patch_delta.go | 102 | ||||
-rw-r--r-- | repository_test.go | 27 | ||||
-rw-r--r-- | storage/filesystem/object.go | 13 |
4 files changed, 276 insertions, 61 deletions
diff --git a/plumbing/format/packfile/parser.go b/plumbing/format/packfile/parser.go index edbc0e7..62f1d13 100644 --- a/plumbing/format/packfile/parser.go +++ b/plumbing/format/packfile/parser.go @@ -3,6 +3,7 @@ package packfile import ( "bytes" "errors" + "fmt" "io" "github.com/go-git/go-git/v5/plumbing" @@ -174,13 +175,25 @@ func (p *Parser) init() error { return nil } +type objectHeaderWriter func(typ plumbing.ObjectType, sz int64) error + +type lazyObjectWriter interface { + // LazyWriter enables an object to be lazily written. + // It returns: + // - w: a writer to receive the object's content. + // - lwh: a func to write the object header. + // - err: any error from the initial writer creation process. + // + // Note that if the object header is not written BEFORE the writer + // is used, this will result in an invalid object. + LazyWriter() (w io.WriteCloser, lwh objectHeaderWriter, err error) +} + func (p *Parser) indexObjects() error { buf := sync.GetBytesBuffer() defer sync.PutBytesBuffer(buf) for i := uint32(0); i < p.count; i++ { - buf.Reset() - oh, err := p.scanner.NextObjectHeader() if err != nil { return err @@ -220,21 +233,60 @@ func (p *Parser) indexObjects() error { ota = newBaseObject(oh.Offset, oh.Length, t) } - buf.Grow(int(oh.Length)) - _, crc, err := p.scanner.NextObject(buf) + hasher := plumbing.NewHasher(oh.Type, oh.Length) + writers := []io.Writer{hasher} + var obj *plumbing.MemoryObject + + // Lazy writing is only available for non-delta objects. + if p.storage != nil && !delta { + // When a storage is set and supports lazy writing, + // use that instead of creating a memory object. + if low, ok := p.storage.(lazyObjectWriter); ok { + ow, lwh, err := low.LazyWriter() + if err != nil { + return err + } + + if err = lwh(oh.Type, oh.Length); err != nil { + return err + } + + defer ow.Close() + writers = append(writers, ow) + } else { + obj = new(plumbing.MemoryObject) + obj.SetSize(oh.Length) + obj.SetType(oh.Type) + + writers = append(writers, obj) + } + } + if delta && !p.scanner.IsSeekable { + buf.Reset() + buf.Grow(int(oh.Length)) + writers = append(writers, buf) + } + + mw := io.MultiWriter(writers...) + + _, crc, err := p.scanner.NextObject(mw) if err != nil { return err } + // Non delta objects needs to be added into the storage. This + // is only required when lazy writing is not supported. + if obj != nil { + if _, err := p.storage.SetEncodedObject(obj); err != nil { + return err + } + } + ota.Crc32 = crc ota.Length = oh.Length - data := buf.Bytes() if !delta { - sha1, err := getSHA1(ota.Type, data) - if err != nil { - return err - } + sha1 := hasher.Sum() // Move children of placeholder parent into actual parent, in case this // was a non-external delta reference. @@ -249,20 +301,8 @@ func (p *Parser) indexObjects() error { p.oiByHash[ota.SHA1] = ota } - if p.storage != nil && !delta { - obj := new(plumbing.MemoryObject) - obj.SetSize(oh.Length) - obj.SetType(oh.Type) - if _, err := obj.Write(data); err != nil { - return err - } - - if _, err := p.storage.SetEncodedObject(obj); err != nil { - return err - } - } - if delta && !p.scanner.IsSeekable { + data := buf.Bytes() p.deltas[oh.Offset] = make([]byte, len(data)) copy(p.deltas[oh.Offset], data) } @@ -280,23 +320,29 @@ func (p *Parser) resolveDeltas() error { for _, obj := range p.oi { buf.Reset() + buf.Grow(int(obj.Length)) err := p.get(obj, buf) if err != nil { return err } - content := buf.Bytes() if err := p.onInflatedObjectHeader(obj.Type, obj.Length, obj.Offset); err != nil { return err } - if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content); err != nil { + if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, nil); err != nil { return err } if !obj.IsDelta() && len(obj.Children) > 0 { + // Dealing with an io.ReaderAt object, means we can + // create it once and reuse across all children. + r := bytes.NewReader(buf.Bytes()) for _, child := range obj.Children { - if err := p.resolveObject(io.Discard, child, content); err != nil { + // Even though we are discarding the output, we still need to read it to + // so that the scanner can advance to the next object, and the SHA1 can be + // calculated. + if err := p.resolveObject(io.Discard, child, r); err != nil { return err } p.resolveExternalRef(child) @@ -361,13 +407,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { if o.DiskType.IsDelta() { b := sync.GetBytesBuffer() defer sync.PutBytesBuffer(b) + buf.Grow(int(o.Length)) err := p.get(o.Parent, b) if err != nil { return err } - base := b.Bytes() - err = p.resolveObject(buf, o, base) + err = p.resolveObject(buf, o, bytes.NewReader(b.Bytes())) if err != nil { return err } @@ -378,6 +424,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { } } + // If the scanner is seekable, caching this data into + // memory by offset seems wasteful. + // There is a trade-off to be considered here in terms + // of execution time vs memory consumption. + // + // TODO: improve seekable execution time, so that we can + // skip this cache. if len(o.Children) > 0 { data := make([]byte, buf.Len()) copy(data, buf.Bytes()) @@ -386,10 +439,25 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { return nil } +// resolveObject resolves an object from base, using information +// provided by o. +// +// This call has the side-effect of changing field values +// from the object info o: +// - Type: OFSDeltaObject may become the target type (e.g. Blob). +// - Size: The size may be update with the target size. +// - Hash: Zero hashes will be calculated as part of the object +// resolution. Hence why this process can't be avoided even when w +// is an io.Discard. +// +// base must be an io.ReaderAt, which is a requirement from +// patchDeltaStream. The main reason being that reversing an +// delta object may lead to going backs and forths within base, +// which is not supported by io.Reader. func (p *Parser) resolveObject( w io.Writer, o *objectInfo, - base []byte, + base io.ReaderAt, ) error { if !o.DiskType.IsDelta() { return nil @@ -400,26 +468,46 @@ func (p *Parser) resolveObject( if err != nil { return err } - data := buf.Bytes() - data, err = applyPatchBase(o, data, base) + writers := []io.Writer{w} + var obj *plumbing.MemoryObject + var lwh objectHeaderWriter + + if p.storage != nil { + if low, ok := p.storage.(lazyObjectWriter); ok { + ow, wh, err := low.LazyWriter() + if err != nil { + return err + } + lwh = wh + + defer ow.Close() + writers = append(writers, ow) + } else { + obj = new(plumbing.MemoryObject) + ow, err := obj.Writer() + if err != nil { + return err + } + + writers = append(writers, ow) + } + } + + mw := io.MultiWriter(writers...) + + err = applyPatchBase(o, base, buf, mw, lwh) if err != nil { return err } - if p.storage != nil { - obj := new(plumbing.MemoryObject) - obj.SetSize(o.Size()) + if obj != nil { obj.SetType(o.Type) - if _, err := obj.Write(data); err != nil { - return err - } - + obj.SetSize(o.Size()) // Size here is correct as it was populated by applyPatchBase. if _, err := p.storage.SetEncodedObject(obj); err != nil { return err } } - _, err = w.Write(data) return err } @@ -443,24 +531,31 @@ func (p *Parser) readData(w io.Writer, o *objectInfo) error { return nil } -func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) { - patched, err := PatchDelta(base, data) - if err != nil { - return nil, err +// applyPatchBase applies the patch to target. +// +// Note that ota will be updated based on the description in resolveObject. +func applyPatchBase(ota *objectInfo, base io.ReaderAt, delta io.Reader, target io.Writer, wh objectHeaderWriter) error { + if target == nil { + return fmt.Errorf("cannot apply patch against nil target") } + typ := ota.Type if ota.SHA1 == plumbing.ZeroHash { - ota.Type = ota.Parent.Type - sha1, err := getSHA1(ota.Type, patched) - if err != nil { - return nil, err - } + typ = ota.Parent.Type + } + + sz, h, err := patchDeltaWriter(target, base, delta, typ, wh) + if err != nil { + return err + } - ota.SHA1 = sha1 - ota.Length = int64(len(patched)) + if ota.SHA1 == plumbing.ZeroHash { + ota.Type = typ + ota.Length = int64(sz) + ota.SHA1 = h } - return patched, nil + return nil } func getSHA1(t plumbing.ObjectType, data []byte) (plumbing.Hash, error) { diff --git a/plumbing/format/packfile/patch_delta.go b/plumbing/format/packfile/patch_delta.go index f00562d..67c20ff 100644 --- a/plumbing/format/packfile/patch_delta.go +++ b/plumbing/format/packfile/patch_delta.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "io" "math" @@ -265,6 +266,107 @@ func patchDelta(dst *bytes.Buffer, src, delta []byte) error { return nil } +func patchDeltaWriter(dst io.Writer, base io.ReaderAt, delta io.Reader, + typ plumbing.ObjectType, writeHeader objectHeaderWriter) (uint, plumbing.Hash, error) { + deltaBuf := bufio.NewReaderSize(delta, 1024) + srcSz, err := decodeLEB128ByteReader(deltaBuf) + if err != nil { + if err == io.EOF { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + return 0, plumbing.ZeroHash, err + } + + if r, ok := base.(*bytes.Reader); ok && srcSz != uint(r.Size()) { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + + targetSz, err := decodeLEB128ByteReader(deltaBuf) + if err != nil { + if err == io.EOF { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + return 0, plumbing.ZeroHash, err + } + + // If header still needs to be written, caller will provide + // a LazyObjectWriterHeader. This seems to be the case when + // dealing with thin-packs. + if writeHeader != nil { + err = writeHeader(typ, int64(targetSz)) + if err != nil { + return 0, plumbing.ZeroHash, fmt.Errorf("could not lazy write header: %w", err) + } + } + + remainingTargetSz := targetSz + + hasher := plumbing.NewHasher(typ, int64(targetSz)) + mw := io.MultiWriter(dst, hasher) + + bufp := sync.GetByteSlice() + defer sync.PutByteSlice(bufp) + + sr := io.NewSectionReader(base, int64(0), int64(srcSz)) + // Keep both the io.LimitedReader types, so we can reset N. + baselr := io.LimitReader(sr, 0).(*io.LimitedReader) + deltalr := io.LimitReader(deltaBuf, 0).(*io.LimitedReader) + + for { + buf := *bufp + cmd, err := deltaBuf.ReadByte() + if err == io.EOF { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + if err != nil { + return 0, plumbing.ZeroHash, err + } + + if isCopyFromSrc(cmd) { + offset, err := decodeOffsetByteReader(cmd, deltaBuf) + if err != nil { + return 0, plumbing.ZeroHash, err + } + sz, err := decodeSizeByteReader(cmd, deltaBuf) + if err != nil { + return 0, plumbing.ZeroHash, err + } + + if invalidSize(sz, targetSz) || + invalidOffsetSize(offset, sz, srcSz) { + return 0, plumbing.ZeroHash, err + } + + if _, err := sr.Seek(int64(offset), io.SeekStart); err != nil { + return 0, plumbing.ZeroHash, err + } + baselr.N = int64(sz) + if _, err := io.CopyBuffer(mw, baselr, buf); err != nil { + return 0, plumbing.ZeroHash, err + } + remainingTargetSz -= sz + } else if isCopyFromDelta(cmd) { + sz := uint(cmd) // cmd is the size itself + if invalidSize(sz, targetSz) { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + deltalr.N = int64(sz) + if _, err := io.CopyBuffer(mw, deltalr, buf); err != nil { + return 0, plumbing.ZeroHash, err + } + + remainingTargetSz -= sz + } else { + return 0, plumbing.ZeroHash, err + } + if remainingTargetSz <= 0 { + break + } + } + + return targetSz, hasher.Sum(), nil +} + // Decodes a number encoded as an unsigned LEB128 at the start of some // binary data and returns the decoded number and the rest of the // stream. diff --git a/repository_test.go b/repository_test.go index f6839b6..35a62f1 100644 --- a/repository_test.go +++ b/repository_test.go @@ -3311,20 +3311,25 @@ func BenchmarkObjects(b *testing.B) { } func BenchmarkPlainClone(b *testing.B) { - for i := 0; i < b.N; i++ { - t, err := os.MkdirTemp("", "") - if err != nil { - b.Fatal(err) - } - _, err = PlainClone(t, false, &CloneOptions{ - URL: "https://github.com/knqyf263/vuln-list", - Depth: 1, + b.StopTimer() + clone := func(b *testing.B) { + _, err := PlainClone(b.TempDir(), true, &CloneOptions{ + URL: "https://github.com/go-git/go-git.git", + Depth: 1, + Tags: NoTags, + SingleBranch: true, }) if err != nil { b.Error(err) } - b.StopTimer() - os.RemoveAll(t) - b.StartTimer() + } + + // Warm-up as the initial clone could have a higher cost which + // may skew results. + clone(b) + + b.StartTimer() + for i := 0; i < b.N; i++ { + clone(b) } } diff --git a/storage/filesystem/object.go b/storage/filesystem/object.go index 846a7b8..e812fe9 100644 --- a/storage/filesystem/object.go +++ b/storage/filesystem/object.go @@ -146,6 +146,19 @@ func (s *ObjectStorage) SetEncodedObject(o plumbing.EncodedObject) (h plumbing.H return o.Hash(), err } +// LazyWriter returns a lazy ObjectWriter that is bound to a DotGit file. +// It first write the header passing on the object type and size, so +// that the object contents can be written later, without the need to +// create a MemoryObject and buffering its entire contents into memory. +func (s *ObjectStorage) LazyWriter() (w io.WriteCloser, wh func(typ plumbing.ObjectType, sz int64) error, err error) { + ow, err := s.dir.NewObject() + if err != nil { + return nil, nil, err + } + + return ow, ow.WriteHeader, nil +} + // HasEncodedObject returns nil if the object exists, without actually // reading the object data from storage. func (s *ObjectStorage) HasEncodedObject(h plumbing.Hash) (err error) { |