diff options
author | Máximo Cuadros <mcuadros@gmail.com> | 2016-09-25 23:58:59 +0200 |
---|---|---|
committer | Máximo Cuadros <mcuadros@gmail.com> | 2016-09-25 23:58:59 +0200 |
commit | b9c0a09435392913c0054382500c805cd7cb596b (patch) | |
tree | d5a4bebff33b02215b25515ab769f277c0c07bb9 /storage/filesystem/internal | |
parent | 859775d320d574979c63a114de1437e3c5d9114c (diff) | |
download | go-git-b9c0a09435392913c0054382500c805cd7cb596b.tar.gz |
formats: objfile idomatic reader/writer
Diffstat (limited to 'storage/filesystem/internal')
-rw-r--r-- | storage/filesystem/internal/dotgit/dotgit.go | 348 | ||||
-rw-r--r-- | storage/filesystem/internal/dotgit/dotgit_test.go | 138 | ||||
-rw-r--r-- | storage/filesystem/internal/dotgit/refs.go | 149 | ||||
-rw-r--r-- | storage/filesystem/internal/dotgit/writers.go | 263 | ||||
-rw-r--r-- | storage/filesystem/internal/dotgit/writers_test.go | 89 |
5 files changed, 549 insertions, 438 deletions
diff --git a/storage/filesystem/internal/dotgit/dotgit.go b/storage/filesystem/internal/dotgit/dotgit.go index c4392a2..ba293af 100644 --- a/storage/filesystem/internal/dotgit/dotgit.go +++ b/storage/filesystem/internal/dotgit/dotgit.go @@ -2,18 +2,14 @@ package dotgit import ( - "crypto/sha1" + "bufio" "errors" "fmt" - "io" + "io/ioutil" "os" "strings" - "sync/atomic" - "time" "gopkg.in/src-d/go-git.v4/core" - "gopkg.in/src-d/go-git.v4/formats/idxfile" - "gopkg.in/src-d/go-git.v4/formats/packfile" "gopkg.in/src-d/go-git.v4/utils/fs" ) @@ -24,6 +20,7 @@ const ( objectsPath = "objects" packPath = "pack" + refsPath = "refs" packExt = ".pack" idxExt = ".idx" @@ -38,6 +35,16 @@ var ( ErrPackfileNotFound = errors.New("packfile not found") // ErrConfigNotFound is returned by Config when the config is not found ErrConfigNotFound = errors.New("config file not found") + // ErrPackedRefsDuplicatedRef is returned when a duplicated reference is + // found in the packed-ref file. This is usually the case for corrupted git + // repositories. + ErrPackedRefsDuplicatedRef = errors.New("duplicated ref found in packed-ref file") + // ErrPackedRefsBadFormat is returned when the packed-ref file corrupt. + ErrPackedRefsBadFormat = errors.New("malformed packed-ref") + // ErrSymRefTargetNotFound is returned when a symbolic reference is + // targeting a non-existing object. This usually means the repository + // is corrupt. + ErrSymRefTargetNotFound = errors.New("symbolic reference target not found") ) // The DotGit type represents a local git repository on disk. This @@ -62,45 +69,6 @@ func (d *DotGit) Config() (fs.File, error) { return d.fs.Open(configPath) } -func (d *DotGit) SetRef(r *core.Reference) error { - var content string - switch r.Type() { - case core.SymbolicReference: - content = fmt.Sprintf("ref: %s\n", r.Target()) - case core.HashReference: - content = fmt.Sprintln(r.Hash().String()) - } - - f, err := d.fs.Create(r.Name().String()) - if err != nil { - return err - } - - if _, err := f.Write([]byte(content)); err != nil { - return err - } - return f.Close() -} - -// Refs scans the git directory collecting references, which it returns. -// Symbolic references are resolved and included in the output. -func (d *DotGit) Refs() ([]*core.Reference, error) { - var refs []*core.Reference - if err := d.addRefsFromPackedRefs(&refs); err != nil { - return nil, err - } - - if err := d.addRefsFromRefDir(&refs); err != nil { - return nil, err - } - - if err := d.addRefFromHEAD(&refs); err != nil { - return nil, err - } - - return refs, nil -} - // NewObjectPack return a writer for a new packfile, it saves the packfile to // disk and also generates and save the index for the given packfile. func (d *DotGit) NewObjectPack() (*PackWriter, error) { @@ -165,6 +133,11 @@ func (d *DotGit) ObjectPackIdx(hash core.Hash) (fs.File, error) { return idx, nil } +// NewObject return a writer for a new object file. +func (d *DotGit) NewObject() (*ObjectWriter, error) { + return newObjectWriter(d.fs) +} + // Objects returns a slice with the hashes of objects found under the // .git/objects/ directory. func (d *DotGit) Objects() ([]core.Hash, error) { @@ -203,232 +176,185 @@ func (d *DotGit) Object(h core.Hash) (fs.File, error) { return d.fs.Open(file) } -func isHex(s string) bool { - for _, b := range []byte(s) { - if isNum(b) { - continue - } - if isHexAlpha(b) { - continue - } - - return false +func (d *DotGit) SetRef(r *core.Reference) error { + var content string + switch r.Type() { + case core.SymbolicReference: + content = fmt.Sprintf("ref: %s\n", r.Target()) + case core.HashReference: + content = fmt.Sprintln(r.Hash().String()) } - return true -} - -func isNum(b byte) bool { - return b >= '0' && b <= '9' -} - -func isHexAlpha(b byte) bool { - return b >= 'a' && b <= 'f' || b >= 'A' && b <= 'F' -} - -type PackWriter struct { - Notify func(h core.Hash, i idxfile.Idxfile) + f, err := d.fs.Create(r.Name().String()) + if err != nil { + return err + } - fs fs.Filesystem - fr, fw fs.File - synced *syncedReader - checksum core.Hash - index idxfile.Idxfile - result chan error + if _, err := f.Write([]byte(content)); err != nil { + return err + } + return f.Close() } -func newPackWrite(fs fs.Filesystem) (*PackWriter, error) { - seed := sha1.Sum([]byte(time.Now().String())) - tmp := fs.Join(objectsPath, packPath, fmt.Sprintf("tmp_pack_%x", seed)) - - fw, err := fs.Create(tmp) - if err != nil { +// Refs scans the git directory collecting references, which it returns. +// Symbolic references are resolved and included in the output. +func (d *DotGit) Refs() ([]*core.Reference, error) { + var refs []*core.Reference + if err := d.addRefsFromPackedRefs(&refs); err != nil { return nil, err } - fr, err := fs.Open(tmp) - if err != nil { + if err := d.addRefsFromRefDir(&refs); err != nil { return nil, err } - writer := &PackWriter{ - fs: fs, - fw: fw, - fr: fr, - synced: newSyncedReader(fw, fr), - result: make(chan error), + if err := d.addRefFromHEAD(&refs); err != nil { + return nil, err } - go writer.buildIndex() - return writer, nil + return refs, nil } -func (w *PackWriter) buildIndex() { - s := packfile.NewScanner(w.synced) - d, err := packfile.NewDecoder(s, nil) +func (d *DotGit) addRefsFromPackedRefs(refs *[]*core.Reference) (err error) { + f, err := d.fs.Open(packedRefsPath) if err != nil { - w.result <- err - return + if os.IsNotExist(err) { + return nil + } + return err } - checksum, err := d.Decode() - if err != nil { - w.result <- err - return - } + defer func() { + if errClose := f.Close(); err == nil { + err = errClose + } + }() - w.checksum = checksum - w.index.PackfileChecksum = checksum - w.index.Version = idxfile.VersionSupported + s := bufio.NewScanner(f) + for s.Scan() { + ref, err := d.processLine(s.Text()) + if err != nil { + return err + } - offsets := d.Offsets() - for h, crc := range d.CRCs() { - w.index.Add(h, uint64(offsets[h]), crc) + if ref != nil { + *refs = append(*refs, ref) + } } - w.result <- err + return s.Err() } -func (w *PackWriter) Write(p []byte) (n int, err error) { - return w.synced.Write(p) +// process lines from a packed-refs file +func (d *DotGit) processLine(line string) (*core.Reference, error) { + switch line[0] { + case '#': // comment - ignore + return nil, nil + case '^': // annotated tag commit of the previous line - ignore + return nil, nil + default: + ws := strings.Split(line, " ") // hash then ref + if len(ws) != 2 { + return nil, ErrPackedRefsBadFormat + } + + return core.NewReferenceFromStrings(ws[1], ws[0]), nil + } } -func (w *PackWriter) Close() error { - defer func() { - close(w.result) - }() +func (d *DotGit) addRefsFromRefDir(refs *[]*core.Reference) error { + return d.walkReferencesTree(refs, refsPath) +} + +func (d *DotGit) walkReferencesTree(refs *[]*core.Reference, relPath string) error { + files, err := d.fs.ReadDir(relPath) + if err != nil { + if os.IsNotExist(err) { + return nil + } - pipe := []func() error{ - w.synced.Close, - func() error { return <-w.result }, - w.fr.Close, - w.fw.Close, - w.save, + return err } - for _, f := range pipe { - if err := f(); err != nil { + for _, f := range files { + newRelPath := d.fs.Join(relPath, f.Name()) + if f.IsDir() { + if err = d.walkReferencesTree(refs, newRelPath); err != nil { + return err + } + + continue + } + + ref, err := d.readReferenceFile(".", newRelPath) + if err != nil { return err } - } - if w.Notify != nil { - w.Notify(w.checksum, w.index) + if ref != nil { + *refs = append(*refs, ref) + } } return nil } -func (w *PackWriter) save() error { - base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum)) - idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base)) +func (d *DotGit) addRefFromHEAD(refs *[]*core.Reference) error { + ref, err := d.readReferenceFile(".", "HEAD") if err != nil { - return err - } - - if err := w.encodeIdx(idx); err != nil { - return err - } + if os.IsNotExist(err) { + return nil + } - if err := idx.Close(); err != nil { return err } - return w.fs.Rename(w.fw.Filename(), fmt.Sprintf("%s.pack", base)) -} - -func (w *PackWriter) encodeIdx(writer io.Writer) error { - e := idxfile.NewEncoder(writer) - _, err := e.Encode(&w.index) - return err + *refs = append(*refs, ref) + return nil } -type syncedReader struct { - w io.Writer - r io.ReadSeeker +func (d *DotGit) readReferenceFile(refsPath, refFile string) (ref *core.Reference, err error) { + path := d.fs.Join(refsPath, refFile) - blocked, done uint32 - written, read uint64 - news chan bool -} - -func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader { - return &syncedReader{ - w: w, - r: r, - news: make(chan bool), + f, err := d.fs.Open(path) + if err != nil { + return nil, err } -} -func (s *syncedReader) Write(p []byte) (n int, err error) { defer func() { - written := atomic.AddUint64(&s.written, uint64(n)) - read := atomic.LoadUint64(&s.read) - if written > read { - s.wake() + if errClose := f.Close(); err == nil { + err = errClose } }() - n, err = s.w.Write(p) - return -} - -func (s *syncedReader) Read(p []byte) (n int, err error) { - defer func() { atomic.AddUint64(&s.read, uint64(n)) }() - - s.sleep() - n, err = s.r.Read(p) - if err == io.EOF && !s.isDone() { - if n == 0 { - return s.Read(p) - } - - return n, nil + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err } - return -} - -func (s *syncedReader) isDone() bool { - return atomic.LoadUint32(&s.done) == 1 -} - -func (s *syncedReader) isBlocked() bool { - return atomic.LoadUint32(&s.blocked) == 1 + line := strings.TrimSpace(string(b)) + return core.NewReferenceFromStrings(refFile, line), nil } -func (s *syncedReader) wake() { - if s.isBlocked() { - // fmt.Println("wake") - atomic.StoreUint32(&s.blocked, 0) - s.news <- true - } -} +func isHex(s string) bool { + for _, b := range []byte(s) { + if isNum(b) { + continue + } + if isHexAlpha(b) { + continue + } -func (s *syncedReader) sleep() { - read := atomic.LoadUint64(&s.read) - written := atomic.LoadUint64(&s.written) - if read >= written { - atomic.StoreUint32(&s.blocked, 1) - // fmt.Println("sleep", read, written) - <-s.news + return false } + return true } -func (s *syncedReader) Seek(offset int64, whence int) (int64, error) { - if whence == io.SeekCurrent { - return s.r.Seek(offset, whence) - } - - p, err := s.r.Seek(offset, whence) - s.read = uint64(p) - - return p, err +func isNum(b byte) bool { + return b >= '0' && b <= '9' } -func (s *syncedReader) Close() error { - atomic.StoreUint32(&s.done, 1) - close(s.news) - return nil +func isHexAlpha(b byte) bool { + return b >= 'a' && b <= 'f' || b >= 'A' && b <= 'F' } diff --git a/storage/filesystem/internal/dotgit/dotgit_test.go b/storage/filesystem/internal/dotgit/dotgit_test.go index f105c58..ebd8596 100644 --- a/storage/filesystem/internal/dotgit/dotgit_test.go +++ b/storage/filesystem/internal/dotgit/dotgit_test.go @@ -1,13 +1,9 @@ package dotgit import ( - "fmt" - "io" "io/ioutil" - "log" "os" "path/filepath" - "strconv" "strings" "testing" @@ -26,6 +22,41 @@ type SuiteDotGit struct { var _ = Suite(&SuiteDotGit{}) +func (s *SuiteDotGit) TestSetRefs(c *C) { + tmp, err := ioutil.TempDir("", "dot-git") + c.Assert(err, IsNil) + defer os.RemoveAll(tmp) + + fs := fs.NewOS(tmp) + dir := New(fs) + + err = dir.SetRef(core.NewReferenceFromStrings( + "refs/heads/foo", + "e8d3ffab552895c19b9fcf7aa264d277cde33881", + )) + + c.Assert(err, IsNil) + + err = dir.SetRef(core.NewReferenceFromStrings( + "refs/heads/symbolic", + "ref: refs/heads/foo", + )) + + c.Assert(err, IsNil) + + refs, err := dir.Refs() + c.Assert(err, IsNil) + c.Assert(refs, HasLen, 2) + + ref := findReference(refs, "refs/heads/foo") + c.Assert(ref, NotNil) + c.Assert(ref.Hash().String(), Equals, "e8d3ffab552895c19b9fcf7aa264d277cde33881") + + ref = findReference(refs, "refs/heads/symbolic") + c.Assert(ref, NotNil) + c.Assert(ref.Target().String(), Equals, "refs/heads/foo") +} + func (s *SuiteDotGit) TestRefsFromPackedRefs(c *C) { fs := fixtures.Basic().ByTag(".git").One().DotGit() dir := New(fs) @@ -128,6 +159,31 @@ func (s *SuiteDotGit) TestObjectPackNotFound(c *C) { c.Assert(idx, IsNil) } +func (s *SuiteDotGit) TestNewObject(c *C) { + tmp, err := ioutil.TempDir("", "dot-git") + c.Assert(err, IsNil) + defer os.RemoveAll(tmp) + + fs := fs.NewOS(tmp) + dir := New(fs) + w, err := dir.NewObject() + c.Assert(err, IsNil) + + err = w.WriteHeader(core.BlobObject, 14) + n, err := w.Write([]byte("this is a test")) + c.Assert(err, IsNil) + c.Assert(n, Equals, 14) + + c.Assert(w.Hash().String(), Equals, "a8a940627d132695a9769df883f85992f0ff4a43") + + err = w.Close() + c.Assert(err, IsNil) + + i, err := fs.Stat("objects/a8/a940627d132695a9769df883f85992f0ff4a43") + c.Assert(err, IsNil) + c.Assert(i.Size(), Equals, int64(34)) +} + func (s *SuiteDotGit) TestObjects(c *C) { fs := fixtures.ByTag(".git").ByTag("unpacked").One().DotGit() dir := New(fs) @@ -162,77 +218,3 @@ func (s *SuiteDotGit) TestObjectNotFound(c *C) { c.Assert(err, NotNil) c.Assert(file, IsNil) } - -func (s *SuiteDotGit) TestNewObjectPack(c *C) { - f := fixtures.Basic().One() - - dir, err := ioutil.TempDir("", "example") - if err != nil { - log.Fatal(err) - } - - defer os.RemoveAll(dir) - - fs := fs.NewOS(dir) - dot := New(fs) - - w, err := dot.NewObjectPack() - c.Assert(err, IsNil) - - _, err = io.Copy(w, f.Packfile()) - c.Assert(err, IsNil) - - c.Assert(w.Close(), IsNil) - - stat, err := fs.Stat(fmt.Sprintf("objects/pack/pack-%s.pack", f.PackfileHash)) - c.Assert(err, IsNil) - c.Assert(stat.Size(), Equals, int64(84794)) - - stat, err = fs.Stat(fmt.Sprintf("objects/pack/pack-%s.idx", f.PackfileHash)) - c.Assert(err, IsNil) - c.Assert(stat.Size(), Equals, int64(1940)) -} - -func (s *SuiteDotGit) TestSyncedReader(c *C) { - tmpw, err := ioutil.TempFile("", "example") - c.Assert(err, IsNil) - - tmpr, err := os.Open(tmpw.Name()) - c.Assert(err, IsNil) - - defer func() { - tmpw.Close() - tmpr.Close() - os.Remove(tmpw.Name()) - }() - - synced := newSyncedReader(tmpw, tmpr) - - go func() { - for i := 0; i < 281; i++ { - _, err := synced.Write([]byte(strconv.Itoa(i) + "\n")) - c.Assert(err, IsNil) - } - - synced.Close() - }() - - o, err := synced.Seek(1002, io.SeekStart) - c.Assert(err, IsNil) - c.Assert(o, Equals, int64(1002)) - - head := make([]byte, 3) - n, err := io.ReadFull(synced, head) - c.Assert(err, IsNil) - c.Assert(n, Equals, 3) - c.Assert(string(head), Equals, "278") - - o, err = synced.Seek(1010, io.SeekStart) - c.Assert(err, IsNil) - c.Assert(o, Equals, int64(1010)) - - n, err = io.ReadFull(synced, head) - c.Assert(err, IsNil) - c.Assert(n, Equals, 3) - c.Assert(string(head), Equals, "280") -} diff --git a/storage/filesystem/internal/dotgit/refs.go b/storage/filesystem/internal/dotgit/refs.go deleted file mode 100644 index 8f28332..0000000 --- a/storage/filesystem/internal/dotgit/refs.go +++ /dev/null @@ -1,149 +0,0 @@ -package dotgit - -import ( - "bufio" - "errors" - "io/ioutil" - "os" - "strings" - - "gopkg.in/src-d/go-git.v4/core" -) - -var ( - // ErrPackedRefsDuplicatedRef is returned when a duplicated - // reference is found in the packed-ref file. This is usually the - // case for corrupted git repositories. - ErrPackedRefsDuplicatedRef = errors.New("duplicated ref found in packed-ref file") - // ErrPackedRefsBadFormat is returned when the packed-ref file - // corrupt. - ErrPackedRefsBadFormat = errors.New("malformed packed-ref") - // ErrSymRefTargetNotFound is returned when a symbolic reference is - // targeting a non-existing object. This usually means the - // repository is corrupt. - ErrSymRefTargetNotFound = errors.New("symbolic reference target not found") -) - -const ( - refsPath = "refs" -) - -func (d *DotGit) addRefsFromPackedRefs(refs *[]*core.Reference) (err error) { - f, err := d.fs.Open(packedRefsPath) - if err != nil { - if os.IsNotExist(err) { - return nil - } - return err - } - - defer func() { - if errClose := f.Close(); err == nil { - err = errClose - } - }() - s := bufio.NewScanner(f) - for s.Scan() { - ref, err := d.processLine(s.Text()) - if err != nil { - return err - } - - if ref != nil { - *refs = append(*refs, ref) - } - } - - return s.Err() -} - -// process lines from a packed-refs file -func (d *DotGit) processLine(line string) (*core.Reference, error) { - switch line[0] { - case '#': // comment - ignore - return nil, nil - case '^': // annotated tag commit of the previous line - ignore - return nil, nil - default: - ws := strings.Split(line, " ") // hash then ref - if len(ws) != 2 { - return nil, ErrPackedRefsBadFormat - } - - return core.NewReferenceFromStrings(ws[1], ws[0]), nil - } -} - -func (d *DotGit) addRefsFromRefDir(refs *[]*core.Reference) error { - return d.walkReferencesTree(refs, refsPath) -} - -func (d *DotGit) walkReferencesTree(refs *[]*core.Reference, relPath string) error { - files, err := d.fs.ReadDir(relPath) - if err != nil { - if os.IsNotExist(err) { - return nil - } - - return err - } - - for _, f := range files { - newRelPath := d.fs.Join(relPath, f.Name()) - if f.IsDir() { - if err = d.walkReferencesTree(refs, newRelPath); err != nil { - return err - } - - continue - } - - ref, err := d.readReferenceFile(".", newRelPath) - if err != nil { - return err - } - - if ref != nil { - *refs = append(*refs, ref) - } - } - - return nil -} - -func (d *DotGit) addRefFromHEAD(refs *[]*core.Reference) error { - ref, err := d.readReferenceFile(".", "HEAD") - if err != nil { - if os.IsNotExist(err) { - return nil - } - - return err - } - - *refs = append(*refs, ref) - return nil -} - -func (d *DotGit) readReferenceFile(refsPath, refFile string) (ref *core.Reference, err error) { - path := d.fs.Join(refsPath, refFile) - - f, err := d.fs.Open(path) - if err != nil { - return nil, err - } - - defer func() { - if errClose := f.Close(); err == nil { - err = errClose - } - }() - - b, err := ioutil.ReadAll(f) - if err != nil { - return nil, err - } - - line := strings.TrimSpace(string(b)) - return core.NewReferenceFromStrings(refFile, line), nil -} diff --git a/storage/filesystem/internal/dotgit/writers.go b/storage/filesystem/internal/dotgit/writers.go new file mode 100644 index 0000000..40b004f --- /dev/null +++ b/storage/filesystem/internal/dotgit/writers.go @@ -0,0 +1,263 @@ +package dotgit + +import ( + "crypto/sha1" + "fmt" + "io" + "sync/atomic" + "time" + + "gopkg.in/src-d/go-git.v4/core" + "gopkg.in/src-d/go-git.v4/formats/idxfile" + "gopkg.in/src-d/go-git.v4/formats/objfile" + "gopkg.in/src-d/go-git.v4/formats/packfile" + "gopkg.in/src-d/go-git.v4/utils/fs" +) + +type PackWriter struct { + Notify func(h core.Hash, i idxfile.Idxfile) + + fs fs.Filesystem + fr, fw fs.File + synced *syncedReader + checksum core.Hash + index idxfile.Idxfile + result chan error +} + +func newPackWrite(fs fs.Filesystem) (*PackWriter, error) { + seed := sha1.Sum([]byte(time.Now().String())) + tmp := fs.Join(objectsPath, packPath, fmt.Sprintf("tmp_pack_%x", seed)) + + fw, err := fs.Create(tmp) + if err != nil { + return nil, err + } + + fr, err := fs.Open(tmp) + if err != nil { + return nil, err + } + + writer := &PackWriter{ + fs: fs, + fw: fw, + fr: fr, + synced: newSyncedReader(fw, fr), + result: make(chan error), + } + + go writer.buildIndex() + return writer, nil +} + +func (w *PackWriter) buildIndex() { + s := packfile.NewScanner(w.synced) + d, err := packfile.NewDecoder(s, nil) + if err != nil { + w.result <- err + return + } + + checksum, err := d.Decode() + if err != nil { + w.result <- err + return + } + + w.checksum = checksum + w.index.PackfileChecksum = checksum + w.index.Version = idxfile.VersionSupported + + offsets := d.Offsets() + for h, crc := range d.CRCs() { + w.index.Add(h, uint64(offsets[h]), crc) + } + + w.result <- err +} + +func (w *PackWriter) Write(p []byte) (n int, err error) { + return w.synced.Write(p) +} + +func (w *PackWriter) Close() error { + defer func() { + close(w.result) + }() + + pipe := []func() error{ + w.synced.Close, + func() error { return <-w.result }, + w.fr.Close, + w.fw.Close, + w.save, + } + + for _, f := range pipe { + if err := f(); err != nil { + return err + } + } + + if w.Notify != nil { + w.Notify(w.checksum, w.index) + } + + return nil +} + +func (w *PackWriter) save() error { + base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum)) + idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base)) + if err != nil { + return err + } + + if err := w.encodeIdx(idx); err != nil { + return err + } + + if err := idx.Close(); err != nil { + return err + } + + return w.fs.Rename(w.fw.Filename(), fmt.Sprintf("%s.pack", base)) +} + +func (w *PackWriter) encodeIdx(writer io.Writer) error { + e := idxfile.NewEncoder(writer) + _, err := e.Encode(&w.index) + return err +} + +type syncedReader struct { + w io.Writer + r io.ReadSeeker + + blocked, done uint32 + written, read uint64 + news chan bool +} + +func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader { + return &syncedReader{ + w: w, + r: r, + news: make(chan bool), + } +} + +func (s *syncedReader) Write(p []byte) (n int, err error) { + defer func() { + written := atomic.AddUint64(&s.written, uint64(n)) + read := atomic.LoadUint64(&s.read) + if written > read { + s.wake() + } + }() + + n, err = s.w.Write(p) + return +} + +func (s *syncedReader) Read(p []byte) (n int, err error) { + defer func() { atomic.AddUint64(&s.read, uint64(n)) }() + + s.sleep() + n, err = s.r.Read(p) + if err == io.EOF && !s.isDone() { + if n == 0 { + return s.Read(p) + } + + return n, nil + } + + return +} + +func (s *syncedReader) isDone() bool { + return atomic.LoadUint32(&s.done) == 1 +} + +func (s *syncedReader) isBlocked() bool { + return atomic.LoadUint32(&s.blocked) == 1 +} + +func (s *syncedReader) wake() { + if s.isBlocked() { + // fmt.Println("wake") + atomic.StoreUint32(&s.blocked, 0) + s.news <- true + } +} + +func (s *syncedReader) sleep() { + read := atomic.LoadUint64(&s.read) + written := atomic.LoadUint64(&s.written) + if read >= written { + atomic.StoreUint32(&s.blocked, 1) + // fmt.Println("sleep", read, written) + <-s.news + } + +} + +func (s *syncedReader) Seek(offset int64, whence int) (int64, error) { + if whence == io.SeekCurrent { + return s.r.Seek(offset, whence) + } + + p, err := s.r.Seek(offset, whence) + s.read = uint64(p) + + return p, err +} + +func (s *syncedReader) Close() error { + atomic.StoreUint32(&s.done, 1) + close(s.news) + return nil +} + +type ObjectWriter struct { + objfile.Writer + fs fs.Filesystem + f fs.File +} + +func newObjectWriter(fs fs.Filesystem) (*ObjectWriter, error) { + seed := sha1.Sum([]byte(time.Now().String())) + tmp := fs.Join(objectsPath, fmt.Sprintf("tmp_obj_%x", seed)) + + f, err := fs.Create(tmp) + if err != nil { + return nil, err + } + + return &ObjectWriter{ + Writer: (*objfile.NewWriter(f)), + fs: fs, + f: f, + }, nil +} + +func (w *ObjectWriter) Close() error { + if err := w.Writer.Close(); err != nil { + return err + } + + if err := w.f.Close(); err != nil { + return err + } + + return w.save() +} + +func (w *ObjectWriter) save() error { + hash := w.Hash().String() + file := w.fs.Join(objectsPath, hash[0:2], hash[2:40]) + + return w.fs.Rename(w.f.Filename(), file) +} diff --git a/storage/filesystem/internal/dotgit/writers_test.go b/storage/filesystem/internal/dotgit/writers_test.go new file mode 100644 index 0000000..ebecbb4 --- /dev/null +++ b/storage/filesystem/internal/dotgit/writers_test.go @@ -0,0 +1,89 @@ +package dotgit + +import ( + "fmt" + "io" + "io/ioutil" + "log" + "os" + "strconv" + + "gopkg.in/src-d/go-git.v4/fixtures" + "gopkg.in/src-d/go-git.v4/utils/fs" + + . "gopkg.in/check.v1" +) + +func (s *SuiteDotGit) TestNewObjectPack(c *C) { + f := fixtures.Basic().One() + + dir, err := ioutil.TempDir("", "example") + if err != nil { + log.Fatal(err) + } + + defer os.RemoveAll(dir) + + fs := fs.NewOS(dir) + dot := New(fs) + + w, err := dot.NewObjectPack() + c.Assert(err, IsNil) + + _, err = io.Copy(w, f.Packfile()) + c.Assert(err, IsNil) + + c.Assert(w.Close(), IsNil) + + stat, err := fs.Stat(fmt.Sprintf("objects/pack/pack-%s.pack", f.PackfileHash)) + c.Assert(err, IsNil) + c.Assert(stat.Size(), Equals, int64(84794)) + + stat, err = fs.Stat(fmt.Sprintf("objects/pack/pack-%s.idx", f.PackfileHash)) + c.Assert(err, IsNil) + c.Assert(stat.Size(), Equals, int64(1940)) +} + +func (s *SuiteDotGit) TestSyncedReader(c *C) { + tmpw, err := ioutil.TempFile("", "example") + c.Assert(err, IsNil) + + tmpr, err := os.Open(tmpw.Name()) + c.Assert(err, IsNil) + + defer func() { + tmpw.Close() + tmpr.Close() + os.Remove(tmpw.Name()) + }() + + synced := newSyncedReader(tmpw, tmpr) + + go func() { + for i := 0; i < 281; i++ { + _, err := synced.Write([]byte(strconv.Itoa(i) + "\n")) + c.Assert(err, IsNil) + } + + synced.Close() + }() + + o, err := synced.Seek(1002, io.SeekStart) + c.Assert(err, IsNil) + c.Assert(o, Equals, int64(1002)) + + head := make([]byte, 3) + n, err := io.ReadFull(synced, head) + c.Assert(err, IsNil) + c.Assert(n, Equals, 3) + c.Assert(string(head), Equals, "278") + + o, err = synced.Seek(1010, io.SeekStart) + c.Assert(err, IsNil) + c.Assert(o, Equals, int64(1010)) + + n, err = io.ReadFull(synced, head) + c.Assert(err, IsNil) + c.Assert(n, Equals, 3) + c.Assert(string(head), Equals, "280") +} |