From 1c361adbc1f4b0e3d0743d11f187fb0b3ac4cb4d Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Thu, 20 Jul 2023 21:19:28 +0100 Subject: plumbing: Optimise memory consumption for filesystem storage Previously, as part of building the index representation, the resolveObject func would create an interim plumbing.MemoryObject, which would then be saved into storage via storage.SetEncodedObject. This meant that objects would be unnecessarily loaded into memory, to then be saved into disk. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The changes streamlines this process by: - Introducing the LazyObjectWriter interface which enables the write operation to take places directly against the filesystem-based storage. - Leverage multi-writers to process the input data once, while targeting multiple writers (e.g. hasher and storage). An additional change relates to the caching of object info children within Parser.get. The cache is now skipped when a seekable filesystem is being used. The impact of the changes can be observed when using seekable filesystem storages, especially when cloning large repositories. The stats below were captured by adapting the BenchmarkPlainClone test to clone https://github.com/torvalds/linux.git: pkg: github.com/go-git/go-git/v5 cpu: Intel(R) Core(TM) i9-10885H CPU @ 2.40GHz │ /tmp/old │ /tmp/new │ │ sec/op │ sec/op vs base │ PlainClone-16 41.68 ± 17% 48.04 ± 9% +15.27% (p=0.015 n=6) │ /tmp/old │ /tmp/new │ │ B/op │ B/op vs base │ PlainClone-16 1127.8Mi ± 7% 256.7Mi ± 50% -77.23% (p=0.002 n=6) │ /tmp/old │ /tmp/new │ │ allocs/op │ allocs/op vs base │ PlainClone-16 3.125M ± 0% 3.800M ± 0% +21.60% (p=0.002 n=6) Notice that on average the memory consumption per operation is over 75% smaller. The time per operation increased by 15%, which may actual be less on long running applications, due to the decreased GC pressure and the garbage collection costs. Signed-off-by: Paulo Gomes --- plumbing/format/packfile/parser.go | 195 ++++++++++++++++++++++++-------- plumbing/format/packfile/patch_delta.go | 102 +++++++++++++++++ 2 files changed, 247 insertions(+), 50 deletions(-) (limited to 'plumbing/format/packfile') diff --git a/plumbing/format/packfile/parser.go b/plumbing/format/packfile/parser.go index edbc0e7..62f1d13 100644 --- a/plumbing/format/packfile/parser.go +++ b/plumbing/format/packfile/parser.go @@ -3,6 +3,7 @@ package packfile import ( "bytes" "errors" + "fmt" "io" "github.com/go-git/go-git/v5/plumbing" @@ -174,13 +175,25 @@ func (p *Parser) init() error { return nil } +type objectHeaderWriter func(typ plumbing.ObjectType, sz int64) error + +type lazyObjectWriter interface { + // LazyWriter enables an object to be lazily written. + // It returns: + // - w: a writer to receive the object's content. + // - lwh: a func to write the object header. + // - err: any error from the initial writer creation process. + // + // Note that if the object header is not written BEFORE the writer + // is used, this will result in an invalid object. + LazyWriter() (w io.WriteCloser, lwh objectHeaderWriter, err error) +} + func (p *Parser) indexObjects() error { buf := sync.GetBytesBuffer() defer sync.PutBytesBuffer(buf) for i := uint32(0); i < p.count; i++ { - buf.Reset() - oh, err := p.scanner.NextObjectHeader() if err != nil { return err @@ -220,21 +233,60 @@ func (p *Parser) indexObjects() error { ota = newBaseObject(oh.Offset, oh.Length, t) } - buf.Grow(int(oh.Length)) - _, crc, err := p.scanner.NextObject(buf) + hasher := plumbing.NewHasher(oh.Type, oh.Length) + writers := []io.Writer{hasher} + var obj *plumbing.MemoryObject + + // Lazy writing is only available for non-delta objects. + if p.storage != nil && !delta { + // When a storage is set and supports lazy writing, + // use that instead of creating a memory object. + if low, ok := p.storage.(lazyObjectWriter); ok { + ow, lwh, err := low.LazyWriter() + if err != nil { + return err + } + + if err = lwh(oh.Type, oh.Length); err != nil { + return err + } + + defer ow.Close() + writers = append(writers, ow) + } else { + obj = new(plumbing.MemoryObject) + obj.SetSize(oh.Length) + obj.SetType(oh.Type) + + writers = append(writers, obj) + } + } + if delta && !p.scanner.IsSeekable { + buf.Reset() + buf.Grow(int(oh.Length)) + writers = append(writers, buf) + } + + mw := io.MultiWriter(writers...) + + _, crc, err := p.scanner.NextObject(mw) if err != nil { return err } + // Non delta objects needs to be added into the storage. This + // is only required when lazy writing is not supported. + if obj != nil { + if _, err := p.storage.SetEncodedObject(obj); err != nil { + return err + } + } + ota.Crc32 = crc ota.Length = oh.Length - data := buf.Bytes() if !delta { - sha1, err := getSHA1(ota.Type, data) - if err != nil { - return err - } + sha1 := hasher.Sum() // Move children of placeholder parent into actual parent, in case this // was a non-external delta reference. @@ -249,20 +301,8 @@ func (p *Parser) indexObjects() error { p.oiByHash[ota.SHA1] = ota } - if p.storage != nil && !delta { - obj := new(plumbing.MemoryObject) - obj.SetSize(oh.Length) - obj.SetType(oh.Type) - if _, err := obj.Write(data); err != nil { - return err - } - - if _, err := p.storage.SetEncodedObject(obj); err != nil { - return err - } - } - if delta && !p.scanner.IsSeekable { + data := buf.Bytes() p.deltas[oh.Offset] = make([]byte, len(data)) copy(p.deltas[oh.Offset], data) } @@ -280,23 +320,29 @@ func (p *Parser) resolveDeltas() error { for _, obj := range p.oi { buf.Reset() + buf.Grow(int(obj.Length)) err := p.get(obj, buf) if err != nil { return err } - content := buf.Bytes() if err := p.onInflatedObjectHeader(obj.Type, obj.Length, obj.Offset); err != nil { return err } - if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content); err != nil { + if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, nil); err != nil { return err } if !obj.IsDelta() && len(obj.Children) > 0 { + // Dealing with an io.ReaderAt object, means we can + // create it once and reuse across all children. + r := bytes.NewReader(buf.Bytes()) for _, child := range obj.Children { - if err := p.resolveObject(io.Discard, child, content); err != nil { + // Even though we are discarding the output, we still need to read it to + // so that the scanner can advance to the next object, and the SHA1 can be + // calculated. + if err := p.resolveObject(io.Discard, child, r); err != nil { return err } p.resolveExternalRef(child) @@ -361,13 +407,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { if o.DiskType.IsDelta() { b := sync.GetBytesBuffer() defer sync.PutBytesBuffer(b) + buf.Grow(int(o.Length)) err := p.get(o.Parent, b) if err != nil { return err } - base := b.Bytes() - err = p.resolveObject(buf, o, base) + err = p.resolveObject(buf, o, bytes.NewReader(b.Bytes())) if err != nil { return err } @@ -378,6 +424,13 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { } } + // If the scanner is seekable, caching this data into + // memory by offset seems wasteful. + // There is a trade-off to be considered here in terms + // of execution time vs memory consumption. + // + // TODO: improve seekable execution time, so that we can + // skip this cache. if len(o.Children) > 0 { data := make([]byte, buf.Len()) copy(data, buf.Bytes()) @@ -386,10 +439,25 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { return nil } +// resolveObject resolves an object from base, using information +// provided by o. +// +// This call has the side-effect of changing field values +// from the object info o: +// - Type: OFSDeltaObject may become the target type (e.g. Blob). +// - Size: The size may be update with the target size. +// - Hash: Zero hashes will be calculated as part of the object +// resolution. Hence why this process can't be avoided even when w +// is an io.Discard. +// +// base must be an io.ReaderAt, which is a requirement from +// patchDeltaStream. The main reason being that reversing an +// delta object may lead to going backs and forths within base, +// which is not supported by io.Reader. func (p *Parser) resolveObject( w io.Writer, o *objectInfo, - base []byte, + base io.ReaderAt, ) error { if !o.DiskType.IsDelta() { return nil @@ -400,26 +468,46 @@ func (p *Parser) resolveObject( if err != nil { return err } - data := buf.Bytes() - data, err = applyPatchBase(o, data, base) + writers := []io.Writer{w} + var obj *plumbing.MemoryObject + var lwh objectHeaderWriter + + if p.storage != nil { + if low, ok := p.storage.(lazyObjectWriter); ok { + ow, wh, err := low.LazyWriter() + if err != nil { + return err + } + lwh = wh + + defer ow.Close() + writers = append(writers, ow) + } else { + obj = new(plumbing.MemoryObject) + ow, err := obj.Writer() + if err != nil { + return err + } + + writers = append(writers, ow) + } + } + + mw := io.MultiWriter(writers...) + + err = applyPatchBase(o, base, buf, mw, lwh) if err != nil { return err } - if p.storage != nil { - obj := new(plumbing.MemoryObject) - obj.SetSize(o.Size()) + if obj != nil { obj.SetType(o.Type) - if _, err := obj.Write(data); err != nil { - return err - } - + obj.SetSize(o.Size()) // Size here is correct as it was populated by applyPatchBase. if _, err := p.storage.SetEncodedObject(obj); err != nil { return err } } - _, err = w.Write(data) return err } @@ -443,24 +531,31 @@ func (p *Parser) readData(w io.Writer, o *objectInfo) error { return nil } -func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) { - patched, err := PatchDelta(base, data) - if err != nil { - return nil, err +// applyPatchBase applies the patch to target. +// +// Note that ota will be updated based on the description in resolveObject. +func applyPatchBase(ota *objectInfo, base io.ReaderAt, delta io.Reader, target io.Writer, wh objectHeaderWriter) error { + if target == nil { + return fmt.Errorf("cannot apply patch against nil target") } + typ := ota.Type if ota.SHA1 == plumbing.ZeroHash { - ota.Type = ota.Parent.Type - sha1, err := getSHA1(ota.Type, patched) - if err != nil { - return nil, err - } + typ = ota.Parent.Type + } + + sz, h, err := patchDeltaWriter(target, base, delta, typ, wh) + if err != nil { + return err + } - ota.SHA1 = sha1 - ota.Length = int64(len(patched)) + if ota.SHA1 == plumbing.ZeroHash { + ota.Type = typ + ota.Length = int64(sz) + ota.SHA1 = h } - return patched, nil + return nil } func getSHA1(t plumbing.ObjectType, data []byte) (plumbing.Hash, error) { diff --git a/plumbing/format/packfile/patch_delta.go b/plumbing/format/packfile/patch_delta.go index f00562d..67c20ff 100644 --- a/plumbing/format/packfile/patch_delta.go +++ b/plumbing/format/packfile/patch_delta.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "io" "math" @@ -265,6 +266,107 @@ func patchDelta(dst *bytes.Buffer, src, delta []byte) error { return nil } +func patchDeltaWriter(dst io.Writer, base io.ReaderAt, delta io.Reader, + typ plumbing.ObjectType, writeHeader objectHeaderWriter) (uint, plumbing.Hash, error) { + deltaBuf := bufio.NewReaderSize(delta, 1024) + srcSz, err := decodeLEB128ByteReader(deltaBuf) + if err != nil { + if err == io.EOF { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + return 0, plumbing.ZeroHash, err + } + + if r, ok := base.(*bytes.Reader); ok && srcSz != uint(r.Size()) { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + + targetSz, err := decodeLEB128ByteReader(deltaBuf) + if err != nil { + if err == io.EOF { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + return 0, plumbing.ZeroHash, err + } + + // If header still needs to be written, caller will provide + // a LazyObjectWriterHeader. This seems to be the case when + // dealing with thin-packs. + if writeHeader != nil { + err = writeHeader(typ, int64(targetSz)) + if err != nil { + return 0, plumbing.ZeroHash, fmt.Errorf("could not lazy write header: %w", err) + } + } + + remainingTargetSz := targetSz + + hasher := plumbing.NewHasher(typ, int64(targetSz)) + mw := io.MultiWriter(dst, hasher) + + bufp := sync.GetByteSlice() + defer sync.PutByteSlice(bufp) + + sr := io.NewSectionReader(base, int64(0), int64(srcSz)) + // Keep both the io.LimitedReader types, so we can reset N. + baselr := io.LimitReader(sr, 0).(*io.LimitedReader) + deltalr := io.LimitReader(deltaBuf, 0).(*io.LimitedReader) + + for { + buf := *bufp + cmd, err := deltaBuf.ReadByte() + if err == io.EOF { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + if err != nil { + return 0, plumbing.ZeroHash, err + } + + if isCopyFromSrc(cmd) { + offset, err := decodeOffsetByteReader(cmd, deltaBuf) + if err != nil { + return 0, plumbing.ZeroHash, err + } + sz, err := decodeSizeByteReader(cmd, deltaBuf) + if err != nil { + return 0, plumbing.ZeroHash, err + } + + if invalidSize(sz, targetSz) || + invalidOffsetSize(offset, sz, srcSz) { + return 0, plumbing.ZeroHash, err + } + + if _, err := sr.Seek(int64(offset), io.SeekStart); err != nil { + return 0, plumbing.ZeroHash, err + } + baselr.N = int64(sz) + if _, err := io.CopyBuffer(mw, baselr, buf); err != nil { + return 0, plumbing.ZeroHash, err + } + remainingTargetSz -= sz + } else if isCopyFromDelta(cmd) { + sz := uint(cmd) // cmd is the size itself + if invalidSize(sz, targetSz) { + return 0, plumbing.ZeroHash, ErrInvalidDelta + } + deltalr.N = int64(sz) + if _, err := io.CopyBuffer(mw, deltalr, buf); err != nil { + return 0, plumbing.ZeroHash, err + } + + remainingTargetSz -= sz + } else { + return 0, plumbing.ZeroHash, err + } + if remainingTargetSz <= 0 { + break + } + } + + return targetSz, hasher.Sum(), nil +} + // Decodes a number encoded as an unsigned LEB128 at the start of some // binary data and returns the decoded number and the rest of the // stream. -- cgit