package dotgit

import (
	"fmt"
	"io"
	"sync/atomic"

	"gopkg.in/src-d/go-git.v4/plumbing"
	"gopkg.in/src-d/go-git.v4/plumbing/format/idxfile"
	"gopkg.in/src-d/go-git.v4/plumbing/format/objfile"
	"gopkg.in/src-d/go-git.v4/plumbing/format/packfile"

	"gopkg.in/src-d/go-billy.v3"
)

// PackWriter is an io.Writer that builds the packfile index while the
// packfile is being written. A packfile.Decoder reads the same temporary file
// through a reader that is synchronized with the write operations.
//
// The packfile is written to a temporary file. When Close is called, that file
// is renamed/moved (depending on the Filesystem implementation) to its final
// location. If nothing was written to the PackWriter, no packfile is saved.
type PackWriter struct {
	// Notify, when non-nil, is called by Close with the packfile checksum
	// and the generated index, whether or not Close succeeds.
	Notify func(h plumbing.Hash, i idxfile.Idxfile)

	fs       billy.Filesystem
	fr, fw   billy.File
	synced   *syncedReader
	checksum plumbing.Hash
	index    idxfile.Idxfile
	result   chan error
}

// newPackWrite creates a PackWriter backed by a temporary file in the pack
// directory and starts the goroutine that indexes the data as it is written.
func newPackWrite(fs billy.Filesystem) (*PackWriter, error) {
	fw, err := fs.TempFile(fs.Join(objectsPath, packPath), "tmp_pack_")
	if err != nil {
		return nil, err
	}

	fr, err := fs.Open(fw.Name())
	if err != nil {
		// Best-effort cleanup so a failed Open leaks neither the write
		// descriptor nor the temporary file. The Open error is the one the
		// caller needs to see, so cleanup errors are deliberately dropped.
		_ = fw.Close()
		_ = fs.Remove(fw.Name())
		return nil, err
	}

	writer := &PackWriter{
		fs:     fs,
		fw:     fw,
		fr:     fr,
		synced: newSyncedReader(fw, fr),
		result: make(chan error),
	}

	go writer.buildIndex()
	return writer, nil
}

// buildIndex decodes the packfile from the synchronized reader and fills the
// index with the hash, offset and CRC of every decoded object. It sends
// exactly one value on w.result: the decoding error, or nil on success.
func (w *PackWriter) buildIndex() {
	s := packfile.NewScanner(w.synced)
	d, err := packfile.NewDecoder(s, nil)
	if err != nil {
		w.result <- err
		return
	}

	checksum, err := d.Decode()
	if err != nil {
		w.result <- err
		return
	}

	w.checksum = checksum
	w.index.PackfileChecksum = checksum
	w.index.Version = idxfile.VersionSupported

	offsets := d.Offsets()
	for h, crc := range d.CRCs() {
		w.index.Add(h, uint64(offsets[h]), crc)
	}

	w.result <- nil
}

// waitBuildIndex waits until buildIndex finishes. buildIndex can terminate
// with packfile.ErrEmptyPackfile, which means that nothing was written, so
// that error is ignored.
func (w *PackWriter) waitBuildIndex() error {
	err := <-w.result
	if err == packfile.ErrEmptyPackfile {
		return nil
	}

	return err
}

// Write writes p to the temporary packfile through the synchronized reader,
// which makes the new bytes available to the indexing goroutine.
func (w *PackWriter) Write(p []byte) (int, error) {
	return w.synced.Write(p)
}

// Close closes all the file descriptors and saves the final packfile. If
// nothing was written, the temporary file is deleted without writing a
// packfile. Notify, when set, is called before Close returns.
//
// Both file descriptors are closed even when indexing or an earlier close
// fails; the first error encountered is the one returned.
func (w *PackWriter) Close() error {
	defer func() {
		if w.Notify != nil {
			w.Notify(w.checksum, w.index)
		}

		close(w.result)
	}()

	if err := w.synced.Close(); err != nil {
		return err
	}

	err := w.waitBuildIndex()
	if cerr := w.fr.Close(); err == nil {
		err = cerr
	}
	if cerr := w.fw.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		return err
	}

	if len(w.index.Entries) == 0 {
		return w.clean()
	}

	return w.save()
}

