diff options
Diffstat (limited to 'vendor/github.com/vektah/gqlgen/handler/websocket.go')
-rw-r--r-- | vendor/github.com/vektah/gqlgen/handler/websocket.go | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/vendor/github.com/vektah/gqlgen/handler/websocket.go b/vendor/github.com/vektah/gqlgen/handler/websocket.go new file mode 100644 index 00000000..e80748ca --- /dev/null +++ b/vendor/github.com/vektah/gqlgen/handler/websocket.go @@ -0,0 +1,245 @@ +package handler + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + + "github.com/gorilla/websocket" + "github.com/vektah/gqlgen/graphql" + "github.com/vektah/gqlgen/neelance/errors" + "github.com/vektah/gqlgen/neelance/query" + "github.com/vektah/gqlgen/neelance/validation" +) + +const ( + connectionInitMsg = "connection_init" // Client -> Server + connectionTerminateMsg = "connection_terminate" // Client -> Server + startMsg = "start" // Client -> Server + stopMsg = "stop" // Client -> Server + connectionAckMsg = "connection_ack" // Server -> Client + connectionErrorMsg = "connection_error" // Server -> Client + dataMsg = "data" // Server -> Client + errorMsg = "error" // Server -> Client + completeMsg = "complete" // Server -> Client + //connectionKeepAliveMsg = "ka" // Server -> Client TODO: keepalives +) + +type operationMessage struct { + Payload json.RawMessage `json:"payload,omitempty"` + ID string `json:"id,omitempty"` + Type string `json:"type"` +} + +type wsConnection struct { + ctx context.Context + conn *websocket.Conn + exec graphql.ExecutableSchema + active map[string]context.CancelFunc + mu sync.Mutex + cfg *Config +} + +func connectWs(exec graphql.ExecutableSchema, w http.ResponseWriter, r *http.Request, cfg *Config) { + ws, err := cfg.upgrader.Upgrade(w, r, http.Header{ + "Sec-Websocket-Protocol": []string{"graphql-ws"}, + }) + if err != nil { + log.Printf("unable to upgrade %T to websocket %s: ", w, err.Error()) + sendErrorf(w, http.StatusBadRequest, "unable to upgrade") + return + } + + conn := wsConnection{ + active: map[string]context.CancelFunc{}, + exec: exec, + conn: ws, + ctx: r.Context(), + cfg: cfg, + } + + if !conn.init() { + return + } + + conn.run() +} + +func (c *wsConnection) init() bool { + message := c.readOp() + if message == nil { + c.close(websocket.CloseProtocolError, "decoding error") + return false + } + + switch message.Type { + case connectionInitMsg: + c.write(&operationMessage{Type: connectionAckMsg}) + case connectionTerminateMsg: + c.close(websocket.CloseNormalClosure, "terminated") + return false + default: + c.sendConnectionError("unexpected message %s", message.Type) + c.close(websocket.CloseProtocolError, "unexpected message") + return false + } + + return true +} + +func (c *wsConnection) write(msg *operationMessage) { + c.mu.Lock() + c.conn.WriteJSON(msg) + c.mu.Unlock() +} + +func (c *wsConnection) run() { + for { + message := c.readOp() + if message == nil { + return + } + + switch message.Type { + case startMsg: + if !c.subscribe(message) { + return + } + case stopMsg: + c.mu.Lock() + closer := c.active[message.ID] + c.mu.Unlock() + if closer == nil { + c.sendError(message.ID, errors.Errorf("%s is not running, cannot stop", message.ID)) + continue + } + + closer() + case connectionTerminateMsg: + c.close(websocket.CloseNormalClosure, "terminated") + return + default: + c.sendConnectionError("unexpected message %s", message.Type) + c.close(websocket.CloseProtocolError, "unexpected message") + return + } + } +} + +func (c *wsConnection) subscribe(message *operationMessage) bool { + var reqParams params + if err := json.Unmarshal(message.Payload, &reqParams); err != nil { + c.sendConnectionError("invalid json") + return false + } + + doc, qErr := query.Parse(reqParams.Query) + if qErr != nil { + c.sendError(message.ID, qErr) + return true + } + + errs := validation.Validate(c.exec.Schema(), doc) + if len(errs) != 0 { + c.sendError(message.ID, errs...) + return true + } + + op, err := doc.GetOperation(reqParams.OperationName) + if err != nil { + c.sendError(message.ID, errors.Errorf("%s", err.Error())) + return true + } + + reqCtx := c.cfg.newRequestContext(doc, reqParams.Query, reqParams.Variables) + ctx := graphql.WithRequestContext(c.ctx, reqCtx) + + if op.Type != query.Subscription { + var result *graphql.Response + if op.Type == query.Query { + result = c.exec.Query(ctx, op) + } else { + result = c.exec.Mutation(ctx, op) + } + + c.sendData(message.ID, result) + c.write(&operationMessage{ID: message.ID, Type: completeMsg}) + return true + } + + ctx, cancel := context.WithCancel(ctx) + c.mu.Lock() + c.active[message.ID] = cancel + c.mu.Unlock() + go func() { + defer func() { + if r := recover(); r != nil { + userErr := reqCtx.Recover(ctx, r) + c.sendError(message.ID, &errors.QueryError{Message: userErr.Error()}) + } + }() + next := c.exec.Subscription(ctx, op) + for result := next(); result != nil; result = next() { + c.sendData(message.ID, result) + } + + c.write(&operationMessage{ID: message.ID, Type: completeMsg}) + + c.mu.Lock() + delete(c.active, message.ID) + c.mu.Unlock() + cancel() + }() + + return true +} + +func (c *wsConnection) sendData(id string, response *graphql.Response) { + b, err := json.Marshal(response) + if err != nil { + c.sendError(id, errors.Errorf("unable to encode json response: %s", err.Error())) + return + } + + c.write(&operationMessage{Type: dataMsg, ID: id, Payload: b}) +} + +func (c *wsConnection) sendError(id string, errors ...*errors.QueryError) { + var errs []error + for _, err := range errors { + errs = append(errs, err) + } + b, err := json.Marshal(errs) + if err != nil { + panic(err) + } + c.write(&operationMessage{Type: errorMsg, ID: id, Payload: b}) +} + +func (c *wsConnection) sendConnectionError(format string, args ...interface{}) { + b, err := json.Marshal(&graphql.Error{Message: fmt.Sprintf(format, args...)}) + if err != nil { + panic(err) + } + + c.write(&operationMessage{Type: connectionErrorMsg, Payload: b}) +} + +func (c *wsConnection) readOp() *operationMessage { + message := operationMessage{} + if err := c.conn.ReadJSON(&message); err != nil { + c.sendConnectionError("invalid json") + return nil + } + return &message +} + +func (c *wsConnection) close(closeCode int, message string) { + c.mu.Lock() + _ = c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(closeCode, message)) + c.mu.Unlock() + _ = c.conn.Close() +} |