diff options
Diffstat (limited to 'plumbing/format/packfile')
-rw-r--r-- | plumbing/format/packfile/common.go | 10 | ||||
-rw-r--r-- | plumbing/format/packfile/packfile.go | 182 | ||||
-rw-r--r-- | plumbing/format/packfile/scanner.go | 189 | ||||
-rw-r--r-- | plumbing/format/packfile/scanner_test.go | 49 |
4 files changed, 269 insertions, 161 deletions
diff --git a/plumbing/format/packfile/common.go b/plumbing/format/packfile/common.go index 0d9ed54..f82c1ab 100644 --- a/plumbing/format/packfile/common.go +++ b/plumbing/format/packfile/common.go @@ -2,6 +2,7 @@ package packfile import ( "bytes" + "compress/zlib" "io" "sync" @@ -66,3 +67,12 @@ var bufPool = sync.Pool{ return bytes.NewBuffer(nil) }, } + +var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01} + +var zlibReaderPool = sync.Pool{ + New: func() interface{} { + r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes)) + return r + }, +} diff --git a/plumbing/format/packfile/packfile.go b/plumbing/format/packfile/packfile.go index c09286e..f528073 100644 --- a/plumbing/format/packfile/packfile.go +++ b/plumbing/format/packfile/packfile.go @@ -76,20 +76,18 @@ func (p *Packfile) Get(h plumbing.Hash) (plumbing.EncodedObject, error) { return nil, err } - return p.GetByOffset(offset) + return p.objectAtOffset(offset, h) } -// GetByOffset retrieves the encoded object from the packfile with the given +// GetByOffset retrieves the encoded object from the packfile at the given // offset. func (p *Packfile) GetByOffset(o int64) (plumbing.EncodedObject, error) { hash, err := p.FindHash(o) - if err == nil { - if obj, ok := p.deltaBaseCache.Get(hash); ok { - return obj, nil - } + if err != nil { + return nil, err } - return p.objectAtOffset(o) + return p.objectAtOffset(o, hash) } // GetSizeByOffset retrieves the size of the encoded object from the @@ -122,6 +120,13 @@ func (p *Packfile) nextObjectHeader() (*ObjectHeader, error) { return h, err } +func (p *Packfile) getDeltaObjectSize(buf *bytes.Buffer) int64 { + delta := buf.Bytes() + _, delta = decodeLEB128(delta) // skip src size + sz, _ := decodeLEB128(delta) + return int64(sz) +} + func (p *Packfile) getObjectSize(h *ObjectHeader) (int64, error) { switch h.Type { case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject: @@ -135,10 +140,7 @@ func (p *Packfile) getObjectSize(h *ObjectHeader) (int64, error) { return 0, err } - delta := buf.Bytes() - _, delta = decodeLEB128(delta) // skip src size - sz, _ := decodeLEB128(delta) - return int64(sz), nil + return p.getDeltaObjectSize(buf), nil default: return 0, ErrInvalidObject.AddDetails("type %q", h.Type) } @@ -176,10 +178,16 @@ func (p *Packfile) getObjectType(h *ObjectHeader) (typ plumbing.ObjectType, err err = ErrInvalidObject.AddDetails("type %q", h.Type) } + p.offsetToType[h.Offset] = typ + return } -func (p *Packfile) objectAtOffset(offset int64) (plumbing.EncodedObject, error) { +func (p *Packfile) objectAtOffset(offset int64, hash plumbing.Hash) (plumbing.EncodedObject, error) { + if obj, ok := p.cacheGet(hash); ok { + return obj, nil + } + h, err := p.objectHeaderAtOffset(offset) if err != nil { if err == io.EOF || isInvalid(err) { @@ -188,27 +196,54 @@ func (p *Packfile) objectAtOffset(offset int64) (plumbing.EncodedObject, error) return nil, err } + return p.getNextObject(h, hash) +} + +func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing.EncodedObject, error) { + var err error + // If we have no filesystem, we will return a MemoryObject instead // of an FSObject. if p.fs == nil { - return p.getNextObject(h) + return p.getNextMemoryObject(h) } - // If the object is not a delta and it's small enough then read it - // completely into memory now since it is already read from disk - // into buffer anyway. - if h.Length <= smallObjectThreshold && h.Type != plumbing.OFSDeltaObject && h.Type != plumbing.REFDeltaObject { - return p.getNextObject(h) - } + // If the object is small enough then read it completely into memory now since + // it is already read from disk into buffer anyway. For delta objects we want + // to perform the optimization too, but we have to be careful about applying + // small deltas on big objects. + var size int64 + if h.Length <= smallObjectThreshold { + if h.Type != plumbing.OFSDeltaObject && h.Type != plumbing.REFDeltaObject { + return p.getNextMemoryObject(h) + } - hash, err := p.FindHash(h.Offset) - if err != nil { - return nil, err - } + // For delta objects we read the delta data and apply the small object + // optimization only if the expanded version of the object still meets + // the small object threshold condition. + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + if _, _, err := p.s.NextObject(buf); err != nil { + return nil, err + } + defer bufPool.Put(buf) - size, err := p.getObjectSize(h) - if err != nil { - return nil, err + size = p.getDeltaObjectSize(buf) + if size <= smallObjectThreshold { + var obj = new(plumbing.MemoryObject) + obj.SetSize(size) + if h.Type == plumbing.REFDeltaObject { + err = p.fillREFDeltaObjectContentWithBuffer(obj, h.Reference, buf) + } else { + err = p.fillOFSDeltaObjectContentWithBuffer(obj, h.OffsetReference, buf) + } + return obj, err + } + } else { + size, err = p.getObjectSize(h) + if err != nil { + return nil, err + } } typ, err := p.getObjectType(h) @@ -231,25 +266,14 @@ func (p *Packfile) objectAtOffset(offset int64) (plumbing.EncodedObject, error) } func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) { - ref, err := p.FindHash(offset) - if err == nil { - obj, ok := p.cacheGet(ref) - if ok { - reader, err := obj.Reader() - if err != nil { - return nil, err - } - - return reader, nil - } - } - h, err := p.objectHeaderAtOffset(offset) if err != nil { return nil, err } - obj, err := p.getNextObject(h) + // getObjectContent is called from FSObject, so we have to explicitly + // get memory object here to avoid recursive cycle + obj, err := p.getNextMemoryObject(h) if err != nil { return nil, err } @@ -257,7 +281,7 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) { return obj.Reader() } -func (p *Packfile) getNextObject(h *ObjectHeader) (plumbing.EncodedObject, error) { +func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) { var obj = new(plumbing.MemoryObject) obj.SetSize(h.Length) obj.SetType(h.Type) @@ -278,6 +302,8 @@ func (p *Packfile) getNextObject(h *ObjectHeader) (plumbing.EncodedObject, error return nil, err } + p.offsetToType[h.Offset] = obj.Type() + return obj, nil } @@ -300,6 +326,13 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu if err != nil { return err } + defer bufPool.Put(buf) + + return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf) +} + +func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error { + var err error base, ok := p.cacheGet(ref) if !ok { @@ -312,30 +345,31 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu obj.SetType(base.Type()) err = ApplyDelta(obj, base, buf.Bytes()) p.cachePut(obj) - bufPool.Put(buf) return err } func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset int64) error { - buf := bytes.NewBuffer(nil) + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() _, _, err := p.s.NextObject(buf) if err != nil { return err } + defer bufPool.Put(buf) - var base plumbing.EncodedObject - var ok bool + return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf) +} + +func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error { hash, err := p.FindHash(offset) - if err == nil { - base, ok = p.cacheGet(hash) + if err != nil { + return err } - if !ok { - base, err = p.GetByOffset(offset) - if err != nil { - return err - } + base, err := p.objectAtOffset(offset, hash) + if err != nil { + return err } obj.SetType(base.Type()) @@ -442,14 +476,50 @@ func (i *objectIter) Next() (plumbing.EncodedObject, error) { return nil, err } - obj, err := i.p.GetByOffset(int64(e.Offset)) + if i.typ != plumbing.AnyObject { + if typ, ok := i.p.offsetToType[int64(e.Offset)]; ok { + if typ != i.typ { + continue + } + } else if obj, ok := i.p.cacheGet(e.Hash); ok { + if obj.Type() != i.typ { + i.p.offsetToType[int64(e.Offset)] = obj.Type() + continue + } + return obj, nil + } else { + h, err := i.p.objectHeaderAtOffset(int64(e.Offset)) + if err != nil { + return nil, err + } + + if h.Type == plumbing.REFDeltaObject || h.Type == plumbing.OFSDeltaObject { + typ, err := i.p.getObjectType(h) + if err != nil { + return nil, err + } + if typ != i.typ { + i.p.offsetToType[int64(e.Offset)] = typ + continue + } + // getObjectType will seek in the file so we cannot use getNextObject safely + return i.p.objectAtOffset(int64(e.Offset), e.Hash) + } else { + if h.Type != i.typ { + i.p.offsetToType[int64(e.Offset)] = h.Type + continue + } + return i.p.getNextObject(h, e.Hash) + } + } + } + + obj, err := i.p.objectAtOffset(int64(e.Offset), e.Hash) if err != nil { return nil, err } - if i.typ == plumbing.AnyObject || obj.Type() == i.typ { - return obj, nil - } + return obj, nil } } diff --git a/plumbing/format/packfile/scanner.go b/plumbing/format/packfile/scanner.go index 614b0d1..7b44192 100644 --- a/plumbing/format/packfile/scanner.go +++ b/plumbing/format/packfile/scanner.go @@ -39,8 +39,7 @@ type ObjectHeader struct { } type Scanner struct { - r reader - zr readerResetter + r *scannerReader crc hash.Hash32 // pendingObject is used to detect if an object has been read, or still @@ -56,19 +55,27 @@ type Scanner struct { // NewScanner returns a new Scanner based on a reader, if the given reader // implements io.ReadSeeker the Scanner will be also Seekable func NewScanner(r io.Reader) *Scanner { - seeker, ok := r.(io.ReadSeeker) - if !ok { - seeker = &trackableReader{Reader: r} - } + _, ok := r.(io.ReadSeeker) crc := crc32.NewIEEE() return &Scanner{ - r: newTeeReader(newByteReadSeeker(seeker), crc), + r: newScannerReader(r, crc), crc: crc, IsSeekable: ok, } } +func (s *Scanner) Reset(r io.Reader) { + _, ok := r.(io.ReadSeeker) + + s.r.Reset(r) + s.crc.Reset() + s.IsSeekable = ok + s.pendingObject = nil + s.version = 0 + s.objects = 0 +} + // Header reads the whole packfile header (signature, version and object count). // It returns the version and the object count and performs checks on the // validity of the signature and the version fields. @@ -182,8 +189,7 @@ func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) { // nextObjectHeader returns the ObjectHeader for the next object in the reader // without the Offset field func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) { - defer s.Flush() - + s.r.Flush() s.crc.Reset() h := &ObjectHeader{} @@ -304,35 +310,29 @@ func (s *Scanner) readLength(first byte) (int64, error) { // NextObject writes the content of the next object into the reader, returns // the number of bytes written, the CRC32 of the content and an error, if any func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err error) { - defer s.crc.Reset() - s.pendingObject = nil written, err = s.copyObject(w) - s.Flush() + + s.r.Flush() crc32 = s.crc.Sum32() + s.crc.Reset() + return } // ReadRegularObject reads and write a non-deltified object // from it zlib stream in an object entry in the packfile. func (s *Scanner) copyObject(w io.Writer) (n int64, err error) { - if s.zr == nil { - var zr io.ReadCloser - zr, err = zlib.NewReader(s.r) - if err != nil { - return 0, fmt.Errorf("zlib initialization error: %s", err) - } + zr := zlibReaderPool.Get().(io.ReadCloser) + defer zlibReaderPool.Put(zr) - s.zr = zr.(readerResetter) - } else { - if err = s.zr.Reset(s.r, nil); err != nil { - return 0, fmt.Errorf("zlib reset error: %s", err) - } + if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil { + return 0, fmt.Errorf("zlib reset error: %s", err) } - defer ioutil.CheckClose(s.zr, &err) + defer ioutil.CheckClose(zr, &err) buf := byteSlicePool.Get().([]byte) - n, err = io.CopyBuffer(w, s.zr, buf) + n, err = io.CopyBuffer(w, zr, buf) byteSlicePool.Put(buf) return } @@ -378,110 +378,89 @@ func (s *Scanner) Close() error { return err } -// Flush finishes writing the buffer to crc hasher in case we are using -// a teeReader. Otherwise it is a no-op. +// Flush is a no-op (deprecated) func (s *Scanner) Flush() error { - tee, ok := s.r.(*teeReader) - if ok { - return tee.Flush() - } return nil } -type trackableReader struct { - count int64 - io.Reader +// scannerReader has the following characteristics: +// - Provides an io.SeekReader impl for bufio.Reader, when the underlying +// reader supports it. +// - Keeps track of the current read position, for when the underlying reader +// isn't an io.SeekReader, but we still want to know the current offset. +// - Writes to the hash writer what it reads, with the aid of a smaller buffer. +// The buffer helps avoid a performance penality for performing small writes +// to the crc32 hash writer. +type scannerReader struct { + reader io.Reader + crc io.Writer + rbuf *bufio.Reader + wbuf *bufio.Writer + offset int64 } -// Read reads up to len(p) bytes into p. -func (r *trackableReader) Read(p []byte) (n int, err error) { - n, err = r.Reader.Read(p) - r.count += int64(n) - - return -} - -// Seek only supports io.SeekCurrent, any other operation fails -func (r *trackableReader) Seek(offset int64, whence int) (int64, error) { - if whence != io.SeekCurrent { - return -1, ErrSeekNotSupported +func newScannerReader(r io.Reader, h io.Writer) *scannerReader { + sr := &scannerReader{ + rbuf: bufio.NewReader(nil), + wbuf: bufio.NewWriterSize(nil, 64), + crc: h, } + sr.Reset(r) - return r.count, nil + return sr } -func newByteReadSeeker(r io.ReadSeeker) *bufferedSeeker { - return &bufferedSeeker{ - r: r, - Reader: *bufio.NewReader(r), - } -} +func (r *scannerReader) Reset(reader io.Reader) { + r.reader = reader + r.rbuf.Reset(r.reader) + r.wbuf.Reset(r.crc) -type bufferedSeeker struct { - r io.ReadSeeker - bufio.Reader -} - -func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) { - if whence == io.SeekCurrent && offset == 0 { - current, err := r.r.Seek(offset, whence) - if err != nil { - return current, err - } - - return current - int64(r.Buffered()), nil + r.offset = 0 + if seeker, ok := r.reader.(io.ReadSeeker); ok { + r.offset, _ = seeker.Seek(0, io.SeekCurrent) } - - defer r.Reader.Reset(r.r) - return r.r.Seek(offset, whence) } -type readerResetter interface { - io.ReadCloser - zlib.Resetter -} +func (r *scannerReader) Read(p []byte) (n int, err error) { + n, err = r.rbuf.Read(p) -type reader interface { - io.Reader - io.ByteReader - io.Seeker + r.offset += int64(n) + if _, err := r.wbuf.Write(p[:n]); err != nil { + return n, err + } + return } -type teeReader struct { - reader - w hash.Hash32 - bufWriter *bufio.Writer +func (r *scannerReader) ReadByte() (b byte, err error) { + b, err = r.rbuf.ReadByte() + if err == nil { + r.offset++ + return b, r.wbuf.WriteByte(b) + } + return } -func newTeeReader(r reader, h hash.Hash32) *teeReader { - return &teeReader{ - reader: r, - w: h, - bufWriter: bufio.NewWriter(h), - } +func (r *scannerReader) Flush() error { + return r.wbuf.Flush() } -func (r *teeReader) Read(p []byte) (n int, err error) { - r.Flush() +// Seek seeks to a location. If the underlying reader is not an io.ReadSeeker, +// then only whence=io.SeekCurrent is supported, any other operation fails. +func (r *scannerReader) Seek(offset int64, whence int) (int64, error) { + var err error - n, err = r.reader.Read(p) - if n > 0 { - if n, err := r.w.Write(p[:n]); err != nil { - return n, err + if seeker, ok := r.reader.(io.ReadSeeker); !ok { + if whence != io.SeekCurrent || offset != 0 { + return -1, ErrSeekNotSupported + } + } else { + if whence == io.SeekCurrent && offset == 0 { + return r.offset, nil } - } - return -} -func (r *teeReader) ReadByte() (b byte, err error) { - b, err = r.reader.ReadByte() - if err == nil { - return b, r.bufWriter.WriteByte(b) + r.offset, err = seeker.Seek(offset, whence) + r.rbuf.Reset(r.reader) } - return -} - -func (r *teeReader) Flush() (err error) { - return r.bufWriter.Flush() + return r.offset, err } diff --git a/plumbing/format/packfile/scanner_test.go b/plumbing/format/packfile/scanner_test.go index 091b457..a401d6d 100644 --- a/plumbing/format/packfile/scanner_test.go +++ b/plumbing/format/packfile/scanner_test.go @@ -135,6 +135,55 @@ func (s *ScannerSuite) TestSeekObjectHeaderNonSeekable(c *C) { c.Assert(err, Equals, ErrSeekNotSupported) } +func (s *ScannerSuite) TestReaderReset(c *C) { + r := fixtures.Basic().One().Packfile() + p := NewScanner(r) + + version, objects, err := p.Header() + c.Assert(version, Equals, VersionSupported) + c.Assert(objects, Equals, uint32(31)) + + h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset) + c.Assert(err, IsNil) + c.Assert(h, DeepEquals, &expectedHeadersOFS[0]) + + p.Reset(r) + c.Assert(p.pendingObject, IsNil) + c.Assert(p.version, Equals, uint32(0)) + c.Assert(p.objects, Equals, uint32(0)) + c.Assert(p.r.reader, Equals, r) + c.Assert(p.r.offset > expectedHeadersOFS[0].Offset, Equals, true) + + p.Reset(bytes.NewReader(nil)) + c.Assert(p.r.offset, Equals, int64(0)) +} + +func (s *ScannerSuite) TestReaderResetSeeks(c *C) { + r := fixtures.Basic().One().Packfile() + + // seekable + p := NewScanner(r) + c.Assert(p.IsSeekable, Equals, true) + h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset) + c.Assert(err, IsNil) + c.Assert(h, DeepEquals, &expectedHeadersOFS[0]) + + // reset with seekable + p.Reset(r) + c.Assert(p.IsSeekable, Equals, true) + h, err = p.SeekObjectHeader(expectedHeadersOFS[1].Offset) + c.Assert(err, IsNil) + c.Assert(h, DeepEquals, &expectedHeadersOFS[1]) + + // reset with non-seekable + f := fixtures.Basic().ByTag("ref-delta").One() + p.Reset(io.MultiReader(f.Packfile())) + c.Assert(p.IsSeekable, Equals, false) + + _, err = p.SeekObjectHeader(expectedHeadersOFS[4].Offset) + c.Assert(err, Equals, ErrSeekNotSupported) +} + var expectedHeadersOFS = []ObjectHeader{ {Type: plumbing.CommitObject, Offset: 12, Length: 254}, {Type: plumbing.OFSDeltaObject, Offset: 186, Length: 93, OffsetReference: 12}, |