From bd3dd4d421299699854bfe0353aae312bcf8c97c Mon Sep 17 00:00:00 2001 From: Máximo Cuadros Date: Wed, 30 Nov 2016 17:24:11 +0100 Subject: protocol/packp: sideband muxer and demuxer (#143) * format/pakp: sideband demuxer * format/pakp: sideband muxer * format/pakp: sideband demuxer and muxer * protocol/pakp: sideband demuxer and muxer * documentation and improvements * improvements * handle scan errors properly --- plumbing/protocol/packp/sideband/common.go | 33 ++++++ plumbing/protocol/packp/sideband/demux.go | 147 ++++++++++++++++++++++++ plumbing/protocol/packp/sideband/demux_test.go | 151 +++++++++++++++++++++++++ plumbing/protocol/packp/sideband/doc.go | 31 +++++ plumbing/protocol/packp/sideband/muxer.go | 65 +++++++++++ plumbing/protocol/packp/sideband/muxer_test.go | 39 +++++++ 6 files changed, 466 insertions(+) create mode 100644 plumbing/protocol/packp/sideband/common.go create mode 100644 plumbing/protocol/packp/sideband/demux.go create mode 100644 plumbing/protocol/packp/sideband/demux_test.go create mode 100644 plumbing/protocol/packp/sideband/doc.go create mode 100644 plumbing/protocol/packp/sideband/muxer.go create mode 100644 plumbing/protocol/packp/sideband/muxer_test.go (limited to 'plumbing') diff --git a/plumbing/protocol/packp/sideband/common.go b/plumbing/protocol/packp/sideband/common.go new file mode 100644 index 0000000..de50012 --- /dev/null +++ b/plumbing/protocol/packp/sideband/common.go @@ -0,0 +1,33 @@ +package sideband + +// Type sideband type "side-band" or "side-band-64k" +type Type int8 + +const ( + // Sideband legacy sideband type up to 1000-byte messages + Sideband Type = iota + // Sideband64k sideband type up to 65519-byte messages + Sideband64k Type = iota + + // MaxPackedSize for Sideband type + MaxPackedSize = 1000 + // MaxPackedSize64k for Sideband64k type + MaxPackedSize64k = 65520 +) + +// Channel sideband channel +type Channel byte + +// WithPayload encode the payload as a message +func (ch Channel) WithPayload(payload []byte) []byte { + return append([]byte{byte(ch)}, payload...) +} + +const ( + // PackData packfile content + PackData Channel = 1 + // ProgressMessage progress messages + ProgressMessage Channel = 2 + // ErrorMessage fatal error message just before stream aborts + ErrorMessage Channel = 3 +) diff --git a/plumbing/protocol/packp/sideband/demux.go b/plumbing/protocol/packp/sideband/demux.go new file mode 100644 index 0000000..09fe57d --- /dev/null +++ b/plumbing/protocol/packp/sideband/demux.go @@ -0,0 +1,147 @@ +package sideband + +import ( + "bytes" + "errors" + "fmt" + "io" + + "gopkg.in/src-d/go-git.v4/plumbing/format/pktline" +) + +// ErrMaxPackedExceeded returned by Read, if the maximum packed size is exceeded +var ErrMaxPackedExceeded = errors.New("max. packed size exceeded") + +// Progress allows to read the progress information +type Progress interface { + io.Reader +} + +// Demuxer demultiplex the progress reports and error info interleaved with the +// packfile itself. +// +// A sideband has three diferent channels the main one call PackData contains +// the packfile data, the ErrorMessage channel, that contains server errors and +// the last one ProgressMessage channel containing information about the ongoing +// tast happening in the server (optinal, can be suppressed sending NoProgress +// or Quiet capabilities to the server) +// +// In order to demultiplex the data stream, method `Read` should be called to +// retrieve the PackData channel, the incoming data from the ProgressMessage is +// stored and can be read from `Progress` field, if any message is retrieved +// from the ErrorMessage channel an error is returned and we can assume that the +// conection has been closed. +type Demuxer struct { + t Type + r io.Reader + s *pktline.Scanner + + max int + pending []byte + + // Progress contains progress information + Progress Progress +} + +// NewDemuxer returns a new Demuxer for the given t and read from r +func NewDemuxer(t Type, r io.Reader) *Demuxer { + max := MaxPackedSize64k + if t == Sideband { + max = MaxPackedSize + } + + return &Demuxer{ + t: t, + r: r, + max: max, + s: pktline.NewScanner(r), + Progress: bytes.NewBuffer(nil), + } +} + +// Read reads up to len(p) bytes from the PackData channel into p, an error can +// be return if an error happends when reading or if a message is sent in the +// ErrorMessage channel. +// +// If a ProgressMessage is read, it won't be copied to b. Instead of this, it is +// stored and can be read through the reader Progress. If the n value returned +// is zero, err will be nil unless an error reading happens. +func (d *Demuxer) Read(b []byte) (n int, err error) { + var read, req int + + req = len(b) + for read < req { + n, err := d.doRead(b[read:req]) + read += n + + if err != nil { + return read, err + } + } + + return read, nil +} + +func (d *Demuxer) doRead(b []byte) (int, error) { + read, err := d.nextPackData() + size := len(read) + wanted := len(b) + + if size > wanted { + d.pending = read[wanted:] + } + + if wanted > size { + wanted = size + } + + size = copy(b, read[:wanted]) + return size, err +} + +func (d *Demuxer) nextPackData() ([]byte, error) { + content := d.getPending() + if len(content) != 0 { + return content, nil + } + + if !d.s.Scan() { + if err := d.s.Err(); err != nil { + return nil, err + } + + return nil, io.EOF + } + + content = d.s.Bytes() + + size := len(content) + if size == 0 { + return nil, nil + } else if size > d.max { + return nil, ErrMaxPackedExceeded + } + + switch Channel(content[0]) { + case PackData: + return content[1:], nil + case ProgressMessage: + _, err := d.Progress.(io.Writer).Write(content[1:]) + return nil, err + case ErrorMessage: + return nil, fmt.Errorf("unexpected error: %s", content[1:]) + default: + return nil, fmt.Errorf("unknown channel %s", content) + } +} + +func (d *Demuxer) getPending() (b []byte) { + if len(d.pending) == 0 { + return nil + } + + content := d.pending + d.pending = nil + + return content +} diff --git a/plumbing/protocol/packp/sideband/demux_test.go b/plumbing/protocol/packp/sideband/demux_test.go new file mode 100644 index 0000000..4814d89 --- /dev/null +++ b/plumbing/protocol/packp/sideband/demux_test.go @@ -0,0 +1,151 @@ +package sideband + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "testing" + + "gopkg.in/src-d/go-git.v4/plumbing/format/pktline" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } + +type SidebandSuite struct{} + +var _ = Suite(&SidebandSuite{}) + +func (s *SidebandSuite) TestDecode(c *C) { + expected := []byte("abcdefghijklmnopqrstuvwxyz") + + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode(PackData.WithPayload(expected[0:8])) + e.Encode(PackData.WithPayload(expected[8:16])) + e.Encode(PackData.WithPayload(expected[16:26])) + + content := make([]byte, 26) + d := NewDemuxer(Sideband64k, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, IsNil) + c.Assert(n, Equals, 26) + c.Assert(content, DeepEquals, expected) +} + +func (s *SidebandSuite) TestDecodeMoreThanContain(c *C) { + expected := []byte("abcdefghijklmnopqrstuvwxyz") + + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode(PackData.WithPayload(expected)) + + content := make([]byte, 42) + d := NewDemuxer(Sideband64k, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, Equals, io.ErrUnexpectedEOF) + c.Assert(n, Equals, 26) + c.Assert(content[0:26], DeepEquals, expected) +} + +func (s *SidebandSuite) TestDecodeWithError(c *C) { + expected := []byte("abcdefghijklmnopqrstuvwxyz") + + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode(PackData.WithPayload(expected[0:8])) + e.Encode(ErrorMessage.WithPayload([]byte{'F', 'O', 'O', '\n'})) + e.Encode(PackData.WithPayload(expected[8:16])) + e.Encode(PackData.WithPayload(expected[16:26])) + + content := make([]byte, 26) + d := NewDemuxer(Sideband64k, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, ErrorMatches, "unexpected error: FOO\n") + c.Assert(n, Equals, 8) + c.Assert(content[0:8], DeepEquals, expected[0:8]) +} + +type mockReader struct{} + +func (r *mockReader) Read([]byte) (int, error) { return 0, errors.New("foo") } + +func (s *SidebandSuite) TestDecodeFromFailingReader(c *C) { + content := make([]byte, 26) + d := NewDemuxer(Sideband64k, &mockReader{}) + n, err := io.ReadFull(d, content) + c.Assert(err, ErrorMatches, "foo") + c.Assert(n, Equals, 0) +} + +func (s *SidebandSuite) TestDecodeWithProgress(c *C) { + expected := []byte("abcdefghijklmnopqrstuvwxyz") + + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode(PackData.WithPayload(expected[0:8])) + e.Encode(ProgressMessage.WithPayload([]byte{'F', 'O', 'O', '\n'})) + e.Encode(PackData.WithPayload(expected[8:16])) + e.Encode(PackData.WithPayload(expected[16:26])) + + content := make([]byte, 26) + d := NewDemuxer(Sideband64k, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, IsNil) + c.Assert(n, Equals, 26) + c.Assert(content, DeepEquals, expected) + + progress, err := ioutil.ReadAll(d.Progress) + c.Assert(err, IsNil) + c.Assert(progress, DeepEquals, []byte{'F', 'O', 'O', '\n'}) +} + +func (s *SidebandSuite) TestDecodeWithUnknownChannel(c *C) { + + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode([]byte{'4', 'F', 'O', 'O', '\n'}) + + content := make([]byte, 26) + d := NewDemuxer(Sideband64k, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, ErrorMatches, "unknown channel 4FOO\n") + c.Assert(n, Equals, 0) +} + +func (s *SidebandSuite) TestDecodeWithPending(c *C) { + expected := []byte("abcdefghijklmnopqrstuvwxyz") + + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode(PackData.WithPayload(expected[0:8])) + e.Encode(PackData.WithPayload(expected[8:16])) + e.Encode(PackData.WithPayload(expected[16:26])) + + content := make([]byte, 13) + d := NewDemuxer(Sideband64k, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, IsNil) + c.Assert(n, Equals, 13) + c.Assert(content, DeepEquals, expected[0:13]) + + n, err = d.Read(content) + c.Assert(err, IsNil) + c.Assert(n, Equals, 13) + c.Assert(content, DeepEquals, expected[13:26]) +} + +func (s *SidebandSuite) TestDecodeErrMaxPacked(c *C) { + buf := bytes.NewBuffer(nil) + e := pktline.NewEncoder(buf) + e.Encode(PackData.WithPayload(bytes.Repeat([]byte{'0'}, MaxPackedSize+1))) + + content := make([]byte, 13) + d := NewDemuxer(Sideband, buf) + n, err := io.ReadFull(d, content) + c.Assert(err, Equals, ErrMaxPackedExceeded) + c.Assert(n, Equals, 0) + +} diff --git a/plumbing/protocol/packp/sideband/doc.go b/plumbing/protocol/packp/sideband/doc.go new file mode 100644 index 0000000..c5d2429 --- /dev/null +++ b/plumbing/protocol/packp/sideband/doc.go @@ -0,0 +1,31 @@ +// Package sideband implements a sideband mutiplex/demultiplexer +package sideband + +// If 'side-band' or 'side-band-64k' capabilities have been specified by +// the client, the server will send the packfile data multiplexed. +// +// Either mode indicates that the packfile data will be streamed broken +// up into packets of up to either 1000 bytes in the case of 'side_band', +// or 65520 bytes in the case of 'side_band_64k'. Each packet is made up +// of a leading 4-byte pkt-line length of how much data is in the packet, +// followed by a 1-byte stream code, followed by the actual data. +// +// The stream code can be one of: +// +// 1 - pack data +// 2 - progress messages +// 3 - fatal error message just before stream aborts +// +// The "side-band-64k" capability came about as a way for newer clients +// that can handle much larger packets to request packets that are +// actually crammed nearly full, while maintaining backward compatibility +// for the older clients. +// +// Further, with side-band and its up to 1000-byte messages, it's actually +// 999 bytes of payload and 1 byte for the stream code. With side-band-64k, +// same deal, you have up to 65519 bytes of data and 1 byte for the stream +// code. +// +// The client MUST send only maximum of one of "side-band" and "side- +// band-64k". Server MUST diagnose it as an error if client requests +// both. diff --git a/plumbing/protocol/packp/sideband/muxer.go b/plumbing/protocol/packp/sideband/muxer.go new file mode 100644 index 0000000..2ab7da8 --- /dev/null +++ b/plumbing/protocol/packp/sideband/muxer.go @@ -0,0 +1,65 @@ +package sideband + +import ( + "io" + + "gopkg.in/src-d/go-git.v4/plumbing/format/pktline" +) + +// Muxer multiplex the packfile along with the progress messages and the error +// information. The multiplex is perform using pktline format. +type Muxer struct { + max int + e *pktline.Encoder +} + +const chLen = 1 + +// NewMuxer returns a new Muxer for the given t that writes on w. +// +// If t is equal to `Sideband` the max pack size is set to MaxPackedSize, in any +// other value is given, max pack is set to MaxPackedSize64k, that is the +// maximum lenght of a line in pktline format. +func NewMuxer(t Type, w io.Writer) *Muxer { + max := MaxPackedSize64k + if t == Sideband { + max = MaxPackedSize + } + + return &Muxer{ + max: max - chLen, + e: pktline.NewEncoder(w), + } +} + +// Write writes p in the PackData channel +func (m *Muxer) Write(p []byte) (int, error) { + return m.WriteChannel(PackData, p) +} + +// WriteChannel writes p in the given channel. This method can be used with any +// channel, but is recommend use it only for the ProgressMessage and +// ErrorMessage channels and use Write for the PackData channel +func (m *Muxer) WriteChannel(t Channel, p []byte) (int, error) { + wrote := 0 + size := len(p) + for wrote < size { + n, err := m.doWrite(t, p[wrote:]) + wrote += n + + if err != nil { + return wrote, err + } + } + + return wrote, nil +} + +func (m *Muxer) doWrite(ch Channel, p []byte) (int, error) { + sz := len(p) + if sz > m.max { + sz = m.max + } + + return sz, m.e.Encode(ch.WithPayload(p[:sz])) +} diff --git a/plumbing/protocol/packp/sideband/muxer_test.go b/plumbing/protocol/packp/sideband/muxer_test.go new file mode 100644 index 0000000..38fc4bd --- /dev/null +++ b/plumbing/protocol/packp/sideband/muxer_test.go @@ -0,0 +1,39 @@ +package sideband + +import ( + "bytes" + + . "gopkg.in/check.v1" +) + +func (s *SidebandSuite) TestMuxerWrite(c *C) { + buf := bytes.NewBuffer(nil) + + m := NewMuxer(Sideband, buf) + + n, err := m.Write(bytes.Repeat([]byte{'F'}, (MaxPackedSize-1)*2)) + c.Assert(err, IsNil) + c.Assert(n, Equals, 1998) + c.Assert(buf.Len(), Equals, 2008) +} + +func (s *SidebandSuite) TestMuxerWriteChannelMultipleChannels(c *C) { + buf := bytes.NewBuffer(nil) + + m := NewMuxer(Sideband, buf) + + n, err := m.WriteChannel(PackData, bytes.Repeat([]byte{'D'}, 4)) + c.Assert(err, IsNil) + c.Assert(n, Equals, 4) + + n, err = m.WriteChannel(ProgressMessage, bytes.Repeat([]byte{'P'}, 4)) + c.Assert(err, IsNil) + c.Assert(n, Equals, 4) + + n, err = m.WriteChannel(PackData, bytes.Repeat([]byte{'D'}, 4)) + c.Assert(err, IsNil) + c.Assert(n, Equals, 4) + + c.Assert(buf.Len(), Equals, 27) + c.Assert(buf.String(), Equals, "0009\x01DDDD0009\x02PPPP0009\x01DDDD") +} -- cgit