diff options
author | Miguel Molina <miguel@erizocosmi.co> | 2018-08-08 16:56:20 +0200 |
---|---|---|
committer | Miguel Molina <miguel@erizocosmi.co> | 2018-08-08 16:56:20 +0200 |
commit | 5889a3b669f0f515ff445aa040afc1e7eeb2bbd1 (patch) | |
tree | 1a7e2bbe9ba7c2ae1111120ed84fe7850a934375 /plumbing | |
parent | 6a24b4c1f0cb9e5daf30fa7979f2643a967af1ad (diff) | |
download | go-git-5889a3b669f0f515ff445aa040afc1e7eeb2bbd1.tar.gz |
plumbing: packfile, allow non-seekable sources on Parser
Signed-off-by: Miguel Molina <miguel@erizocosmi.co>
Diffstat (limited to 'plumbing')
-rw-r--r-- | plumbing/format/idxfile/writer_test.go | 5 | ||||
-rw-r--r-- | plumbing/format/packfile/common.go | 63 | ||||
-rw-r--r-- | plumbing/format/packfile/encoder_advanced_test.go | 5 | ||||
-rw-r--r-- | plumbing/format/packfile/encoder_test.go | 5 | ||||
-rw-r--r-- | plumbing/format/packfile/parser.go | 311 | ||||
-rw-r--r-- | plumbing/format/packfile/parser_test.go | 19 |
6 files changed, 229 insertions, 179 deletions
diff --git a/plumbing/format/idxfile/writer_test.go b/plumbing/format/idxfile/writer_test.go index 7c3cceb..912211d 100644 --- a/plumbing/format/idxfile/writer_test.go +++ b/plumbing/format/idxfile/writer_test.go @@ -24,9 +24,10 @@ func (s *WriterSuite) TestWriter(c *C) { scanner := packfile.NewScanner(f.Packfile()) obs := new(idxfile.Writer) - parser := packfile.NewParser(scanner, obs) + parser, err := packfile.NewParser(scanner, obs) + c.Assert(err, IsNil) - _, err := parser.Parse() + _, err = parser.Parse() c.Assert(err, IsNil) idx, err := obs.Index() diff --git a/plumbing/format/packfile/common.go b/plumbing/format/packfile/common.go index 76254f0..2b4aceb 100644 --- a/plumbing/format/packfile/common.go +++ b/plumbing/format/packfile/common.go @@ -2,11 +2,9 @@ package packfile import ( "bytes" - "errors" "io" "sync" - "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/plumbing/storer" "gopkg.in/src-d/go-git.v4/utils/ioutil" ) @@ -32,8 +30,12 @@ func UpdateObjectStorage(s storer.Storer, packfile io.Reader) error { return WritePackfileToObjectStorage(pw, packfile) } - updater := newPackfileStorageUpdater(s) - _, err := NewParser(NewScanner(packfile), updater).Parse() + p, err := NewParserWithStorage(NewScanner(packfile), s) + if err != nil { + return err + } + + _, err = p.Parse() return err } @@ -58,56 +60,3 @@ var bufPool = sync.Pool{ return bytes.NewBuffer(nil) }, } - -var errMissingObjectContent = errors.New("missing object content") - -type packfileStorageUpdater struct { - storer.Storer - lastSize int64 - lastType plumbing.ObjectType -} - -func newPackfileStorageUpdater(s storer.Storer) *packfileStorageUpdater { - return &packfileStorageUpdater{Storer: s} -} - -func (p *packfileStorageUpdater) OnHeader(count uint32) error { - return nil -} - -func (p *packfileStorageUpdater) OnInflatedObjectHeader( - t plumbing.ObjectType, - objSize int64, - pos int64, -) error { - if p.lastSize > 0 || p.lastType != plumbing.InvalidObject { - return errMissingObjectContent - } - - p.lastType = t - p.lastSize = objSize - return nil -} - -func (p *packfileStorageUpdater) OnInflatedObjectContent( - h plumbing.Hash, - pos int64, - crc uint32, - content []byte, -) error { - obj := new(plumbing.MemoryObject) - obj.SetSize(p.lastSize) - obj.SetType(p.lastType) - if _, err := obj.Write(content); err != nil { - return err - } - - _, err := p.SetEncodedObject(obj) - p.lastSize = 0 - p.lastType = plumbing.InvalidObject - return err -} - -func (p *packfileStorageUpdater) OnFooter(h plumbing.Hash) error { - return nil -} diff --git a/plumbing/format/packfile/encoder_advanced_test.go b/plumbing/format/packfile/encoder_advanced_test.go index 6ffebc2..78ddc45 100644 --- a/plumbing/format/packfile/encoder_advanced_test.go +++ b/plumbing/format/packfile/encoder_advanced_test.go @@ -94,7 +94,10 @@ func (s *EncoderAdvancedSuite) testEncodeDecode( c.Assert(err, IsNil) w := new(idxfile.Writer) - _, err = NewParser(NewScanner(f), w).Parse() + parser, err := NewParser(NewScanner(f), w) + c.Assert(err, IsNil) + + _, err = parser.Parse() c.Assert(err, IsNil) index, err := w.Index() c.Assert(err, IsNil) diff --git a/plumbing/format/packfile/encoder_test.go b/plumbing/format/packfile/encoder_test.go index 7b6dde2..24e2082 100644 --- a/plumbing/format/packfile/encoder_test.go +++ b/plumbing/format/packfile/encoder_test.go @@ -302,7 +302,10 @@ func packfileFromReader(c *C, buf *bytes.Buffer) (*Packfile, func()) { scanner := NewScanner(file) w := new(idxfile.Writer) - _, err = NewParser(scanner, w).Parse() + p, err := NewParser(scanner, w) + c.Assert(err, IsNil) + + _, err = p.Parse() c.Assert(err, IsNil) index, err := w.Index() diff --git a/plumbing/format/packfile/parser.go b/plumbing/format/packfile/parser.go index f0a7674..beb3e27 100644 --- a/plumbing/format/packfile/parser.go +++ b/plumbing/format/packfile/parser.go @@ -7,16 +7,20 @@ import ( "gopkg.in/src-d/go-git.v4/plumbing" "gopkg.in/src-d/go-git.v4/plumbing/cache" + "gopkg.in/src-d/go-git.v4/plumbing/storer" ) var ( - // ErrObjectContentAlreadyRead is returned when the content of the object - // was already read, since the content can only be read once. - ErrObjectContentAlreadyRead = errors.New("object content was already read") - // ErrReferenceDeltaNotFound is returned when the reference delta is not // found. ErrReferenceDeltaNotFound = errors.New("reference delta not found") + + // ErrNotSeekableSource is returned when the source for the parser is not + // seekable and a storage was not provided, so it can't be parsed. + ErrNotSeekableSource = errors.New("parser source is not seekable and storage was not provided") + + // ErrDeltaNotCached is returned when the delta could not be found in cache. + ErrDeltaNotCached = errors.New("delta could not be found in cache") ) // Observer interface is implemented by index encoders. @@ -34,34 +38,96 @@ type Observer interface { // Parser decodes a packfile and calls any observer associated to it. Is used // to generate indexes. type Parser struct { - scanner *Scanner - count uint32 - oi []*objectInfo - oiByHash map[plumbing.Hash]*objectInfo - oiByOffset map[int64]*objectInfo - hashOffset map[plumbing.Hash]int64 - checksum plumbing.Hash - - cache *cache.ObjectLRU - contentCache map[int64][]byte + storage storer.EncodedObjectStorer + scanner *Scanner + count uint32 + oi []*objectInfo + oiByHash map[plumbing.Hash]*objectInfo + oiByOffset map[int64]*objectInfo + hashOffset map[plumbing.Hash]int64 + pendingRefDeltas map[plumbing.Hash][]*objectInfo + checksum plumbing.Hash + + cache *cache.ObjectLRU + // delta content by offset, only used if source is not seekable + deltas map[int64][]byte ob []Observer } -// NewParser creates a new Parser struct. -func NewParser(scanner *Scanner, ob ...Observer) *Parser { - var contentCache map[int64][]byte +// NewParser creates a new Parser. The Scanner source must be seekable. +// If it's not, NewParserWithStorage should be used instead. +func NewParser(scanner *Scanner, ob ...Observer) (*Parser, error) { + return NewParserWithStorage(scanner, nil, ob...) +} + +// NewParserWithStorage creates a new Parser. The scanner source must either +// be seekable or a storage must be provided. +func NewParserWithStorage( + scanner *Scanner, + storage storer.EncodedObjectStorer, + ob ...Observer, +) (*Parser, error) { + if !scanner.IsSeekable && storage == nil { + return nil, ErrNotSeekableSource + } + + var deltas map[int64][]byte if !scanner.IsSeekable { - contentCache = make(map[int64][]byte) + deltas = make(map[int64][]byte) } return &Parser{ - scanner: scanner, - ob: ob, - count: 0, - cache: cache.NewObjectLRUDefault(), - contentCache: contentCache, + storage: storage, + scanner: scanner, + ob: ob, + count: 0, + cache: cache.NewObjectLRUDefault(), + pendingRefDeltas: make(map[plumbing.Hash][]*objectInfo), + deltas: deltas, + }, nil +} + +func (p *Parser) forEachObserver(f func(o Observer) error) error { + for _, o := range p.ob { + if err := f(o); err != nil { + return err + } } + return nil +} + +func (p *Parser) onHeader(count uint32) error { + return p.forEachObserver(func(o Observer) error { + return o.OnHeader(count) + }) +} + +func (p *Parser) onInflatedObjectHeader( + t plumbing.ObjectType, + objSize int64, + pos int64, +) error { + return p.forEachObserver(func(o Observer) error { + return o.OnInflatedObjectHeader(t, objSize, pos) + }) +} + +func (p *Parser) onInflatedObjectContent( + h plumbing.Hash, + pos int64, + crc uint32, + content []byte, +) error { + return p.forEachObserver(func(o Observer) error { + return o.OnInflatedObjectContent(h, pos, crc, content) + }) +} + +func (p *Parser) onFooter(h plumbing.Hash) error { + return p.forEachObserver(func(o Observer) error { + return o.OnFooter(h) + }) } // Parse start decoding phase of the packfile. @@ -70,7 +136,13 @@ func (p *Parser) Parse() (plumbing.Hash, error) { return plumbing.ZeroHash, err } - if err := p.firstPass(); err != nil { + if err := p.indexObjects(); err != nil { + return plumbing.ZeroHash, err + } + + var err error + p.checksum, err = p.scanner.Checksum() + if err != nil && err != io.EOF { return plumbing.ZeroHash, err } @@ -78,10 +150,12 @@ func (p *Parser) Parse() (plumbing.Hash, error) { return plumbing.ZeroHash, err } - for _, o := range p.ob { - if err := o.OnFooter(p.checksum); err != nil { - return plumbing.ZeroHash, err - } + if len(p.pendingRefDeltas) > 0 { + return plumbing.ZeroHash, ErrReferenceDeltaNotFound + } + + if err := p.onFooter(p.checksum); err != nil { + return plumbing.ZeroHash, err } return p.checksum, nil @@ -93,10 +167,8 @@ func (p *Parser) init() error { return err } - for _, o := range p.ob { - if err := o.OnHeader(c); err != nil { - return err - } + if err := p.onHeader(c); err != nil { + return err } p.count = c @@ -107,7 +179,7 @@ func (p *Parser) init() error { return nil } -func (p *Parser) firstPass() error { +func (p *Parser) indexObjects() error { buf := new(bytes.Buffer) for i := uint32(0); i < p.count; i++ { @@ -121,25 +193,30 @@ func (p *Parser) firstPass() error { delta := false var ota *objectInfo switch t := oh.Type; t { - case plumbing.OFSDeltaObject, plumbing.REFDeltaObject: + case plumbing.OFSDeltaObject: delta = true - var parent *objectInfo - var ok bool - - if t == plumbing.OFSDeltaObject { - parent, ok = p.oiByOffset[oh.OffsetReference] - } else { - parent, ok = p.oiByHash[oh.Reference] - } - + parent, ok := p.oiByOffset[oh.OffsetReference] if !ok { - return ErrReferenceDeltaNotFound + return plumbing.ErrObjectNotFound } ota = newDeltaObject(oh.Offset, oh.Length, t, parent) - parent.Children = append(parent.Children, ota) + case plumbing.REFDeltaObject: + delta = true + + parent, ok := p.oiByHash[oh.Reference] + if ok { + ota = newDeltaObject(oh.Offset, oh.Length, t, parent) + parent.Children = append(parent.Children, ota) + } else { + ota = newBaseObject(oh.Offset, oh.Length, t) + p.pendingRefDeltas[oh.Reference] = append( + p.pendingRefDeltas[oh.Reference], + ota, + ) + } default: ota = newBaseObject(oh.Offset, oh.Length, t) } @@ -153,23 +230,35 @@ func (p *Parser) firstPass() error { ota.PackSize = size ota.Length = oh.Length + data := buf.Bytes() if !delta { - if _, err := ota.Write(buf.Bytes()); err != nil { + if _, err := ota.Write(data); err != nil { return err } ota.SHA1 = ota.Sum() p.oiByHash[ota.SHA1] = ota } - p.oiByOffset[oh.Offset] = ota + if p.storage != nil && !delta { + obj := new(plumbing.MemoryObject) + obj.SetSize(oh.Length) + obj.SetType(oh.Type) + if _, err := obj.Write(data); err != nil { + return err + } - p.oi[i] = ota - } + if _, err := p.storage.SetEncodedObject(obj); err != nil { + return err + } + } - var err error - p.checksum, err = p.scanner.Checksum() - if err != nil && err != io.EOF { - return err + if delta && !p.scanner.IsSeekable { + p.deltas[oh.Offset] = make([]byte, len(data)) + copy(p.deltas[oh.Offset], data) + } + + p.oiByOffset[oh.Offset] = ota + p.oi[i] = ota } return nil @@ -177,21 +266,17 @@ func (p *Parser) firstPass() error { func (p *Parser) resolveDeltas() error { for _, obj := range p.oi { - content, err := obj.Content() + content, err := p.get(obj) if err != nil { return err } - for _, o := range p.ob { - err := o.OnInflatedObjectHeader(obj.Type, obj.Length, obj.Offset) - if err != nil { - return err - } + if err := p.onInflatedObjectHeader(obj.Type, obj.Length, obj.Offset); err != nil { + return err + } - err = o.OnInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content) - if err != nil { - return err - } + if err := p.onInflatedObjectContent(obj.SHA1, obj.Offset, obj.Crc32, content); err != nil { + return err } if !obj.IsDelta() && len(obj.Children) > 0 { @@ -206,6 +291,11 @@ func (p *Parser) resolveDeltas() error { return err } } + + // Remove the delta from the cache. + if obj.DiskType.IsDelta() && !p.scanner.IsSeekable { + delete(p.deltas, obj.Offset) + } } } @@ -214,7 +304,17 @@ func (p *Parser) resolveDeltas() error { func (p *Parser) get(o *objectInfo) ([]byte, error) { e, ok := p.cache.Get(o.SHA1) - if ok { + // If it's not on the cache and is not a delta we can try to find it in the + // storage, if there's one. + if !ok && p.storage != nil && !o.Type.IsDelta() { + var err error + e, err = p.storage.EncodedObject(plumbing.AnyObject, o.SHA1) + if err != nil { + return nil, err + } + } + + if e != nil { r, err := e.Reader() if err != nil { return nil, err @@ -228,32 +328,23 @@ func (p *Parser) get(o *objectInfo) ([]byte, error) { return buf, nil } - // Read from disk + var data []byte if o.DiskType.IsDelta() { base, err := p.get(o.Parent) if err != nil { return nil, err } - data, err := p.resolveObject(o, base) + data, err = p.resolveObject(o, base) if err != nil { return nil, err } - - if len(o.Children) > 0 { - m := &plumbing.MemoryObject{} - m.Write(data) - m.SetType(o.Type) - m.SetSize(o.Size()) - p.cache.Put(m) + } else { + var err error + data, err = p.readData(o) + if err != nil { + return nil, err } - - return data, nil - } - - data, err := p.readData(o) - if err != nil { - return nil, err } if len(o.Children) > 0 { @@ -285,11 +376,39 @@ func (p *Parser) resolveObject( return nil, err } + if pending, ok := p.pendingRefDeltas[o.SHA1]; ok { + for _, po := range pending { + po.Parent = o + o.Children = append(o.Children, po) + } + delete(p.pendingRefDeltas, o.SHA1) + } + + if p.storage != nil { + obj := new(plumbing.MemoryObject) + obj.SetSize(o.Size()) + obj.SetType(o.Type) + if _, err := obj.Write(data); err != nil { + return nil, err + } + + if _, err := p.storage.SetEncodedObject(obj); err != nil { + return nil, err + } + } + return data, nil } func (p *Parser) readData(o *objectInfo) ([]byte, error) { - buf := new(bytes.Buffer) + if !p.scanner.IsSeekable && o.DiskType.IsDelta() { + data, ok := p.deltas[o.Offset] + if !ok { + return nil, ErrDeltaNotCached + } + + return data, nil + } // TODO: skip header. Header size can be calculated with the offset of the // next offset in the first pass. @@ -301,8 +420,7 @@ func (p *Parser) readData(o *objectInfo) ([]byte, error) { return nil, err } - buf.Reset() - + buf := new(bytes.Buffer) if _, _, err := p.scanner.NextObject(buf); err != nil { return nil, err } @@ -322,6 +440,7 @@ func applyPatchBase(ota *objectInfo, data, base []byte) ([]byte, error) { return nil, err } ota.SHA1 = ota.Sum() + ota.Length = int64(len(patched)) return patched, nil } @@ -341,8 +460,6 @@ type objectInfo struct { Parent *objectInfo Children []*objectInfo SHA1 plumbing.Hash - - content *bytes.Buffer } func newBaseObject(offset, length int64, t plumbing.ObjectType) *objectInfo { @@ -371,30 +488,6 @@ func newDeltaObject( return obj } -func (o *objectInfo) Write(bs []byte) (int, error) { - n, err := o.Hasher.Write(bs) - if err != nil { - return 0, err - } - - o.content = bytes.NewBuffer(nil) - - _, _ = o.content.Write(bs) - return n, nil -} - -// Content returns the content of the object. This operation can only be done -// once. -func (o *objectInfo) Content() ([]byte, error) { - if o.content == nil { - return nil, ErrObjectContentAlreadyRead - } - - r := o.content - o.content = nil - return r.Bytes(), nil -} - func (o *objectInfo) IsDelta() bool { return o.Type.IsDelta() } diff --git a/plumbing/format/packfile/parser_test.go b/plumbing/format/packfile/parser_test.go index b18f20f..7bce737 100644 --- a/plumbing/format/packfile/parser_test.go +++ b/plumbing/format/packfile/parser_test.go @@ -19,7 +19,8 @@ func (s *ParserSuite) TestParserHashes(c *C) { scanner := packfile.NewScanner(f.Packfile()) obs := new(testObserver) - parser := packfile.NewParser(scanner, obs) + parser, err := packfile.NewParser(scanner, obs) + c.Assert(err, IsNil) ch, err := parser.Parse() c.Assert(err, IsNil) @@ -36,7 +37,7 @@ func (s *ParserSuite) TestParserHashes(c *C) { objs := []observerObject{ {"e8d3ffab552895c19b9fcf7aa264d277cde33881", commit, 254, 12, 0xaa07ba4b}, - {"6ecf0ef2c2dffb796033e5a02219af86ec6584e5", commit, 93, 186, 0xf706df58}, + {"6ecf0ef2c2dffb796033e5a02219af86ec6584e5", commit, 245, 186, 0xf706df58}, {"918c48b83bd081e863dbe1b80f8998f058cd8294", commit, 242, 286, 0x12438846}, {"af2d6a6954d532f8ffb47615169c8fdf9d383a1a", commit, 242, 449, 0x2905a38c}, {"1669dce138d9b841a518c64b10914d88f5e488ea", commit, 333, 615, 0xd9429436}, @@ -54,18 +55,18 @@ func (s *ParserSuite) TestParserHashes(c *C) { {"9a48f23120e880dfbe41f7c9b7b708e9ee62a492", blob, 11488, 80998, 0x7316ff70}, {"9dea2395f5403188298c1dabe8bdafe562c491e3", blob, 78, 84032, 0xdb4fce56}, {"dbd3641b371024f44d0e469a9c8f5457b0660de1", tree, 272, 84115, 0x901cce2c}, - {"a8d315b2b1c615d43042c3a62402b8a54288cf5c", tree, 43, 84375, 0xec4552b0}, + {"a8d315b2b1c615d43042c3a62402b8a54288cf5c", tree, 271, 84375, 0xec4552b0}, {"a39771a7651f97faf5c72e08224d857fc35133db", tree, 38, 84430, 0x847905bf}, {"5a877e6a906a2743ad6e45d99c1793642aaf8eda", tree, 75, 84479, 0x3689459a}, {"586af567d0bb5e771e49bdd9434f5e0fb76d25fa", tree, 38, 84559, 0xe67af94a}, {"cf4aa3b38974fb7d81f367c0830f7d78d65ab86b", tree, 34, 84608, 0xc2314a2e}, {"7e59600739c96546163833214c36459e324bad0a", blob, 9, 84653, 0xcd987848}, - {"fb72698cab7617ac416264415f13224dfd7a165e", tree, 6, 84671, 0x8a853a6d}, - {"4d081c50e250fa32ea8b1313cf8bb7c2ad7627fd", tree, 9, 84688, 0x70c6518}, - {"eba74343e2f15d62adedfd8c883ee0262b5c8021", tree, 6, 84708, 0x4f4108e2}, - {"c2d30fa8ef288618f65f6eed6e168e0d514886f4", tree, 5, 84725, 0xd6fe09e9}, - {"8dcef98b1d52143e1e2dbc458ffe38f925786bf2", tree, 8, 84741, 0xf07a2804}, - {"aa9b383c260e1d05fbbf6b30a02914555e20c725", tree, 4, 84760, 0x1d75d6be}, + {"fb72698cab7617ac416264415f13224dfd7a165e", tree, 238, 84671, 0x8a853a6d}, + {"4d081c50e250fa32ea8b1313cf8bb7c2ad7627fd", tree, 179, 84688, 0x70c6518}, + {"eba74343e2f15d62adedfd8c883ee0262b5c8021", tree, 148, 84708, 0x4f4108e2}, + {"c2d30fa8ef288618f65f6eed6e168e0d514886f4", tree, 110, 84725, 0xd6fe09e9}, + {"8dcef98b1d52143e1e2dbc458ffe38f925786bf2", tree, 111, 84741, 0xf07a2804}, + {"aa9b383c260e1d05fbbf6b30a02914555e20c725", tree, 73, 84760, 0x1d75d6be}, } c.Assert(obs.objects, DeepEquals, objs) |