aboutsummaryrefslogtreecommitdiffstats
path: root/storage/filesystem/internal/dotgit/writers.go
diff options
context:
space:
mode:
authorMáximo Cuadros <mcuadros@gmail.com>2016-09-25 23:58:59 +0200
committerMáximo Cuadros <mcuadros@gmail.com>2016-09-25 23:58:59 +0200
commitb9c0a09435392913c0054382500c805cd7cb596b (patch)
treed5a4bebff33b02215b25515ab769f277c0c07bb9 /storage/filesystem/internal/dotgit/writers.go
parent859775d320d574979c63a114de1437e3c5d9114c (diff)
downloadgo-git-b9c0a09435392913c0054382500c805cd7cb596b.tar.gz
formats: objfile idomatic reader/writer
Diffstat (limited to 'storage/filesystem/internal/dotgit/writers.go')
-rw-r--r--storage/filesystem/internal/dotgit/writers.go263
1 files changed, 263 insertions, 0 deletions
diff --git a/storage/filesystem/internal/dotgit/writers.go b/storage/filesystem/internal/dotgit/writers.go
new file mode 100644
index 0000000..40b004f
--- /dev/null
+++ b/storage/filesystem/internal/dotgit/writers.go
@@ -0,0 +1,263 @@
+package dotgit
+
+import (
+ "crypto/sha1"
+ "fmt"
+ "io"
+ "sync/atomic"
+ "time"
+
+ "gopkg.in/src-d/go-git.v4/core"
+ "gopkg.in/src-d/go-git.v4/formats/idxfile"
+ "gopkg.in/src-d/go-git.v4/formats/objfile"
+ "gopkg.in/src-d/go-git.v4/formats/packfile"
+ "gopkg.in/src-d/go-git.v4/utils/fs"
+)
+
+type PackWriter struct {
+ Notify func(h core.Hash, i idxfile.Idxfile)
+
+ fs fs.Filesystem
+ fr, fw fs.File
+ synced *syncedReader
+ checksum core.Hash
+ index idxfile.Idxfile
+ result chan error
+}
+
+func newPackWrite(fs fs.Filesystem) (*PackWriter, error) {
+ seed := sha1.Sum([]byte(time.Now().String()))
+ tmp := fs.Join(objectsPath, packPath, fmt.Sprintf("tmp_pack_%x", seed))
+
+ fw, err := fs.Create(tmp)
+ if err != nil {
+ return nil, err
+ }
+
+ fr, err := fs.Open(tmp)
+ if err != nil {
+ return nil, err
+ }
+
+ writer := &PackWriter{
+ fs: fs,
+ fw: fw,
+ fr: fr,
+ synced: newSyncedReader(fw, fr),
+ result: make(chan error),
+ }
+
+ go writer.buildIndex()
+ return writer, nil
+}
+
+func (w *PackWriter) buildIndex() {
+ s := packfile.NewScanner(w.synced)
+ d, err := packfile.NewDecoder(s, nil)
+ if err != nil {
+ w.result <- err
+ return
+ }
+
+ checksum, err := d.Decode()
+ if err != nil {
+ w.result <- err
+ return
+ }
+
+ w.checksum = checksum
+ w.index.PackfileChecksum = checksum
+ w.index.Version = idxfile.VersionSupported
+
+ offsets := d.Offsets()
+ for h, crc := range d.CRCs() {
+ w.index.Add(h, uint64(offsets[h]), crc)
+ }
+
+ w.result <- err
+}
+
+func (w *PackWriter) Write(p []byte) (n int, err error) {
+ return w.synced.Write(p)
+}
+
+func (w *PackWriter) Close() error {
+ defer func() {
+ close(w.result)
+ }()
+
+ pipe := []func() error{
+ w.synced.Close,
+ func() error { return <-w.result },
+ w.fr.Close,
+ w.fw.Close,
+ w.save,
+ }
+
+ for _, f := range pipe {
+ if err := f(); err != nil {
+ return err
+ }
+ }
+
+ if w.Notify != nil {
+ w.Notify(w.checksum, w.index)
+ }
+
+ return nil
+}
+
+func (w *PackWriter) save() error {
+ base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum))
+ idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base))
+ if err != nil {
+ return err
+ }
+
+ if err := w.encodeIdx(idx); err != nil {
+ return err
+ }
+
+ if err := idx.Close(); err != nil {
+ return err
+ }
+
+ return w.fs.Rename(w.fw.Filename(), fmt.Sprintf("%s.pack", base))
+}
+
+func (w *PackWriter) encodeIdx(writer io.Writer) error {
+ e := idxfile.NewEncoder(writer)
+ _, err := e.Encode(&w.index)
+ return err
+}
+
+type syncedReader struct {
+ w io.Writer
+ r io.ReadSeeker
+
+ blocked, done uint32
+ written, read uint64
+ news chan bool
+}
+
+func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader {
+ return &syncedReader{
+ w: w,
+ r: r,
+ news: make(chan bool),
+ }
+}
+
+func (s *syncedReader) Write(p []byte) (n int, err error) {
+ defer func() {
+ written := atomic.AddUint64(&s.written, uint64(n))
+ read := atomic.LoadUint64(&s.read)
+ if written > read {
+ s.wake()
+ }
+ }()
+
+ n, err = s.w.Write(p)
+ return
+}
+
+func (s *syncedReader) Read(p []byte) (n int, err error) {
+ defer func() { atomic.AddUint64(&s.read, uint64(n)) }()
+
+ s.sleep()
+ n, err = s.r.Read(p)
+ if err == io.EOF && !s.isDone() {
+ if n == 0 {
+ return s.Read(p)
+ }
+
+ return n, nil
+ }
+
+ return
+}
+
+func (s *syncedReader) isDone() bool {
+ return atomic.LoadUint32(&s.done) == 1
+}
+
+func (s *syncedReader) isBlocked() bool {
+ return atomic.LoadUint32(&s.blocked) == 1
+}
+
+func (s *syncedReader) wake() {
+ if s.isBlocked() {
+ // fmt.Println("wake")
+ atomic.StoreUint32(&s.blocked, 0)
+ s.news <- true
+ }
+}
+
+func (s *syncedReader) sleep() {
+ read := atomic.LoadUint64(&s.read)
+ written := atomic.LoadUint64(&s.written)
+ if read >= written {
+ atomic.StoreUint32(&s.blocked, 1)
+ // fmt.Println("sleep", read, written)
+ <-s.news
+ }
+
+}
+
+func (s *syncedReader) Seek(offset int64, whence int) (int64, error) {
+ if whence == io.SeekCurrent {
+ return s.r.Seek(offset, whence)
+ }
+
+ p, err := s.r.Seek(offset, whence)
+ s.read = uint64(p)
+
+ return p, err
+}
+
+func (s *syncedReader) Close() error {
+ atomic.StoreUint32(&s.done, 1)
+ close(s.news)
+ return nil
+}
+
+type ObjectWriter struct {
+ objfile.Writer
+ fs fs.Filesystem
+ f fs.File
+}
+
+func newObjectWriter(fs fs.Filesystem) (*ObjectWriter, error) {
+ seed := sha1.Sum([]byte(time.Now().String()))
+ tmp := fs.Join(objectsPath, fmt.Sprintf("tmp_obj_%x", seed))
+
+ f, err := fs.Create(tmp)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ObjectWriter{
+ Writer: (*objfile.NewWriter(f)),
+ fs: fs,
+ f: f,
+ }, nil
+}
+
+func (w *ObjectWriter) Close() error {
+ if err := w.Writer.Close(); err != nil {
+ return err
+ }
+
+ if err := w.f.Close(); err != nil {
+ return err
+ }
+
+ return w.save()
+}
+
+func (w *ObjectWriter) save() error {
+ hash := w.Hash().String()
+ file := w.fs.Join(objectsPath, hash[0:2], hash[2:40])
+
+ return w.fs.Rename(w.f.Filename(), file)
+}