package sideband import ( "errors" "fmt" "io" "gopkg.in/src-d/go-git.v4/plumbing/format/pktline" ) // ErrMaxPackedExceeded returned by Read, if the maximum packed size is exceeded var ErrMaxPackedExceeded = errors.New("max. packed size exceeded") // Progress where the progress information is stored type Progress interface { io.Writer } // Demuxer demultiplex the progress reports and error info interleaved with the // packfile itself. // // A sideband has three different channels the main one, call PackData, contains // the packfile data, the ErrorMessage channel, that contains server errors and // the last one ProgressMessage channel containing information about the ongoing // task happening in the server (optional, can be suppressed sending NoProgress // or Quiet capabilities to the server) // // In order to demultiplex the data stream, method `Read` should be called to // retrieve the PackData channel, the incoming data from the ProgressMessage is // written at `Progress` (if any), if any message is retrieved from the // ErrorMessage channel an error is returned and we can assume that the // connection has been closed. type Demuxer struct { t Type r io.Reader s *pktline.Scanner max int pending []byte // Progress is where the progress messages are stored Progress Progress } // NewDemuxer returns a new Demuxer for the given t and read from r func NewDemuxer(t Type, r io.Reader) *Demuxer { max := MaxPackedSize64k if t == Sideband { max = MaxPackedSize } return &Demuxer{ t: t, r: r, max: max, s: pktline.NewScanner(r), } } // Read reads up to len(p) bytes from the PackData channel into p, an error can // be return if an error happens when reading or if a message is sent in the // ErrorMessage channel. // // When a ProgressMessage is read, is not copy to b, instead of this is written // to the Progress func (d *Demuxer) Read(b []byte) (n int, err error) { var read, req int req = len(b) for read < req { n, err := d.doRead(b[read:req]) read += n if err != nil { return read, err } } return read, nil } func (d *Demuxer) doRead(b []byte) (int, error) { read, err := d.nextPackData() size := len(read) wanted := len(b) if size > wanted { d.pending = read[wanted:] } if wanted > size { wanted = size } size = copy(b, read[:wanted]) return size, err } func (d *Demuxer) nextPackData() ([]byte, error) { content := d.getPending() if len(content) != 0 { return content, nil } if !d.s.Scan() { if err := d.s.Err(); err != nil { return nil, err } return nil, io.EOF } content = d.s.Bytes() size := len(content) if size == 0 { return nil, nil } else if size > d.max { return nil, ErrMaxPackedExceeded } switch Channel(content[0]) { case PackData: return content[1:], nil case ProgressMessage: if d.Progress != nil { _, err := d.Progress.Write(content[1:]) return nil, err } case ErrorMessage: return nil, fmt.Errorf("unexpected error: %s", content[1:]) default: return nil, fmt.Errorf("unknown channel %s", content) } return nil, nil } func (d *Demuxer) getPending() (b []byte) { if len(d.pending) == 0 { return nil } content := d.pending d.pending = nil return content }