diff options
Diffstat (limited to 'plumbing')
-rw-r--r-- | plumbing/protocol/packp/srvresp.go | 49 | ||||
-rw-r--r-- | plumbing/protocol/packp/srvresp_test.go | 39 | ||||
-rw-r--r-- | plumbing/protocol/packp/updreq.go | 1 | ||||
-rw-r--r-- | plumbing/protocol/packp/uppackresp.go | 10 | ||||
-rw-r--r-- | plumbing/protocol/packp/uppackresp_test.go | 14 | ||||
-rw-r--r-- | plumbing/transport/server/receive_pack_test.go | 5 | ||||
-rw-r--r-- | plumbing/transport/server/server.go | 75 | ||||
-rw-r--r-- | plumbing/transport/server/server_test.go | 8 | ||||
-rw-r--r-- | plumbing/transport/server/upload_pack_test.go | 17 | ||||
-rw-r--r-- | plumbing/transport/test/receive_pack.go | 4 |
10 files changed, 161 insertions, 61 deletions
diff --git a/plumbing/protocol/packp/srvresp.go b/plumbing/protocol/packp/srvresp.go index 0c89e47..b214341 100644 --- a/plumbing/protocol/packp/srvresp.go +++ b/plumbing/protocol/packp/srvresp.go @@ -1,6 +1,7 @@ package packp import ( + "bufio" "bytes" "errors" "fmt" @@ -18,8 +19,8 @@ type ServerResponse struct { } // Decode decodes the response into the struct, isMultiACK should be true, if -// the request was done with multi_ack or multi_ack_detailed capabilities -func (r *ServerResponse) Decode(reader io.Reader, isMultiACK bool) error { +// the request was done with multi_ack or multi_ack_detailed capabilities. +func (r *ServerResponse) Decode(reader *bufio.Reader, isMultiACK bool) error { // TODO: implement support for multi_ack or multi_ack_detailed responses if isMultiACK { return errors.New("multi_ack and multi_ack_detailed are not supported") @@ -34,7 +35,15 @@ func (r *ServerResponse) Decode(reader io.Reader, isMultiACK bool) error { return err } - if !isMultiACK { + // we need to detect when the end of a response header and the begining + // of a packfile header happend, some requests to the git daemon + // produces a duplicate ACK header even when multi_ack is not supported. + stop, err := r.stopReading(reader) + if err != nil { + return err + } + + if stop { break } } @@ -42,6 +51,40 @@ func (r *ServerResponse) Decode(reader io.Reader, isMultiACK bool) error { return s.Err() } +// stopReading detects when a valid command such as ACK or NAK is found to be +// read in the buffer without moving the read pointer. +func (r *ServerResponse) stopReading(reader *bufio.Reader) (bool, error) { + ahead, err := reader.Peek(7) + if err == io.EOF { + return true, nil + } + + if err != nil { + return false, err + } + + if len(ahead) > 4 && r.isValidCommand(ahead[0:3]) { + return false, nil + } + + if len(ahead) == 7 && r.isValidCommand(ahead[4:]) { + return false, nil + } + + return true, nil +} + +func (r *ServerResponse) isValidCommand(b []byte) bool { + commands := [][]byte{ack, nak} + for _, c := range commands { + if bytes.Compare(b, c) == 0 { + return true + } + } + + return false +} + func (r *ServerResponse) decodeLine(line []byte) error { if len(line) == 0 { return fmt.Errorf("unexpected flush") diff --git a/plumbing/protocol/packp/srvresp_test.go b/plumbing/protocol/packp/srvresp_test.go index 6078855..c8ef520 100644 --- a/plumbing/protocol/packp/srvresp_test.go +++ b/plumbing/protocol/packp/srvresp_test.go @@ -1,6 +1,7 @@ package packp import ( + "bufio" "bytes" "gopkg.in/src-d/go-git.v4/plumbing" @@ -16,7 +17,7 @@ func (s *ServerResponseSuite) TestDecodeNAK(c *C) { raw := "0008NAK\n" sr := &ServerResponse{} - err := sr.Decode(bytes.NewBufferString(raw), false) + err := sr.Decode(bufio.NewReader(bytes.NewBufferString(raw)), false) c.Assert(err, IsNil) c.Assert(sr.ACKs, HasLen, 0) @@ -26,23 +27,53 @@ func (s *ServerResponseSuite) TestDecodeACK(c *C) { raw := "0031ACK 6ecf0ef2c2dffb796033e5a02219af86ec6584e5\n" sr := &ServerResponse{} - err := sr.Decode(bytes.NewBufferString(raw), false) + err := sr.Decode(bufio.NewReader(bytes.NewBufferString(raw)), false) c.Assert(err, IsNil) c.Assert(sr.ACKs, HasLen, 1) c.Assert(sr.ACKs[0], Equals, plumbing.NewHash("6ecf0ef2c2dffb796033e5a02219af86ec6584e5")) } +func (s *ServerResponseSuite) TestDecodeMultipleACK(c *C) { + raw := "" + + "0031ACK 1111111111111111111111111111111111111111\n" + + "0031ACK 6ecf0ef2c2dffb796033e5a02219af86ec6584e5\n" + + "00080PACK\n" + + sr := &ServerResponse{} + err := sr.Decode(bufio.NewReader(bytes.NewBufferString(raw)), false) + c.Assert(err, IsNil) + + c.Assert(sr.ACKs, HasLen, 2) + c.Assert(sr.ACKs[0], Equals, plumbing.NewHash("1111111111111111111111111111111111111111")) + c.Assert(sr.ACKs[1], Equals, plumbing.NewHash("6ecf0ef2c2dffb796033e5a02219af86ec6584e5")) +} + +func (s *ServerResponseSuite) TestDecodeMultipleACKWithSideband(c *C) { + raw := "" + + "0031ACK 1111111111111111111111111111111111111111\n" + + "0031ACK 6ecf0ef2c2dffb796033e5a02219af86ec6584e5\n" + + "00080aaaa\n" + + sr := &ServerResponse{} + err := sr.Decode(bufio.NewReader(bytes.NewBufferString(raw)), false) + c.Assert(err, IsNil) + + c.Assert(sr.ACKs, HasLen, 2) + c.Assert(sr.ACKs[0], Equals, plumbing.NewHash("1111111111111111111111111111111111111111")) + c.Assert(sr.ACKs[1], Equals, plumbing.NewHash("6ecf0ef2c2dffb796033e5a02219af86ec6584e5")) +} + func (s *ServerResponseSuite) TestDecodeMalformed(c *C) { raw := "0029ACK 6ecf0ef2c2dffb796033e5a02219af86ec6584e\n" sr := &ServerResponse{} - err := sr.Decode(bytes.NewBufferString(raw), false) + err := sr.Decode(bufio.NewReader(bytes.NewBufferString(raw)), false) c.Assert(err, NotNil) } func (s *ServerResponseSuite) TestDecodeMultiACK(c *C) { sr := &ServerResponse{} - err := sr.Decode(bytes.NewBuffer(nil), true) + err := sr.Decode(bufio.NewReader(bytes.NewBuffer(nil)), true) c.Assert(err, NotNil) } diff --git a/plumbing/protocol/packp/updreq.go b/plumbing/protocol/packp/updreq.go index 0624930..b246613 100644 --- a/plumbing/protocol/packp/updreq.go +++ b/plumbing/protocol/packp/updreq.go @@ -42,6 +42,7 @@ func NewReferenceUpdateRequest() *ReferenceUpdateRequest { // - report-status // - ofs-delta // - ref-delta +// - delete-refs // It leaves up to the user to add the following capabilities later: // - atomic // - ofs-delta diff --git a/plumbing/protocol/packp/uppackresp.go b/plumbing/protocol/packp/uppackresp.go index ac456f3..c18e159 100644 --- a/plumbing/protocol/packp/uppackresp.go +++ b/plumbing/protocol/packp/uppackresp.go @@ -4,6 +4,8 @@ import ( "errors" "io" + "bufio" + "gopkg.in/src-d/go-git.v4/plumbing/protocol/packp/capability" "gopkg.in/src-d/go-git.v4/utils/ioutil" ) @@ -51,18 +53,20 @@ func NewUploadPackResponseWithPackfile(req *UploadPackRequest, // Decode decodes all the responses sent by upload-pack service into the struct // and prepares it to read the packfile using the Read method func (r *UploadPackResponse) Decode(reader io.ReadCloser) error { + buf := bufio.NewReader(reader) + if r.isShallow { - if err := r.ShallowUpdate.Decode(reader); err != nil { + if err := r.ShallowUpdate.Decode(buf); err != nil { return err } } - if err := r.ServerResponse.Decode(reader, r.isMultiACK); err != nil { + if err := r.ServerResponse.Decode(buf, r.isMultiACK); err != nil { return err } // now the reader is ready to read the packfile content - r.r = reader + r.r = ioutil.NewReadCloser(buf, reader) return nil } diff --git a/plumbing/protocol/packp/uppackresp_test.go b/plumbing/protocol/packp/uppackresp_test.go index c27fdda..789444d 100644 --- a/plumbing/protocol/packp/uppackresp_test.go +++ b/plumbing/protocol/packp/uppackresp_test.go @@ -15,7 +15,7 @@ type UploadPackResponseSuite struct{} var _ = Suite(&UploadPackResponseSuite{}) func (s *UploadPackResponseSuite) TestDecodeNAK(c *C) { - raw := "0008NAK\n[PACK]" + raw := "0008NAK\nPACK" req := NewUploadPackRequest() res := NewUploadPackResponse(req) @@ -26,11 +26,11 @@ func (s *UploadPackResponseSuite) TestDecodeNAK(c *C) { pack, err := ioutil.ReadAll(res) c.Assert(err, IsNil) - c.Assert(pack, DeepEquals, []byte("[PACK]")) + c.Assert(pack, DeepEquals, []byte("PACK")) } func (s *UploadPackResponseSuite) TestDecodeDepth(c *C) { - raw := "00000008NAK\n[PACK]" + raw := "00000008NAK\nPACK" req := NewUploadPackRequest() req.Depth = DepthCommits(1) @@ -43,11 +43,11 @@ func (s *UploadPackResponseSuite) TestDecodeDepth(c *C) { pack, err := ioutil.ReadAll(res) c.Assert(err, IsNil) - c.Assert(pack, DeepEquals, []byte("[PACK]")) + c.Assert(pack, DeepEquals, []byte("PACK")) } func (s *UploadPackResponseSuite) TestDecodeMalformed(c *C) { - raw := "00000008ACK\n[PACK]" + raw := "00000008ACK\nPACK" req := NewUploadPackRequest() req.Depth = DepthCommits(1) @@ -96,7 +96,7 @@ func (s *UploadPackResponseSuite) TestEncodeNAK(c *C) { } func (s *UploadPackResponseSuite) TestEncodeDepth(c *C) { - pf := ioutil.NopCloser(bytes.NewBuffer([]byte("[PACK]"))) + pf := ioutil.NopCloser(bytes.NewBuffer([]byte("PACK"))) req := NewUploadPackRequest() req.Depth = DepthCommits(1) @@ -106,7 +106,7 @@ func (s *UploadPackResponseSuite) TestEncodeDepth(c *C) { b := bytes.NewBuffer(nil) c.Assert(res.Encode(b), IsNil) - expected := "00000008NAK\n[PACK]" + expected := "00000008NAK\nPACK" c.Assert(string(b.Bytes()), Equals, expected) } diff --git a/plumbing/transport/server/receive_pack_test.go b/plumbing/transport/server/receive_pack_test.go index 73ba60b..54c2fba 100644 --- a/plumbing/transport/server/receive_pack_test.go +++ b/plumbing/transport/server/receive_pack_test.go @@ -27,11 +27,6 @@ func (s *ReceivePackSuite) TearDownTest(c *C) { s.Suite.TearDownSuite(c) } -// TODO -func (s *ReceivePackSuite) TestSendPackAddDeleteReference(c *C) { - c.Skip("delete reference not supported yet") -} - // Overwritten, server returns error earlier. func (s *ReceivePackSuite) TestAdvertisedReferencesNotExists(c *C) { r, err := s.Client.NewReceivePackSession(s.NonExistentEndpoint, s.EmptyAuth) diff --git a/plumbing/transport/server/server.go b/plumbing/transport/server/server.go index 89fce5f..7c78afe 100644 --- a/plumbing/transport/server/server.go +++ b/plumbing/transport/server/server.go @@ -26,7 +26,19 @@ type server struct { // NewServer returns a transport.Transport implementing a git server, // independent of transport. Each transport must wrap this. func NewServer(loader Loader) transport.Transport { - return &server{loader, &handler{}} + return &server{ + loader, + &handler{asClient: false}, + } +} + +// NewClient returns a transport.Transport implementing a client with an +// embedded server. +func NewClient(loader Loader) transport.Transport { + return &server{ + loader, + &handler{asClient: true}, + } } func (s *server) NewUploadPackSession(ep transport.Endpoint, auth transport.AuthMethod) (transport.UploadPackSession, error) { @@ -47,24 +59,27 @@ func (s *server) NewReceivePackSession(ep transport.Endpoint, auth transport.Aut return s.handler.NewReceivePackSession(sto) } -type handler struct{} +type handler struct { + asClient bool +} func (h *handler) NewUploadPackSession(s storer.Storer) (transport.UploadPackSession, error) { return &upSession{ - session: session{storer: s}, + session: session{storer: s, asClient: h.asClient}, }, nil } func (h *handler) NewReceivePackSession(s storer.Storer) (transport.ReceivePackSession, error) { return &rpSession{ - session: session{storer: s}, + session: session{storer: s, asClient: h.asClient}, cmdStatus: map[plumbing.ReferenceName]error{}, }, nil } type session struct { - storer storer.Storer - caps *capability.List + storer storer.Storer + caps *capability.List + asClient bool } func (s *session) Close() error { @@ -107,6 +122,10 @@ func (s *upSession) AdvertisedReferences() (*packp.AdvRefs, error) { return nil, err } + if s.asClient && len(ar.References) == 0 { + return nil, transport.ErrEmptyRemoteRepository + } + return ar, nil } @@ -225,31 +244,11 @@ func (s *rpSession) ReceivePack(req *packp.ReferenceUpdateRequest) (*packp.Repor return s.reportStatus(), err } - updatedRefs := s.updatedReferences(req) - - if s.caps.Supports(capability.Atomic) && s.firstErr != nil { - //TODO: add support for 'atomic' once we have reference - // transactions, currently we do not announce it. - rs := s.reportStatus() - for _, cs := range rs.CommandStatuses { - if cs.Error() == nil { - cs.Status = "" - } - } - } - - for name, ref := range updatedRefs { - //TODO: add support for 'delete-refs' once we can delete - // references, currently we do not announce it. - err := s.storer.SetReference(ref) - s.setStatus(name, err) - } - + s.updateReferences(req) return s.reportStatus(), s.firstErr } -func (s *rpSession) updatedReferences(req *packp.ReferenceUpdateRequest) map[plumbing.ReferenceName]*plumbing.Reference { - refs := map[plumbing.ReferenceName]*plumbing.Reference{} +func (s *rpSession) updateReferences(req *packp.ReferenceUpdateRequest) { for _, cmd := range req.Commands { exists, err := referenceExists(s.storer, cmd.Name) if err != nil { @@ -265,19 +264,16 @@ func (s *rpSession) updatedReferences(req *packp.ReferenceUpdateRequest) map[plu } ref := plumbing.NewHashReference(cmd.Name, cmd.New) - refs[ref.Name()] = ref + err := s.storer.SetReference(ref) + s.setStatus(cmd.Name, err) case packp.Delete: if !exists { s.setStatus(cmd.Name, ErrUpdateReference) continue } - if !s.caps.Supports(capability.DeleteRefs) { - s.setStatus(cmd.Name, fmt.Errorf("delete not supported")) - continue - } - - refs[cmd.Name] = nil + err := s.storer.RemoveReference(cmd.Name) + s.setStatus(cmd.Name, err) case packp.Update: if !exists { s.setStatus(cmd.Name, ErrUpdateReference) @@ -290,11 +286,10 @@ func (s *rpSession) updatedReferences(req *packp.ReferenceUpdateRequest) map[plu } ref := plumbing.NewHashReference(cmd.Name, cmd.New) - refs[ref.Name()] = ref + err := s.storer.SetReference(ref) + s.setStatus(cmd.Name, err) } } - - return refs } func (s *rpSession) failAtomicUpdate() (*packp.ReportStatus, error) { @@ -368,6 +363,10 @@ func (*rpSession) setSupportedCapabilities(c *capability.List) error { return err } + if err := c.Set(capability.DeleteRefs); err != nil { + return err + } + return c.Set(capability.ReportStatus) } diff --git a/plumbing/transport/server/server_test.go b/plumbing/transport/server/server_test.go index 0f7201c..7912768 100644 --- a/plumbing/transport/server/server_test.go +++ b/plumbing/transport/server/server_test.go @@ -20,12 +20,18 @@ type BaseSuite struct { loader server.MapLoader client transport.Transport clientBackup transport.Transport + asClient bool } func (s *BaseSuite) SetUpSuite(c *C) { s.Suite.SetUpSuite(c) s.loader = server.MapLoader{} - s.client = server.NewServer(s.loader) + if s.asClient { + s.client = server.NewClient(s.loader) + } else { + s.client = server.NewServer(s.loader) + } + s.clientBackup = client.Protocols["file"] client.Protocols["file"] = s.client } diff --git a/plumbing/transport/server/upload_pack_test.go b/plumbing/transport/server/upload_pack_test.go index 137f887..bd2b791 100644 --- a/plumbing/transport/server/upload_pack_test.go +++ b/plumbing/transport/server/upload_pack_test.go @@ -38,3 +38,20 @@ func (s *UploadPackSuite) TestAdvertisedReferencesNotExists(c *C) { c.Assert(err, Equals, transport.ErrRepositoryNotFound) c.Assert(r, IsNil) } + +// Tests server with `asClient = true`. This is recommended when using a server +// registered directly with `client.InstallProtocol`. +type ClientLikeUploadPackSuite struct { + UploadPackSuite +} + +var _ = Suite(&ClientLikeUploadPackSuite{}) + +func (s *ClientLikeUploadPackSuite) SetUpSuite(c *C) { + s.asClient = true + s.UploadPackSuite.SetUpSuite(c) +} + +func (s *ClientLikeUploadPackSuite) TestAdvertisedReferencesEmpty(c *C) { + s.UploadPackSuite.UploadPackSuite.TestAdvertisedReferencesEmpty(c) +} diff --git a/plumbing/transport/test/receive_pack.go b/plumbing/transport/test/receive_pack.go index bb1c58a..15172c8 100644 --- a/plumbing/transport/test/receive_pack.go +++ b/plumbing/transport/test/receive_pack.go @@ -308,6 +308,10 @@ func (s *ReceivePackSuite) testSendPackDeleteReference(c *C) { req.Capabilities.Set(capability.ReportStatus) } + if !ar.Capabilities.Supports(capability.DeleteRefs) { + c.Fatal("capability delete-refs not supported") + } + c.Assert(r.Close(), IsNil) s.receivePack(c, s.Endpoint, req, nil, false) |