aboutsummaryrefslogtreecommitdiffstats
path: root/plumbing
diff options
context:
space:
mode:
authorArran Walker <arran.walker@fiveturns.org>2019-04-21 01:22:07 +0000
committerArran Walker <arran.walker@fiveturns.org>2019-04-22 21:34:09 +0000
commit9d4279fd355c1777e0884075f9e64576ddea1a76 (patch)
treebecbde840864314759bcfa3229496ccf8e0c71e4 /plumbing
parente5268e9c3c94f60e3c2008dc2ab4762c75352bfc (diff)
downloadgo-git-9d4279fd355c1777e0884075f9e64576ddea1a76.tar.gz
plumbing: packfile/scanner, readability/performance improvements, zlib pooling
Signed-off-by: Arran Walker <arran.walker@fiveturns.org>
Diffstat (limited to 'plumbing')
-rw-r--r--plumbing/format/packfile/common.go10
-rw-r--r--plumbing/format/packfile/scanner.go189
-rw-r--r--plumbing/format/packfile/scanner_test.go49
3 files changed, 143 insertions, 105 deletions
diff --git a/plumbing/format/packfile/common.go b/plumbing/format/packfile/common.go
index 0d9ed54..f82c1ab 100644
--- a/plumbing/format/packfile/common.go
+++ b/plumbing/format/packfile/common.go
@@ -2,6 +2,7 @@ package packfile
import (
"bytes"
+ "compress/zlib"
"io"
"sync"
@@ -66,3 +67,12 @@ var bufPool = sync.Pool{
return bytes.NewBuffer(nil)
},
}
+
+var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}
+
+var zlibReaderPool = sync.Pool{
+ New: func() interface{} {
+ r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes))
+ return r
+ },
+}
diff --git a/plumbing/format/packfile/scanner.go b/plumbing/format/packfile/scanner.go
index 614b0d1..7b44192 100644
--- a/plumbing/format/packfile/scanner.go
+++ b/plumbing/format/packfile/scanner.go
@@ -39,8 +39,7 @@ type ObjectHeader struct {
}
type Scanner struct {
- r reader
- zr readerResetter
+ r *scannerReader
crc hash.Hash32
// pendingObject is used to detect if an object has been read, or still
@@ -56,19 +55,27 @@ type Scanner struct {
// NewScanner returns a new Scanner based on a reader, if the given reader
// implements io.ReadSeeker the Scanner will be also Seekable
func NewScanner(r io.Reader) *Scanner {
- seeker, ok := r.(io.ReadSeeker)
- if !ok {
- seeker = &trackableReader{Reader: r}
- }
+ _, ok := r.(io.ReadSeeker)
crc := crc32.NewIEEE()
return &Scanner{
- r: newTeeReader(newByteReadSeeker(seeker), crc),
+ r: newScannerReader(r, crc),
crc: crc,
IsSeekable: ok,
}
}
+func (s *Scanner) Reset(r io.Reader) {
+ _, ok := r.(io.ReadSeeker)
+
+ s.r.Reset(r)
+ s.crc.Reset()
+ s.IsSeekable = ok
+ s.pendingObject = nil
+ s.version = 0
+ s.objects = 0
+}
+
// Header reads the whole packfile header (signature, version and object count).
// It returns the version and the object count and performs checks on the
// validity of the signature and the version fields.
@@ -182,8 +189,7 @@ func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) {
// nextObjectHeader returns the ObjectHeader for the next object in the reader
// without the Offset field
func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) {
- defer s.Flush()
-
+ s.r.Flush()
s.crc.Reset()
h := &ObjectHeader{}
@@ -304,35 +310,29 @@ func (s *Scanner) readLength(first byte) (int64, error) {
// NextObject writes the content of the next object into the reader, returns
// the number of bytes written, the CRC32 of the content and an error, if any
func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err error) {
- defer s.crc.Reset()
-
s.pendingObject = nil
written, err = s.copyObject(w)
- s.Flush()
+
+ s.r.Flush()
crc32 = s.crc.Sum32()
+ s.crc.Reset()
+
return
}
// ReadRegularObject reads and write a non-deltified object
// from it zlib stream in an object entry in the packfile.
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {
- if s.zr == nil {
- var zr io.ReadCloser
- zr, err = zlib.NewReader(s.r)
- if err != nil {
- return 0, fmt.Errorf("zlib initialization error: %s", err)
- }
+ zr := zlibReaderPool.Get().(io.ReadCloser)
+ defer zlibReaderPool.Put(zr)
- s.zr = zr.(readerResetter)
- } else {
- if err = s.zr.Reset(s.r, nil); err != nil {
- return 0, fmt.Errorf("zlib reset error: %s", err)
- }
+ if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
+ return 0, fmt.Errorf("zlib reset error: %s", err)
}
- defer ioutil.CheckClose(s.zr, &err)
+ defer ioutil.CheckClose(zr, &err)
buf := byteSlicePool.Get().([]byte)
- n, err = io.CopyBuffer(w, s.zr, buf)
+ n, err = io.CopyBuffer(w, zr, buf)
byteSlicePool.Put(buf)
return
}
@@ -378,110 +378,89 @@ func (s *Scanner) Close() error {
return err
}
-// Flush finishes writing the buffer to crc hasher in case we are using
-// a teeReader. Otherwise it is a no-op.
+// Flush is a no-op (deprecated)
func (s *Scanner) Flush() error {
- tee, ok := s.r.(*teeReader)
- if ok {
- return tee.Flush()
- }
return nil
}
-type trackableReader struct {
- count int64
- io.Reader
+// scannerReader has the following characteristics:
+// - Provides an io.SeekReader impl for bufio.Reader, when the underlying
+// reader supports it.
+// - Keeps track of the current read position, for when the underlying reader
+// isn't an io.SeekReader, but we still want to know the current offset.
+// - Writes to the hash writer what it reads, with the aid of a smaller buffer.
+// The buffer helps avoid a performance penality for performing small writes
+// to the crc32 hash writer.
+type scannerReader struct {
+ reader io.Reader
+ crc io.Writer
+ rbuf *bufio.Reader
+ wbuf *bufio.Writer
+ offset int64
}
-// Read reads up to len(p) bytes into p.
-func (r *trackableReader) Read(p []byte) (n int, err error) {
- n, err = r.Reader.Read(p)
- r.count += int64(n)
-
- return
-}
-
-// Seek only supports io.SeekCurrent, any other operation fails
-func (r *trackableReader) Seek(offset int64, whence int) (int64, error) {
- if whence != io.SeekCurrent {
- return -1, ErrSeekNotSupported
+func newScannerReader(r io.Reader, h io.Writer) *scannerReader {
+ sr := &scannerReader{
+ rbuf: bufio.NewReader(nil),
+ wbuf: bufio.NewWriterSize(nil, 64),
+ crc: h,
}
+ sr.Reset(r)
- return r.count, nil
+ return sr
}
-func newByteReadSeeker(r io.ReadSeeker) *bufferedSeeker {
- return &bufferedSeeker{
- r: r,
- Reader: *bufio.NewReader(r),
- }
-}
+func (r *scannerReader) Reset(reader io.Reader) {
+ r.reader = reader
+ r.rbuf.Reset(r.reader)
+ r.wbuf.Reset(r.crc)
-type bufferedSeeker struct {
- r io.ReadSeeker
- bufio.Reader
-}
-
-func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) {
- if whence == io.SeekCurrent && offset == 0 {
- current, err := r.r.Seek(offset, whence)
- if err != nil {
- return current, err
- }
-
- return current - int64(r.Buffered()), nil
+ r.offset = 0
+ if seeker, ok := r.reader.(io.ReadSeeker); ok {
+ r.offset, _ = seeker.Seek(0, io.SeekCurrent)
}
-
- defer r.Reader.Reset(r.r)
- return r.r.Seek(offset, whence)
}
-type readerResetter interface {
- io.ReadCloser
- zlib.Resetter
-}
+func (r *scannerReader) Read(p []byte) (n int, err error) {
+ n, err = r.rbuf.Read(p)
-type reader interface {
- io.Reader
- io.ByteReader
- io.Seeker
+ r.offset += int64(n)
+ if _, err := r.wbuf.Write(p[:n]); err != nil {
+ return n, err
+ }
+ return
}
-type teeReader struct {
- reader
- w hash.Hash32
- bufWriter *bufio.Writer
+func (r *scannerReader) ReadByte() (b byte, err error) {
+ b, err = r.rbuf.ReadByte()
+ if err == nil {
+ r.offset++
+ return b, r.wbuf.WriteByte(b)
+ }
+ return
}
-func newTeeReader(r reader, h hash.Hash32) *teeReader {
- return &teeReader{
- reader: r,
- w: h,
- bufWriter: bufio.NewWriter(h),
- }
+func (r *scannerReader) Flush() error {
+ return r.wbuf.Flush()
}
-func (r *teeReader) Read(p []byte) (n int, err error) {
- r.Flush()
+// Seek seeks to a location. If the underlying reader is not an io.ReadSeeker,
+// then only whence=io.SeekCurrent is supported, any other operation fails.
+func (r *scannerReader) Seek(offset int64, whence int) (int64, error) {
+ var err error
- n, err = r.reader.Read(p)
- if n > 0 {
- if n, err := r.w.Write(p[:n]); err != nil {
- return n, err
+ if seeker, ok := r.reader.(io.ReadSeeker); !ok {
+ if whence != io.SeekCurrent || offset != 0 {
+ return -1, ErrSeekNotSupported
+ }
+ } else {
+ if whence == io.SeekCurrent && offset == 0 {
+ return r.offset, nil
}
- }
- return
-}
-func (r *teeReader) ReadByte() (b byte, err error) {
- b, err = r.reader.ReadByte()
- if err == nil {
- return b, r.bufWriter.WriteByte(b)
+ r.offset, err = seeker.Seek(offset, whence)
+ r.rbuf.Reset(r.reader)
}
- return
-}
-
-func (r *teeReader) Flush() (err error) {
- return r.bufWriter.Flush()
+ return r.offset, err
}
diff --git a/plumbing/format/packfile/scanner_test.go b/plumbing/format/packfile/scanner_test.go
index 091b457..a401d6d 100644
--- a/plumbing/format/packfile/scanner_test.go
+++ b/plumbing/format/packfile/scanner_test.go
@@ -135,6 +135,55 @@ func (s *ScannerSuite) TestSeekObjectHeaderNonSeekable(c *C) {
c.Assert(err, Equals, ErrSeekNotSupported)
}
+func (s *ScannerSuite) TestReaderReset(c *C) {
+ r := fixtures.Basic().One().Packfile()
+ p := NewScanner(r)
+
+ version, objects, err := p.Header()
+ c.Assert(version, Equals, VersionSupported)
+ c.Assert(objects, Equals, uint32(31))
+
+ h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
+ c.Assert(err, IsNil)
+ c.Assert(h, DeepEquals, &expectedHeadersOFS[0])
+
+ p.Reset(r)
+ c.Assert(p.pendingObject, IsNil)
+ c.Assert(p.version, Equals, uint32(0))
+ c.Assert(p.objects, Equals, uint32(0))
+ c.Assert(p.r.reader, Equals, r)
+ c.Assert(p.r.offset > expectedHeadersOFS[0].Offset, Equals, true)
+
+ p.Reset(bytes.NewReader(nil))
+ c.Assert(p.r.offset, Equals, int64(0))
+}
+
+func (s *ScannerSuite) TestReaderResetSeeks(c *C) {
+ r := fixtures.Basic().One().Packfile()
+
+ // seekable
+ p := NewScanner(r)
+ c.Assert(p.IsSeekable, Equals, true)
+ h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
+ c.Assert(err, IsNil)
+ c.Assert(h, DeepEquals, &expectedHeadersOFS[0])
+
+ // reset with seekable
+ p.Reset(r)
+ c.Assert(p.IsSeekable, Equals, true)
+ h, err = p.SeekObjectHeader(expectedHeadersOFS[1].Offset)
+ c.Assert(err, IsNil)
+ c.Assert(h, DeepEquals, &expectedHeadersOFS[1])
+
+ // reset with non-seekable
+ f := fixtures.Basic().ByTag("ref-delta").One()
+ p.Reset(io.MultiReader(f.Packfile()))
+ c.Assert(p.IsSeekable, Equals, false)
+
+ _, err = p.SeekObjectHeader(expectedHeadersOFS[4].Offset)
+ c.Assert(err, Equals, ErrSeekNotSupported)
+}
+
var expectedHeadersOFS = []ObjectHeader{
{Type: plumbing.CommitObject, Offset: 12, Length: 254},
{Type: plumbing.OFSDeltaObject, Offset: 186, Length: 93, OffsetReference: 12},