From b0d807a1ae0687ef3a01d78c1dc5e55f7217268f Mon Sep 17 00:00:00 2001 From: Antonio Jesus Navarro Perez Date: Tue, 5 Jun 2018 18:33:27 +0200 Subject: dotgit: Move package outside internal. Signed-off-by: Antonio Jesus Navarro Perez --- storage/filesystem/dotgit/writers.go | 282 +++++++++++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100644 storage/filesystem/dotgit/writers.go (limited to 'storage/filesystem/dotgit/writers.go') diff --git a/storage/filesystem/dotgit/writers.go b/storage/filesystem/dotgit/writers.go new file mode 100644 index 0000000..c2b420f --- /dev/null +++ b/storage/filesystem/dotgit/writers.go @@ -0,0 +1,282 @@ +package dotgit + +import ( + "fmt" + "io" + "sync/atomic" + + "gopkg.in/src-d/go-git.v4/plumbing" + "gopkg.in/src-d/go-git.v4/plumbing/format/idxfile" + "gopkg.in/src-d/go-git.v4/plumbing/format/objfile" + "gopkg.in/src-d/go-git.v4/plumbing/format/packfile" + + "gopkg.in/src-d/go-billy.v4" +) + +// PackWriter is a io.Writer that generates the packfile index simultaneously, +// a packfile.Decoder is used with a file reader to read the file being written +// this operation is synchronized with the write operations. +// The packfile is written in a temp file, when Close is called this file +// is renamed/moved (depends on the Filesystem implementation) to the final +// location, if the PackWriter is not used, nothing is written +type PackWriter struct { + Notify func(plumbing.Hash, *packfile.Index) + + fs billy.Filesystem + fr, fw billy.File + synced *syncedReader + checksum plumbing.Hash + index *packfile.Index + result chan error +} + +func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { + fw, err := fs.TempFile(fs.Join(objectsPath, packPath), "tmp_pack_") + if err != nil { + return nil, err + } + + fr, err := fs.Open(fw.Name()) + if err != nil { + return nil, err + } + + writer := &PackWriter{ + fs: fs, + fw: fw, + fr: fr, + synced: newSyncedReader(fw, fr), + result: make(chan error), + } + + go writer.buildIndex() + return writer, nil +} + +func (w *PackWriter) buildIndex() { + s := packfile.NewScanner(w.synced) + d, err := packfile.NewDecoder(s, nil) + if err != nil { + w.result <- err + return + } + + checksum, err := d.Decode() + if err != nil { + w.result <- err + return + } + + w.checksum = checksum + w.index = d.Index() + w.result <- err +} + +// waitBuildIndex waits until buildIndex function finishes, this can terminate +// with a packfile.ErrEmptyPackfile, this means that nothing was written so we +// ignore the error +func (w *PackWriter) waitBuildIndex() error { + err := <-w.result + if err == packfile.ErrEmptyPackfile { + return nil + } + + return err +} + +func (w *PackWriter) Write(p []byte) (int, error) { + return w.synced.Write(p) +} + +// Close closes all the file descriptors and save the final packfile, if nothing +// was written, the tempfiles are deleted without writing a packfile. +func (w *PackWriter) Close() error { + defer func() { + if w.Notify != nil && w.index != nil && w.index.Size() > 0 { + w.Notify(w.checksum, w.index) + } + + close(w.result) + }() + + if err := w.synced.Close(); err != nil { + return err + } + + if err := w.waitBuildIndex(); err != nil { + return err + } + + if err := w.fr.Close(); err != nil { + return err + } + + if err := w.fw.Close(); err != nil { + return err + } + + if w.index == nil || w.index.Size() == 0 { + return w.clean() + } + + return w.save() +} + +func (w *PackWriter) clean() error { + return w.fs.Remove(w.fw.Name()) +} + +func (w *PackWriter) save() error { + base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum)) + idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base)) + if err != nil { + return err + } + + if err := w.encodeIdx(idx); err != nil { + return err + } + + if err := idx.Close(); err != nil { + return err + } + + return w.fs.Rename(w.fw.Name(), fmt.Sprintf("%s.pack", base)) +} + +func (w *PackWriter) encodeIdx(writer io.Writer) error { + idx := w.index.ToIdxFile() + idx.PackfileChecksum = w.checksum + idx.Version = idxfile.VersionSupported + e := idxfile.NewEncoder(writer) + _, err := e.Encode(idx) + return err +} + +type syncedReader struct { + w io.Writer + r io.ReadSeeker + + blocked, done uint32 + written, read uint64 + news chan bool +} + +func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader { + return &syncedReader{ + w: w, + r: r, + news: make(chan bool), + } +} + +func (s *syncedReader) Write(p []byte) (n int, err error) { + defer func() { + written := atomic.AddUint64(&s.written, uint64(n)) + read := atomic.LoadUint64(&s.read) + if written > read { + s.wake() + } + }() + + n, err = s.w.Write(p) + return +} + +func (s *syncedReader) Read(p []byte) (n int, err error) { + defer func() { atomic.AddUint64(&s.read, uint64(n)) }() + + for { + s.sleep() + n, err = s.r.Read(p) + if err == io.EOF && !s.isDone() && n == 0 { + continue + } + + break + } + + return +} + +func (s *syncedReader) isDone() bool { + return atomic.LoadUint32(&s.done) == 1 +} + +func (s *syncedReader) isBlocked() bool { + return atomic.LoadUint32(&s.blocked) == 1 +} + +func (s *syncedReader) wake() { + if s.isBlocked() { + // fmt.Println("wake") + atomic.StoreUint32(&s.blocked, 0) + s.news <- true + } +} + +func (s *syncedReader) sleep() { + read := atomic.LoadUint64(&s.read) + written := atomic.LoadUint64(&s.written) + if read >= written { + atomic.StoreUint32(&s.blocked, 1) + // fmt.Println("sleep", read, written) + <-s.news + } + +} + +func (s *syncedReader) Seek(offset int64, whence int) (int64, error) { + if whence == io.SeekCurrent { + return s.r.Seek(offset, whence) + } + + p, err := s.r.Seek(offset, whence) + atomic.StoreUint64(&s.read, uint64(p)) + + return p, err +} + +func (s *syncedReader) Close() error { + atomic.StoreUint32(&s.done, 1) + close(s.news) + return nil +} + +type ObjectWriter struct { + objfile.Writer + fs billy.Filesystem + f billy.File +} + +func newObjectWriter(fs billy.Filesystem) (*ObjectWriter, error) { + f, err := fs.TempFile(fs.Join(objectsPath, packPath), "tmp_obj_") + if err != nil { + return nil, err + } + + return &ObjectWriter{ + Writer: (*objfile.NewWriter(f)), + fs: fs, + f: f, + }, nil +} + +func (w *ObjectWriter) Close() error { + if err := w.Writer.Close(); err != nil { + return err + } + + if err := w.f.Close(); err != nil { + return err + } + + return w.save() +} + +func (w *ObjectWriter) save() error { + hash := w.Hash().String() + file := w.fs.Join(objectsPath, hash[0:2], hash[2:40]) + + return w.fs.Rename(w.f.Name(), file) +} -- cgit From 79f249465b24104b73c9dc220d9098cecdab4d77 Mon Sep 17 00:00:00 2001 From: Javi Fontan Date: Thu, 26 Jul 2018 13:42:51 +0200 Subject: plumbing, storage: integrate new index Now dotgit.PackWriter uses the new packfile.Parser and index. Signed-off-by: Javi Fontan --- storage/filesystem/dotgit/writers.go | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) (limited to 'storage/filesystem/dotgit/writers.go') diff --git a/storage/filesystem/dotgit/writers.go b/storage/filesystem/dotgit/writers.go index c2b420f..e1ede3c 100644 --- a/storage/filesystem/dotgit/writers.go +++ b/storage/filesystem/dotgit/writers.go @@ -20,13 +20,14 @@ import ( // is renamed/moved (depends on the Filesystem implementation) to the final // location, if the PackWriter is not used, nothing is written type PackWriter struct { - Notify func(plumbing.Hash, *packfile.Index) + Notify func(plumbing.Hash, *idxfile.Writer) fs billy.Filesystem fr, fw billy.File synced *syncedReader checksum plumbing.Hash - index *packfile.Index + parser *packfile.Parser + writer *idxfile.Writer result chan error } @@ -55,20 +56,16 @@ func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { func (w *PackWriter) buildIndex() { s := packfile.NewScanner(w.synced) - d, err := packfile.NewDecoder(s, nil) - if err != nil { - w.result <- err - return - } + w.writer = new(idxfile.Writer) + w.parser = packfile.NewParser(s, w.writer) - checksum, err := d.Decode() + checksum, err := w.parser.Parse() if err != nil { w.result <- err return } w.checksum = checksum - w.index = d.Index() w.result <- err } @@ -92,8 +89,8 @@ func (w *PackWriter) Write(p []byte) (int, error) { // was written, the tempfiles are deleted without writing a packfile. func (w *PackWriter) Close() error { defer func() { - if w.Notify != nil && w.index != nil && w.index.Size() > 0 { - w.Notify(w.checksum, w.index) + if w.Notify != nil && w.writer != nil && w.writer.Finished() { + w.Notify(w.checksum, w.writer) } close(w.result) @@ -115,7 +112,7 @@ func (w *PackWriter) Close() error { return err } - if w.index == nil || w.index.Size() == 0 { + if w.writer == nil || !w.writer.Finished() { return w.clean() } @@ -145,11 +142,13 @@ func (w *PackWriter) save() error { } func (w *PackWriter) encodeIdx(writer io.Writer) error { - idx := w.index.ToIdxFile() - idx.PackfileChecksum = w.checksum - idx.Version = idxfile.VersionSupported + idx, err := w.writer.Index() + if err != nil { + return err + } + e := idxfile.NewEncoder(writer) - _, err := e.Encode(idx) + _, err = e.Encode(idx) return err } @@ -209,7 +208,6 @@ func (s *syncedReader) isBlocked() bool { func (s *syncedReader) wake() { if s.isBlocked() { - // fmt.Println("wake") atomic.StoreUint32(&s.blocked, 0) s.news <- true } @@ -220,7 +218,6 @@ func (s *syncedReader) sleep() { written := atomic.LoadUint64(&s.written) if read >= written { atomic.StoreUint32(&s.blocked, 1) - // fmt.Println("sleep", read, written) <-s.news } -- cgit From 5889a3b669f0f515ff445aa040afc1e7eeb2bbd1 Mon Sep 17 00:00:00 2001 From: Miguel Molina Date: Wed, 8 Aug 2018 16:56:20 +0200 Subject: plumbing: packfile, allow non-seekable sources on Parser Signed-off-by: Miguel Molina --- storage/filesystem/dotgit/writers.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'storage/filesystem/dotgit/writers.go') diff --git a/storage/filesystem/dotgit/writers.go b/storage/filesystem/dotgit/writers.go index e1ede3c..93d2d8c 100644 --- a/storage/filesystem/dotgit/writers.go +++ b/storage/filesystem/dotgit/writers.go @@ -57,7 +57,12 @@ func newPackWrite(fs billy.Filesystem) (*PackWriter, error) { func (w *PackWriter) buildIndex() { s := packfile.NewScanner(w.synced) w.writer = new(idxfile.Writer) - w.parser = packfile.NewParser(s, w.writer) + var err error + w.parser, err = packfile.NewParser(s, w.writer) + if err != nil { + w.result <- err + return + } checksum, err := w.parser.Parse() if err != nil { -- cgit