// clean removes the temporary packfile.
func (w *PackWriter) clean() error {
	return w.fs.Remove(w.fw.Name())
}

// save writes the index to pack-<checksum>.idx and then renames the temporary
// packfile to pack-<checksum>.pack, both inside the pack directory.
func (w *PackWriter) save() error {
	base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum))
	idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base))
	if err != nil {
		return err
	}

	if err := w.encodeIdx(idx); err != nil {
		// Release the descriptor; the encoding error takes precedence over
		// any error from closing the partially written index.
		_ = idx.Close()
		return err
	}

	if err := idx.Close(); err != nil {
		return err
	}

	return w.fs.Rename(w.fw.Name(), fmt.Sprintf("%s.pack", base))
}

// encodeIdx encodes the generated index into writer.
func (w *PackWriter) encodeIdx(writer io.Writer) error {
	e := idxfile.NewEncoder(writer)
	_, err := e.Encode(&w.index)
	return err
}

// syncedReader couples a writer and a reader so that Read waits for data that
// has not been written yet instead of reporting a premature io.EOF.
type syncedReader struct {
	w io.Writer
	r io.ReadSeeker

	// blocked and done are flags (0 or 1); written and read are byte
	// counters. All four are accessed through sync/atomic.
	blocked, done uint32
	written, read uint64
	// news carries wake-up signals from Write to a sleeping Read; it is
	// closed by Close.
	news chan bool
}

// newSyncedReader returns a syncedReader that writes to w and reads from r.
func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader {
	return &syncedReader{
		w:    w,
		r:    r,
		news: make(chan bool),
	}
}

// Write writes p to the underlying writer, adds the number of bytes written
// to the written counter, and wakes the reader when more bytes have been
// written than read.
func (s *syncedReader) Write(p []byte) (n int, err error) {
	defer func() {
		written := atomic.AddUint64(&s.written, uint64(n))
		read := atomic.LoadUint64(&s.read)
		if written > read {
			s.wake()
		}
	}()

	n, err = s.w.Write(p)
	return
}

// Read reads from the underlying reader. It sleeps while the read counter has
// caught up with the written counter, and retries when the underlying reader
// reports io.EOF without data before the syncedReader has been closed.
func (s *syncedReader) Read(p []byte) (n int, err error) {
	defer func() { atomic.AddUint64(&s.read, uint64(n)) }()

	for {
		s.sleep()
		n, err = s.r.Read(p)
		if err == io.EOF && !s.isDone() && n == 0 {
			continue
		}

		break
	}

	return
}

// isDone reports whether Close has been called on the syncedReader.
func (s *syncedReader) isDone() bool {
	return atomic.LoadUint32(&s.done) == 1
}
func (s *syncedReader) isBlocked() bool { return atomic.LoadUint32(&s.blocked) == 1 } func (s *syncedReader) wake() { if s.isBlocked() { // fmt.Println("wake") atomic.StoreUint32(&s.blocked, 0) s.news <- true } } func (s *syncedReader) sleep() { read := atomic.LoadUint64(&s.read) written := atomic.LoadUint64(&s.written) if read >= written { atomic.StoreUint32(&s.blocked, 1) // fmt.Println("sleep", read, written) <-s.news } } func (s *syncedReader) Seek(offset int64, whence int) (int64, error) { if whence == io.SeekCurrent { return s.r.Seek(offset, whence) } p, err := s.r.Seek(offset, whence) atomic.StoreUint64(&s.read, uint64(p)) return p, err } func (s *syncedReader) Close() error { atomic.StoreUint32(&s.done, 1) close(s.news) return nil } type ObjectWriter struct { objfile.Writer fs billy.Filesystem f billy.File } func newObjectWriter(fs billy.Filesystem) (*ObjectWriter, error) { f, err := fs.TempFile(fs.Join(objectsPath, packPath), "tmp_obj_") if err != nil { return nil, err } return &ObjectWriter{ Writer: (*objfile.NewWriter(f)), fs: fs, f: f, }, nil } func (w *ObjectWriter) Close() error { if err := w.Writer.Close(); err != nil { return err } if err := w.f.Close(); err != nil { return err } return w.save() } func (w *ObjectWriter) save() error { hash := w.Hash().String() file := w.fs.Join(objectsPath, hash[0:2], hash[2:40]) return w.fs.Rename(w.f.Name(), file) }