aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorMáximo Cuadros <mcuadros@gmail.com>2016-09-25 23:58:59 +0200
committerMáximo Cuadros <mcuadros@gmail.com>2016-09-25 23:58:59 +0200
commitb9c0a09435392913c0054382500c805cd7cb596b (patch)
treed5a4bebff33b02215b25515ab769f277c0c07bb9 /storage
parent859775d320d574979c63a114de1437e3c5d9114c (diff)
downloadgo-git-b9c0a09435392913c0054382500c805cd7cb596b.tar.gz
formats: objfile idomatic reader/writer
Diffstat (limited to 'storage')
-rw-r--r--storage/filesystem/internal/dotgit/dotgit.go348
-rw-r--r--storage/filesystem/internal/dotgit/dotgit_test.go138
-rw-r--r--storage/filesystem/internal/dotgit/refs.go149
-rw-r--r--storage/filesystem/internal/dotgit/writers.go263
-rw-r--r--storage/filesystem/internal/dotgit/writers_test.go89
-rw-r--r--storage/filesystem/object.go44
6 files changed, 572 insertions, 459 deletions
diff --git a/storage/filesystem/internal/dotgit/dotgit.go b/storage/filesystem/internal/dotgit/dotgit.go
index c4392a2..ba293af 100644
--- a/storage/filesystem/internal/dotgit/dotgit.go
+++ b/storage/filesystem/internal/dotgit/dotgit.go
@@ -2,18 +2,14 @@
package dotgit
import (
- "crypto/sha1"
+ "bufio"
"errors"
"fmt"
- "io"
+ "io/ioutil"
"os"
"strings"
- "sync/atomic"
- "time"
"gopkg.in/src-d/go-git.v4/core"
- "gopkg.in/src-d/go-git.v4/formats/idxfile"
- "gopkg.in/src-d/go-git.v4/formats/packfile"
"gopkg.in/src-d/go-git.v4/utils/fs"
)
@@ -24,6 +20,7 @@ const (
objectsPath = "objects"
packPath = "pack"
+ refsPath = "refs"
packExt = ".pack"
idxExt = ".idx"
@@ -38,6 +35,16 @@ var (
ErrPackfileNotFound = errors.New("packfile not found")
// ErrConfigNotFound is returned by Config when the config is not found
ErrConfigNotFound = errors.New("config file not found")
+ // ErrPackedRefsDuplicatedRef is returned when a duplicated reference is
+ // found in the packed-ref file. This is usually the case for corrupted git
+ // repositories.
+ ErrPackedRefsDuplicatedRef = errors.New("duplicated ref found in packed-ref file")
+ // ErrPackedRefsBadFormat is returned when the packed-ref file corrupt.
+ ErrPackedRefsBadFormat = errors.New("malformed packed-ref")
+ // ErrSymRefTargetNotFound is returned when a symbolic reference is
+ // targeting a non-existing object. This usually means the repository
+ // is corrupt.
+ ErrSymRefTargetNotFound = errors.New("symbolic reference target not found")
)
// The DotGit type represents a local git repository on disk. This
@@ -62,45 +69,6 @@ func (d *DotGit) Config() (fs.File, error) {
return d.fs.Open(configPath)
}
-func (d *DotGit) SetRef(r *core.Reference) error {
- var content string
- switch r.Type() {
- case core.SymbolicReference:
- content = fmt.Sprintf("ref: %s\n", r.Target())
- case core.HashReference:
- content = fmt.Sprintln(r.Hash().String())
- }
-
- f, err := d.fs.Create(r.Name().String())
- if err != nil {
- return err
- }
-
- if _, err := f.Write([]byte(content)); err != nil {
- return err
- }
- return f.Close()
-}
-
-// Refs scans the git directory collecting references, which it returns.
-// Symbolic references are resolved and included in the output.
-func (d *DotGit) Refs() ([]*core.Reference, error) {
- var refs []*core.Reference
- if err := d.addRefsFromPackedRefs(&refs); err != nil {
- return nil, err
- }
-
- if err := d.addRefsFromRefDir(&refs); err != nil {
- return nil, err
- }
-
- if err := d.addRefFromHEAD(&refs); err != nil {
- return nil, err
- }
-
- return refs, nil
-}
-
// NewObjectPack return a writer for a new packfile, it saves the packfile to
// disk and also generates and save the index for the given packfile.
func (d *DotGit) NewObjectPack() (*PackWriter, error) {
@@ -165,6 +133,11 @@ func (d *DotGit) ObjectPackIdx(hash core.Hash) (fs.File, error) {
return idx, nil
}
+// NewObject return a writer for a new object file.
+func (d *DotGit) NewObject() (*ObjectWriter, error) {
+ return newObjectWriter(d.fs)
+}
+
// Objects returns a slice with the hashes of objects found under the
// .git/objects/ directory.
func (d *DotGit) Objects() ([]core.Hash, error) {
@@ -203,232 +176,185 @@ func (d *DotGit) Object(h core.Hash) (fs.File, error) {
return d.fs.Open(file)
}
-func isHex(s string) bool {
- for _, b := range []byte(s) {
- if isNum(b) {
- continue
- }
- if isHexAlpha(b) {
- continue
- }
-
- return false
+func (d *DotGit) SetRef(r *core.Reference) error {
+ var content string
+ switch r.Type() {
+ case core.SymbolicReference:
+ content = fmt.Sprintf("ref: %s\n", r.Target())
+ case core.HashReference:
+ content = fmt.Sprintln(r.Hash().String())
}
- return true
-}
-
-func isNum(b byte) bool {
- return b >= '0' && b <= '9'
-}
-
-func isHexAlpha(b byte) bool {
- return b >= 'a' && b <= 'f' || b >= 'A' && b <= 'F'
-}
-
-type PackWriter struct {
- Notify func(h core.Hash, i idxfile.Idxfile)
+ f, err := d.fs.Create(r.Name().String())
+ if err != nil {
+ return err
+ }
- fs fs.Filesystem
- fr, fw fs.File
- synced *syncedReader
- checksum core.Hash
- index idxfile.Idxfile
- result chan error
+ if _, err := f.Write([]byte(content)); err != nil {
+ return err
+ }
+ return f.Close()
}
-func newPackWrite(fs fs.Filesystem) (*PackWriter, error) {
- seed := sha1.Sum([]byte(time.Now().String()))
- tmp := fs.Join(objectsPath, packPath, fmt.Sprintf("tmp_pack_%x", seed))
-
- fw, err := fs.Create(tmp)
- if err != nil {
+// Refs scans the git directory collecting references, which it returns.
+// Symbolic references are resolved and included in the output.
+func (d *DotGit) Refs() ([]*core.Reference, error) {
+ var refs []*core.Reference
+ if err := d.addRefsFromPackedRefs(&refs); err != nil {
return nil, err
}
- fr, err := fs.Open(tmp)
- if err != nil {
+ if err := d.addRefsFromRefDir(&refs); err != nil {
return nil, err
}
- writer := &PackWriter{
- fs: fs,
- fw: fw,
- fr: fr,
- synced: newSyncedReader(fw, fr),
- result: make(chan error),
+ if err := d.addRefFromHEAD(&refs); err != nil {
+ return nil, err
}
- go writer.buildIndex()
- return writer, nil
+ return refs, nil
}
-func (w *PackWriter) buildIndex() {
- s := packfile.NewScanner(w.synced)
- d, err := packfile.NewDecoder(s, nil)
+func (d *DotGit) addRefsFromPackedRefs(refs *[]*core.Reference) (err error) {
+ f, err := d.fs.Open(packedRefsPath)
if err != nil {
- w.result <- err
- return
+ if os.IsNotExist(err) {
+ return nil
+ }
+ return err
}
- checksum, err := d.Decode()
- if err != nil {
- w.result <- err
- return
- }
+ defer func() {
+ if errClose := f.Close(); err == nil {
+ err = errClose
+ }
+ }()
- w.checksum = checksum
- w.index.PackfileChecksum = checksum
- w.index.Version = idxfile.VersionSupported
+ s := bufio.NewScanner(f)
+ for s.Scan() {
+ ref, err := d.processLine(s.Text())
+ if err != nil {
+ return err
+ }
- offsets := d.Offsets()
- for h, crc := range d.CRCs() {
- w.index.Add(h, uint64(offsets[h]), crc)
+ if ref != nil {
+ *refs = append(*refs, ref)
+ }
}
- w.result <- err
+ return s.Err()
}
-func (w *PackWriter) Write(p []byte) (n int, err error) {
- return w.synced.Write(p)
+// process lines from a packed-refs file
+func (d *DotGit) processLine(line string) (*core.Reference, error) {
+ switch line[0] {
+ case '#': // comment - ignore
+ return nil, nil
+ case '^': // annotated tag commit of the previous line - ignore
+ return nil, nil
+ default:
+ ws := strings.Split(line, " ") // hash then ref
+ if len(ws) != 2 {
+ return nil, ErrPackedRefsBadFormat
+ }
+
+ return core.NewReferenceFromStrings(ws[1], ws[0]), nil
+ }
}
-func (w *PackWriter) Close() error {
- defer func() {
- close(w.result)
- }()
+func (d *DotGit) addRefsFromRefDir(refs *[]*core.Reference) error {
+ return d.walkReferencesTree(refs, refsPath)
+}
+
+func (d *DotGit) walkReferencesTree(refs *[]*core.Reference, relPath string) error {
+ files, err := d.fs.ReadDir(relPath)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return nil
+ }
- pipe := []func() error{
- w.synced.Close,
- func() error { return <-w.result },
- w.fr.Close,
- w.fw.Close,
- w.save,
+ return err
}
- for _, f := range pipe {
- if err := f(); err != nil {
+ for _, f := range files {
+ newRelPath := d.fs.Join(relPath, f.Name())
+ if f.IsDir() {
+ if err = d.walkReferencesTree(refs, newRelPath); err != nil {
+ return err
+ }
+
+ continue
+ }
+
+ ref, err := d.readReferenceFile(".", newRelPath)
+ if err != nil {
return err
}
- }
- if w.Notify != nil {
- w.Notify(w.checksum, w.index)
+ if ref != nil {
+ *refs = append(*refs, ref)
+ }
}
return nil
}
-func (w *PackWriter) save() error {
- base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum))
- idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base))
+func (d *DotGit) addRefFromHEAD(refs *[]*core.Reference) error {
+ ref, err := d.readReferenceFile(".", "HEAD")
if err != nil {
- return err
- }
-
- if err := w.encodeIdx(idx); err != nil {
- return err
- }
+ if os.IsNotExist(err) {
+ return nil
+ }
- if err := idx.Close(); err != nil {
return err
}
- return w.fs.Rename(w.fw.Filename(), fmt.Sprintf("%s.pack", base))
-}
-
-func (w *PackWriter) encodeIdx(writer io.Writer) error {
- e := idxfile.NewEncoder(writer)
- _, err := e.Encode(&w.index)
- return err
+ *refs = append(*refs, ref)
+ return nil
}
-type syncedReader struct {
- w io.Writer
- r io.ReadSeeker
+func (d *DotGit) readReferenceFile(refsPath, refFile string) (ref *core.Reference, err error) {
+ path := d.fs.Join(refsPath, refFile)
- blocked, done uint32
- written, read uint64
- news chan bool
-}
-
-func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader {
- return &syncedReader{
- w: w,
- r: r,
- news: make(chan bool),
+ f, err := d.fs.Open(path)
+ if err != nil {
+ return nil, err
}
-}
-func (s *syncedReader) Write(p []byte) (n int, err error) {
defer func() {
- written := atomic.AddUint64(&s.written, uint64(n))
- read := atomic.LoadUint64(&s.read)
- if written > read {
- s.wake()
+ if errClose := f.Close(); err == nil {
+ err = errClose
}
}()
- n, err = s.w.Write(p)
- return
-}
-
-func (s *syncedReader) Read(p []byte) (n int, err error) {
- defer func() { atomic.AddUint64(&s.read, uint64(n)) }()
-
- s.sleep()
- n, err = s.r.Read(p)
- if err == io.EOF && !s.isDone() {
- if n == 0 {
- return s.Read(p)
- }
-
- return n, nil
+ b, err := ioutil.ReadAll(f)
+ if err != nil {
+ return nil, err
}
- return
-}
-
-func (s *syncedReader) isDone() bool {
- return atomic.LoadUint32(&s.done) == 1
-}
-
-func (s *syncedReader) isBlocked() bool {
- return atomic.LoadUint32(&s.blocked) == 1
+ line := strings.TrimSpace(string(b))
+ return core.NewReferenceFromStrings(refFile, line), nil
}
-func (s *syncedReader) wake() {
- if s.isBlocked() {
- // fmt.Println("wake")
- atomic.StoreUint32(&s.blocked, 0)
- s.news <- true
- }
-}
+func isHex(s string) bool {
+ for _, b := range []byte(s) {
+ if isNum(b) {
+ continue
+ }
+ if isHexAlpha(b) {
+ continue
+ }
-func (s *syncedReader) sleep() {
- read := atomic.LoadUint64(&s.read)
- written := atomic.LoadUint64(&s.written)
- if read >= written {
- atomic.StoreUint32(&s.blocked, 1)
- // fmt.Println("sleep", read, written)
- <-s.news
+ return false
}
+ return true
}
-func (s *syncedReader) Seek(offset int64, whence int) (int64, error) {
- if whence == io.SeekCurrent {
- return s.r.Seek(offset, whence)
- }
-
- p, err := s.r.Seek(offset, whence)
- s.read = uint64(p)
-
- return p, err
+func isNum(b byte) bool {
+ return b >= '0' && b <= '9'
}
-func (s *syncedReader) Close() error {
- atomic.StoreUint32(&s.done, 1)
- close(s.news)
- return nil
+func isHexAlpha(b byte) bool {
+ return b >= 'a' && b <= 'f' || b >= 'A' && b <= 'F'
}
diff --git a/storage/filesystem/internal/dotgit/dotgit_test.go b/storage/filesystem/internal/dotgit/dotgit_test.go
index f105c58..ebd8596 100644
--- a/storage/filesystem/internal/dotgit/dotgit_test.go
+++ b/storage/filesystem/internal/dotgit/dotgit_test.go
@@ -1,13 +1,9 @@
package dotgit
import (
- "fmt"
- "io"
"io/ioutil"
- "log"
"os"
"path/filepath"
- "strconv"
"strings"
"testing"
@@ -26,6 +22,41 @@ type SuiteDotGit struct {
var _ = Suite(&SuiteDotGit{})
+func (s *SuiteDotGit) TestSetRefs(c *C) {
+ tmp, err := ioutil.TempDir("", "dot-git")
+ c.Assert(err, IsNil)
+ defer os.RemoveAll(tmp)
+
+ fs := fs.NewOS(tmp)
+ dir := New(fs)
+
+ err = dir.SetRef(core.NewReferenceFromStrings(
+ "refs/heads/foo",
+ "e8d3ffab552895c19b9fcf7aa264d277cde33881",
+ ))
+
+ c.Assert(err, IsNil)
+
+ err = dir.SetRef(core.NewReferenceFromStrings(
+ "refs/heads/symbolic",
+ "ref: refs/heads/foo",
+ ))
+
+ c.Assert(err, IsNil)
+
+ refs, err := dir.Refs()
+ c.Assert(err, IsNil)
+ c.Assert(refs, HasLen, 2)
+
+ ref := findReference(refs, "refs/heads/foo")
+ c.Assert(ref, NotNil)
+ c.Assert(ref.Hash().String(), Equals, "e8d3ffab552895c19b9fcf7aa264d277cde33881")
+
+ ref = findReference(refs, "refs/heads/symbolic")
+ c.Assert(ref, NotNil)
+ c.Assert(ref.Target().String(), Equals, "refs/heads/foo")
+}
+
func (s *SuiteDotGit) TestRefsFromPackedRefs(c *C) {
fs := fixtures.Basic().ByTag(".git").One().DotGit()
dir := New(fs)
@@ -128,6 +159,31 @@ func (s *SuiteDotGit) TestObjectPackNotFound(c *C) {
c.Assert(idx, IsNil)
}
+func (s *SuiteDotGit) TestNewObject(c *C) {
+ tmp, err := ioutil.TempDir("", "dot-git")
+ c.Assert(err, IsNil)
+ defer os.RemoveAll(tmp)
+
+ fs := fs.NewOS(tmp)
+ dir := New(fs)
+ w, err := dir.NewObject()
+ c.Assert(err, IsNil)
+
+ err = w.WriteHeader(core.BlobObject, 14)
+ n, err := w.Write([]byte("this is a test"))
+ c.Assert(err, IsNil)
+ c.Assert(n, Equals, 14)
+
+ c.Assert(w.Hash().String(), Equals, "a8a940627d132695a9769df883f85992f0ff4a43")
+
+ err = w.Close()
+ c.Assert(err, IsNil)
+
+ i, err := fs.Stat("objects/a8/a940627d132695a9769df883f85992f0ff4a43")
+ c.Assert(err, IsNil)
+ c.Assert(i.Size(), Equals, int64(34))
+}
+
func (s *SuiteDotGit) TestObjects(c *C) {
fs := fixtures.ByTag(".git").ByTag("unpacked").One().DotGit()
dir := New(fs)
@@ -162,77 +218,3 @@ func (s *SuiteDotGit) TestObjectNotFound(c *C) {
c.Assert(err, NotNil)
c.Assert(file, IsNil)
}
-
-func (s *SuiteDotGit) TestNewObjectPack(c *C) {
- f := fixtures.Basic().One()
-
- dir, err := ioutil.TempDir("", "example")
- if err != nil {
- log.Fatal(err)
- }
-
- defer os.RemoveAll(dir)
-
- fs := fs.NewOS(dir)
- dot := New(fs)
-
- w, err := dot.NewObjectPack()
- c.Assert(err, IsNil)
-
- _, err = io.Copy(w, f.Packfile())
- c.Assert(err, IsNil)
-
- c.Assert(w.Close(), IsNil)
-
- stat, err := fs.Stat(fmt.Sprintf("objects/pack/pack-%s.pack", f.PackfileHash))
- c.Assert(err, IsNil)
- c.Assert(stat.Size(), Equals, int64(84794))
-
- stat, err = fs.Stat(fmt.Sprintf("objects/pack/pack-%s.idx", f.PackfileHash))
- c.Assert(err, IsNil)
- c.Assert(stat.Size(), Equals, int64(1940))
-}
-
-func (s *SuiteDotGit) TestSyncedReader(c *C) {
- tmpw, err := ioutil.TempFile("", "example")
- c.Assert(err, IsNil)
-
- tmpr, err := os.Open(tmpw.Name())
- c.Assert(err, IsNil)
-
- defer func() {
- tmpw.Close()
- tmpr.Close()
- os.Remove(tmpw.Name())
- }()
-
- synced := newSyncedReader(tmpw, tmpr)
-
- go func() {
- for i := 0; i < 281; i++ {
- _, err := synced.Write([]byte(strconv.Itoa(i) + "\n"))
- c.Assert(err, IsNil)
- }
-
- synced.Close()
- }()
-
- o, err := synced.Seek(1002, io.SeekStart)
- c.Assert(err, IsNil)
- c.Assert(o, Equals, int64(1002))
-
- head := make([]byte, 3)
- n, err := io.ReadFull(synced, head)
- c.Assert(err, IsNil)
- c.Assert(n, Equals, 3)
- c.Assert(string(head), Equals, "278")
-
- o, err = synced.Seek(1010, io.SeekStart)
- c.Assert(err, IsNil)
- c.Assert(o, Equals, int64(1010))
-
- n, err = io.ReadFull(synced, head)
- c.Assert(err, IsNil)
- c.Assert(n, Equals, 3)
- c.Assert(string(head), Equals, "280")
-}
diff --git a/storage/filesystem/internal/dotgit/refs.go b/storage/filesystem/internal/dotgit/refs.go
deleted file mode 100644
index 8f28332..0000000
--- a/storage/filesystem/internal/dotgit/refs.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package dotgit
-
-import (
- "bufio"
- "errors"
- "io/ioutil"
- "os"
- "strings"
-
- "gopkg.in/src-d/go-git.v4/core"
-)
-
-var (
- // ErrPackedRefsDuplicatedRef is returned when a duplicated
- // reference is found in the packed-ref file. This is usually the
- // case for corrupted git repositories.
- ErrPackedRefsDuplicatedRef = errors.New("duplicated ref found in packed-ref file")
- // ErrPackedRefsBadFormat is returned when the packed-ref file
- // corrupt.
- ErrPackedRefsBadFormat = errors.New("malformed packed-ref")
- // ErrSymRefTargetNotFound is returned when a symbolic reference is
- // targeting a non-existing object. This usually means the
- // repository is corrupt.
- ErrSymRefTargetNotFound = errors.New("symbolic reference target not found")
-)
-
-const (
- refsPath = "refs"
-)
-
-func (d *DotGit) addRefsFromPackedRefs(refs *[]*core.Reference) (err error) {
- f, err := d.fs.Open(packedRefsPath)
- if err != nil {
- if os.IsNotExist(err) {
- return nil
- }
- return err
- }
-
- defer func() {
- if errClose := f.Close(); err == nil {
- err = errClose
- }
- }()
- s := bufio.NewScanner(f)
- for s.Scan() {
- ref, err := d.processLine(s.Text())
- if err != nil {
- return err
- }
-
- if ref != nil {
- *refs = append(*refs, ref)
- }
- }
-
- return s.Err()
-}
-
-// process lines from a packed-refs file
-func (d *DotGit) processLine(line string) (*core.Reference, error) {
- switch line[0] {
- case '#': // comment - ignore
- return nil, nil
- case '^': // annotated tag commit of the previous line - ignore
- return nil, nil
- default:
- ws := strings.Split(line, " ") // hash then ref
- if len(ws) != 2 {
- return nil, ErrPackedRefsBadFormat
- }
-
- return core.NewReferenceFromStrings(ws[1], ws[0]), nil
- }
-}
-
-func (d *DotGit) addRefsFromRefDir(refs *[]*core.Reference) error {
- return d.walkReferencesTree(refs, refsPath)
-}
-
-func (d *DotGit) walkReferencesTree(refs *[]*core.Reference, relPath string) error {
- files, err := d.fs.ReadDir(relPath)
- if err != nil {
- if os.IsNotExist(err) {
- return nil
- }
-
- return err
- }
-
- for _, f := range files {
- newRelPath := d.fs.Join(relPath, f.Name())
- if f.IsDir() {
- if err = d.walkReferencesTree(refs, newRelPath); err != nil {
- return err
- }
-
- continue
- }
-
- ref, err := d.readReferenceFile(".", newRelPath)
- if err != nil {
- return err
- }
-
- if ref != nil {
- *refs = append(*refs, ref)
- }
- }
-
- return nil
-}
-
-func (d *DotGit) addRefFromHEAD(refs *[]*core.Reference) error {
- ref, err := d.readReferenceFile(".", "HEAD")
- if err != nil {
- if os.IsNotExist(err) {
- return nil
- }
-
- return err
- }
-
- *refs = append(*refs, ref)
- return nil
-}
-
-func (d *DotGit) readReferenceFile(refsPath, refFile string) (ref *core.Reference, err error) {
- path := d.fs.Join(refsPath, refFile)
-
- f, err := d.fs.Open(path)
- if err != nil {
- return nil, err
- }
-
- defer func() {
- if errClose := f.Close(); err == nil {
- err = errClose
- }
- }()
-
- b, err := ioutil.ReadAll(f)
- if err != nil {
- return nil, err
- }
-
- line := strings.TrimSpace(string(b))
- return core.NewReferenceFromStrings(refFile, line), nil
-}
diff --git a/storage/filesystem/internal/dotgit/writers.go b/storage/filesystem/internal/dotgit/writers.go
new file mode 100644
index 0000000..40b004f
--- /dev/null
+++ b/storage/filesystem/internal/dotgit/writers.go
@@ -0,0 +1,263 @@
+package dotgit
+
+import (
+ "crypto/sha1"
+ "fmt"
+ "io"
+ "sync/atomic"
+ "time"
+
+ "gopkg.in/src-d/go-git.v4/core"
+ "gopkg.in/src-d/go-git.v4/formats/idxfile"
+ "gopkg.in/src-d/go-git.v4/formats/objfile"
+ "gopkg.in/src-d/go-git.v4/formats/packfile"
+ "gopkg.in/src-d/go-git.v4/utils/fs"
+)
+
+type PackWriter struct {
+ Notify func(h core.Hash, i idxfile.Idxfile)
+
+ fs fs.Filesystem
+ fr, fw fs.File
+ synced *syncedReader
+ checksum core.Hash
+ index idxfile.Idxfile
+ result chan error
+}
+
+func newPackWrite(fs fs.Filesystem) (*PackWriter, error) {
+ seed := sha1.Sum([]byte(time.Now().String()))
+ tmp := fs.Join(objectsPath, packPath, fmt.Sprintf("tmp_pack_%x", seed))
+
+ fw, err := fs.Create(tmp)
+ if err != nil {
+ return nil, err
+ }
+
+ fr, err := fs.Open(tmp)
+ if err != nil {
+ return nil, err
+ }
+
+ writer := &PackWriter{
+ fs: fs,
+ fw: fw,
+ fr: fr,
+ synced: newSyncedReader(fw, fr),
+ result: make(chan error),
+ }
+
+ go writer.buildIndex()
+ return writer, nil
+}
+
+func (w *PackWriter) buildIndex() {
+ s := packfile.NewScanner(w.synced)
+ d, err := packfile.NewDecoder(s, nil)
+ if err != nil {
+ w.result <- err
+ return
+ }
+
+ checksum, err := d.Decode()
+ if err != nil {
+ w.result <- err
+ return
+ }
+
+ w.checksum = checksum
+ w.index.PackfileChecksum = checksum
+ w.index.Version = idxfile.VersionSupported
+
+ offsets := d.Offsets()
+ for h, crc := range d.CRCs() {
+ w.index.Add(h, uint64(offsets[h]), crc)
+ }
+
+ w.result <- err
+}
+
+func (w *PackWriter) Write(p []byte) (n int, err error) {
+ return w.synced.Write(p)
+}
+
+func (w *PackWriter) Close() error {
+ defer func() {
+ close(w.result)
+ }()
+
+ pipe := []func() error{
+ w.synced.Close,
+ func() error { return <-w.result },
+ w.fr.Close,
+ w.fw.Close,
+ w.save,
+ }
+
+ for _, f := range pipe {
+ if err := f(); err != nil {
+ return err
+ }
+ }
+
+ if w.Notify != nil {
+ w.Notify(w.checksum, w.index)
+ }
+
+ return nil
+}
+
+func (w *PackWriter) save() error {
+ base := w.fs.Join(objectsPath, packPath, fmt.Sprintf("pack-%s", w.checksum))
+ idx, err := w.fs.Create(fmt.Sprintf("%s.idx", base))
+ if err != nil {
+ return err
+ }
+
+ if err := w.encodeIdx(idx); err != nil {
+ return err
+ }
+
+ if err := idx.Close(); err != nil {
+ return err
+ }
+
+ return w.fs.Rename(w.fw.Filename(), fmt.Sprintf("%s.pack", base))
+}
+
+func (w *PackWriter) encodeIdx(writer io.Writer) error {
+ e := idxfile.NewEncoder(writer)
+ _, err := e.Encode(&w.index)
+ return err
+}
+
+type syncedReader struct {
+ w io.Writer
+ r io.ReadSeeker
+
+ blocked, done uint32
+ written, read uint64
+ news chan bool
+}
+
+func newSyncedReader(w io.Writer, r io.ReadSeeker) *syncedReader {
+ return &syncedReader{
+ w: w,
+ r: r,
+ news: make(chan bool),
+ }
+}
+
+func (s *syncedReader) Write(p []byte) (n int, err error) {
+ defer func() {
+ written := atomic.AddUint64(&s.written, uint64(n))
+ read := atomic.LoadUint64(&s.read)
+ if written > read {
+ s.wake()
+ }
+ }()
+
+ n, err = s.w.Write(p)
+ return
+}
+
+func (s *syncedReader) Read(p []byte) (n int, err error) {
+ defer func() { atomic.AddUint64(&s.read, uint64(n)) }()
+
+ s.sleep()
+ n, err = s.r.Read(p)
+ if err == io.EOF && !s.isDone() {
+ if n == 0 {
+ return s.Read(p)
+ }
+
+ return n, nil
+ }
+
+ return
+}
+
+func (s *syncedReader) isDone() bool {
+ return atomic.LoadUint32(&s.done) == 1
+}
+
+func (s *syncedReader) isBlocked() bool {
+ return atomic.LoadUint32(&s.blocked) == 1
+}
+
+func (s *syncedReader) wake() {
+ if s.isBlocked() {
+ // fmt.Println("wake")
+ atomic.StoreUint32(&s.blocked, 0)
+ s.news <- true
+ }
+}
+
+func (s *syncedReader) sleep() {
+ read := atomic.LoadUint64(&s.read)
+ written := atomic.LoadUint64(&s.written)
+ if read >= written {
+ atomic.StoreUint32(&s.blocked, 1)
+ // fmt.Println("sleep", read, written)
+ <-s.news
+ }
+
+}
+
+func (s *syncedReader) Seek(offset int64, whence int) (int64, error) {
+ if whence == io.SeekCurrent {
+ return s.r.Seek(offset, whence)
+ }
+
+ p, err := s.r.Seek(offset, whence)
+ s.read = uint64(p)
+
+ return p, err
+}
+
+func (s *syncedReader) Close() error {
+ atomic.StoreUint32(&s.done, 1)
+ close(s.news)
+ return nil
+}
+
+type ObjectWriter struct {
+ objfile.Writer
+ fs fs.Filesystem
+ f fs.File
+}
+
+func newObjectWriter(fs fs.Filesystem) (*ObjectWriter, error) {
+ seed := sha1.Sum([]byte(time.Now().String()))
+ tmp := fs.Join(objectsPath, fmt.Sprintf("tmp_obj_%x", seed))
+
+ f, err := fs.Create(tmp)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ObjectWriter{
+ Writer: (*objfile.NewWriter(f)),
+ fs: fs,
+ f: f,
+ }, nil
+}
+
+func (w *ObjectWriter) Close() error {
+ if err := w.Writer.Close(); err != nil {
+ return err
+ }
+
+ if err := w.f.Close(); err != nil {
+ return err
+ }
+
+ return w.save()
+}
+
+func (w *ObjectWriter) save() error {
+ hash := w.Hash().String()
+ file := w.fs.Join(objectsPath, hash[0:2], hash[2:40])
+
+ return w.fs.Rename(w.f.Filename(), file)
+}
diff --git a/storage/filesystem/internal/dotgit/writers_test.go b/storage/filesystem/internal/dotgit/writers_test.go
new file mode 100644
index 0000000..ebecbb4
--- /dev/null
+++ b/storage/filesystem/internal/dotgit/writers_test.go
@@ -0,0 +1,89 @@
+package dotgit
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "strconv"
+
+ "gopkg.in/src-d/go-git.v4/fixtures"
+ "gopkg.in/src-d/go-git.v4/utils/fs"
+
+ . "gopkg.in/check.v1"
+)
+
+func (s *SuiteDotGit) TestNewObjectPack(c *C) {
+ f := fixtures.Basic().One()
+
+ dir, err := ioutil.TempDir("", "example")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ defer os.RemoveAll(dir)
+
+ fs := fs.NewOS(dir)
+ dot := New(fs)
+
+ w, err := dot.NewObjectPack()
+ c.Assert(err, IsNil)
+
+ _, err = io.Copy(w, f.Packfile())
+ c.Assert(err, IsNil)
+
+ c.Assert(w.Close(), IsNil)
+
+ stat, err := fs.Stat(fmt.Sprintf("objects/pack/pack-%s.pack", f.PackfileHash))
+ c.Assert(err, IsNil)
+ c.Assert(stat.Size(), Equals, int64(84794))
+
+ stat, err = fs.Stat(fmt.Sprintf("objects/pack/pack-%s.idx", f.PackfileHash))
+ c.Assert(err, IsNil)
+ c.Assert(stat.Size(), Equals, int64(1940))
+}
+
+func (s *SuiteDotGit) TestSyncedReader(c *C) {
+ tmpw, err := ioutil.TempFile("", "example")
+ c.Assert(err, IsNil)
+
+ tmpr, err := os.Open(tmpw.Name())
+ c.Assert(err, IsNil)
+
+ defer func() {
+ tmpw.Close()
+ tmpr.Close()
+ os.Remove(tmpw.Name())
+ }()
+
+ synced := newSyncedReader(tmpw, tmpr)
+
+ go func() {
+ for i := 0; i < 281; i++ {
+ _, err := synced.Write([]byte(strconv.Itoa(i) + "\n"))
+ c.Assert(err, IsNil)
+ }
+
+ synced.Close()
+ }()
+
+ o, err := synced.Seek(1002, io.SeekStart)
+ c.Assert(err, IsNil)
+ c.Assert(o, Equals, int64(1002))
+
+ head := make([]byte, 3)
+ n, err := io.ReadFull(synced, head)
+ c.Assert(err, IsNil)
+ c.Assert(n, Equals, 3)
+ c.Assert(string(head), Equals, "278")
+
+ o, err = synced.Seek(1010, io.SeekStart)
+ c.Assert(err, IsNil)
+ c.Assert(o, Equals, int64(1010))
+
+ n, err = io.ReadFull(synced, head)
+ c.Assert(err, IsNil)
+ c.Assert(n, Equals, 3)
+ c.Assert(string(head), Equals, "280")
+}
diff --git a/storage/filesystem/object.go b/storage/filesystem/object.go
index 03939ce..f2f5351 100644
--- a/storage/filesystem/object.go
+++ b/storage/filesystem/object.go
@@ -118,31 +118,30 @@ func (s *ObjectStorage) getFromUnpacked(h core.Hash) (obj core.Object, err error
return nil, err
}
- defer func() {
- errClose := f.Close()
- if err == nil {
- err = errClose
- }
- }()
+ defer f.Close()
obj = s.NewObject()
- objReader, err := objfile.NewReader(f)
+ r, err := objfile.NewReader(f)
if err != nil {
return nil, err
}
- defer func() {
- errClose := objReader.Close()
- if err == nil {
- err = errClose
- }
- }()
+ defer r.Close()
- if err := objReader.FillObject(obj); err != nil {
+ t, size, err := r.Header()
+ if err != nil {
return nil, err
}
- return obj, nil
+ obj.SetType(t)
+ obj.SetSize(size)
+ w, err := obj.Writer()
+ if err != nil {
+ return nil, err
+ }
+
+ _, err = io.Copy(w, r)
+ return obj, err
}
// Get returns the object with the given hash, by searching for it in
@@ -278,11 +277,7 @@ type packfileIter struct {
total uint32
}
-func newPackfileIter(
- f fs.File,
- t core.ObjectType,
- seen map[core.Hash]bool,
-) (core.ObjectIter, error) {
+func newPackfileIter(f fs.File, t core.ObjectType, seen map[core.Hash]bool) (core.ObjectIter, error) {
s := packfile.NewScanner(f)
_, total, err := s.Header()
if err != nil {
@@ -294,7 +289,14 @@ func newPackfileIter(
return nil, err
}
- return &packfileIter{f: f, d: d, t: t, total: total, seen: seen}, nil
+ return &packfileIter{
+ f: f,
+ d: d,
+ t: t,
+
+ total: total,
+ seen: seen,
+ }, nil
}
func (iter *packfileIter) Next() (core.Object, error) {