diff options
author | Michael Muré <batolettre@gmail.com> | 2022-06-05 15:01:08 +0200 |
---|---|---|
committer | Michael Muré <batolettre@gmail.com> | 2022-06-05 15:13:49 +0200 |
commit | 7348fb9edb68ca9142f5d87673da48cef733b3d3 (patch) | |
tree | 9d4667ecff1f40491d5825e60293e5c85a725104 | |
parent | 96327c3371ca762d906209c6114092bbf552c0f4 (diff) | |
download | git-bug-7348fb9edb68ca9142f5d87673da48cef733b3d3.tar.gz |
github: fix data race when closing event channel
I believe the issue was twofold:
When done importing, the calling context is likely still valid, so if the output channel is not read enough and reach capacity, some event producer down the line can be blocked trying to send in that channel. When closing it, this send is still trying to proceed, which is illegal in go.
In rateLimitHandlerClient, there was a need to 2 different type of output channel: core.ExportResult and ImportEvent. To do so, the previous code was using a single channel type RateLimitingEvent and a series of goroutines to read/cast/send to the final channel. This could result in more async goroutine being stuck trying to send in an at-capacity channel. Instead, the code now use a simple synchronous callback to directly push to the final output channel. No concurrency needed anymore and the code is simpler.
Any of those fixes could have resolved the data race, but both fixes is more correct.
-rw-r--r-- | bridge/github/client.go | 106 | ||||
-rw-r--r-- | bridge/github/export.go | 15 | ||||
-rw-r--r-- | bridge/github/import_events.go | 40 | ||||
-rw-r--r-- | bridge/github/import_mediator.go | 134 |
4 files changed, 144 insertions, 151 deletions
diff --git a/bridge/github/client.go b/bridge/github/client.go index 10f0a03c..00981a56 100644 --- a/bridge/github/client.go +++ b/bridge/github/client.go @@ -7,8 +7,9 @@ import ( "strings" "time" - "github.com/MichaelMure/git-bug/bridge/core" "github.com/shurcooL/githubv4" + + "github.com/MichaelMure/git-bug/bridge/core" ) var _ Client = &githubv4.Client{} @@ -29,79 +30,69 @@ func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)} } -type RateLimitingEvent struct { - msg string -} - -// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an -// export result. +// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error { // prepare a closure for the mutation mutFun := func(ctx context.Context) error { return c.sc.Mutate(ctx, m, input, vars) } - limitEvents := make(chan RateLimitingEvent) - defer close(limitEvents) - go func() { - for e := range limitEvents { - select { - case <-ctx.Done(): - return - case out <- core.NewExportRateLimiting(e.msg): - } + callback := func(msg string) { + select { + case <-ctx.Done(): + case out <- core.NewExportRateLimiting(msg): } - }() - return c.callAPIAndRetry(mutFun, ctx, limitEvents) + } + return c.callAPIAndRetry(ctx, mutFun, callback) } -// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events -// to a given channel of type RateLimitingEvent. -func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error { - // prepare a closure fot the query +// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event +func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error { + // prepare a closure for the query queryFun := func(ctx context.Context) error { return c.sc.Query(ctx, query, vars) } - return c.callAPIAndRetry(queryFun, ctx, limitEvents) + callback := func(msg string) { + select { + case <-ctx.Done(): + case importEvents <- RateLimitingEvent{msg}: + } + } + return c.callAPIAndRetry(ctx, queryFun, callback) } -// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events -// to a given channel of type ImportEvent. -func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error { - // forward rate limiting events to channel of import events - limitEvents := make(chan RateLimitingEvent) - defer close(limitEvents) - go func() { - for e := range limitEvents { - select { - case <-ctx.Done(): - return - case importEvents <- e: - } +// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event +func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error { + // prepare a closure for the query + queryFun := func(ctx context.Context) error { + return c.sc.Query(ctx, query, vars) + } + callback := func(msg string) { + select { + case <-ctx.Done(): + case out <- core.NewExportRateLimiting(msg): } - }() - return c.queryWithLimitEvents(ctx, query, vars, limitEvents) + } + return c.callAPIAndRetry(ctx, queryFun, callback) } -// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting -// event a message to stdout. +// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event . func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error { - // print rate limiting events directly to stdout. - limitEvents := make(chan RateLimitingEvent) - defer close(limitEvents) - go func() { - for e := range limitEvents { - fmt.Println(e.msg) - } - }() - return c.queryWithLimitEvents(ctx, query, vars, limitEvents) + // prepare a closure for the query + queryFun := func(ctx context.Context) error { + return c.sc.Query(ctx, query, vars) + } + callback := func(msg string) { + fmt.Println(msg) + } + return c.callAPIAndRetry(ctx, queryFun, callback) } // callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in // case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be // a closure containing a query or a mutation to the Github GraphQL API. -func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error { +func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error { var err error - if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil { + if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil { return nil } // failure; the reason may be temporary network problems or internal errors @@ -117,7 +108,7 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e stop(timer) return ctx.Err() case <-timer.C: - err = c.callAPIDealWithLimit(apiCall, ctx, events) + err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent) if err == nil { return nil } @@ -127,10 +118,10 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e } // callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting -// error, then it waits until the rate limit is reset and it repeats the request to the API. The +// error, then it waits until the rate limit is reset, and it repeats the request to the API. The // parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github // GraphQL API. -func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error { +func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error { qctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() // call the function fun() @@ -155,11 +146,8 @@ func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Conte resetTime.String(), ) // Send message about rate limiting event. - select { - case <-ctx.Done(): - return ctx.Err() - case events <- RateLimitingEvent{msg}: - } + rateLimitCallback(msg) + // Pause current goroutine timer := time.NewTimer(time.Until(resetTime)) select { diff --git a/bridge/github/export.go b/bridge/github/export.go index 8c40eb74..35d456c2 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -486,23 +486,10 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHa } q := labelsQuery{} - // When performing the queries we have to forward rate limiting events to the - // current channel of export results. - events := make(chan RateLimitingEvent) - defer close(events) - go func() { - for e := range events { - select { - case <-ctx.Done(): - return - case ge.out <- core.NewExportRateLimiting(e.msg): - } - } - }() hasNextPage := true for hasNextPage { - if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil { + if err := gc.queryExport(ctx, &q, variables, ge.out); err != nil { return err } diff --git a/bridge/github/import_events.go b/bridge/github/import_events.go new file mode 100644 index 00000000..7ae86d75 --- /dev/null +++ b/bridge/github/import_events.go @@ -0,0 +1,40 @@ +package github + +import "github.com/shurcooL/githubv4" + +type ImportEvent interface { + isImportEvent() +} + +type RateLimitingEvent struct { + msg string +} + +func (RateLimitingEvent) isImportEvent() {} + +type IssueEvent struct { + issue +} + +func (IssueEvent) isImportEvent() {} + +type IssueEditEvent struct { + issueId githubv4.ID + userContentEdit +} + +func (IssueEditEvent) isImportEvent() {} + +type TimelineEvent struct { + issueId githubv4.ID + timelineItem +} + +func (TimelineEvent) isImportEvent() {} + +type CommentEditEvent struct { + commentId githubv4.ID + userContentEdit +} + +func (CommentEditEvent) isImportEvent() {} diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index db9f877c..be4e3880 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -9,6 +9,7 @@ import ( const ( // These values influence how fast the github graphql rate limit is exhausted. + NumIssues = 40 NumIssueEdits = 100 NumTimelineItems = 100 @@ -41,43 +42,6 @@ type importMediator struct { err error } -type ImportEvent interface { - isImportEvent() -} - -func (RateLimitingEvent) isImportEvent() {} - -type IssueEvent struct { - issue -} - -func (IssueEvent) isImportEvent() {} - -type IssueEditEvent struct { - issueId githubv4.ID - userContentEdit -} - -func (IssueEditEvent) isImportEvent() {} - -type TimelineEvent struct { - issueId githubv4.ID - timelineItem -} - -func (TimelineEvent) isImportEvent() {} - -type CommentEditEvent struct { - commentId githubv4.ID - userContentEdit -} - -func (CommentEditEvent) isImportEvent() {} - -func (mm *importMediator) NextImportEvent() ImportEvent { - return <-mm.importEvents -} - func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator { mm := importMediator{ gh: client, @@ -87,48 +51,24 @@ func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owne importEvents: make(chan ImportEvent, ChanCapacity), err: nil, } - go func() { - mm.fillImportEvents(ctx) - close(mm.importEvents) - }() - return &mm -} -type varmap map[string]interface{} + go mm.start(ctx) -func newIssueVars(owner, project string, since time.Time) varmap { - return varmap{ - "owner": githubv4.String(owner), - "name": githubv4.String(project), - "issueSince": githubv4.DateTime{Time: since}, - "issueFirst": githubv4.Int(NumIssues), - "issueEditLast": githubv4.Int(NumIssueEdits), - "issueEditBefore": (*githubv4.String)(nil), - "timelineFirst": githubv4.Int(NumTimelineItems), - "timelineAfter": (*githubv4.String)(nil), - "commentEditLast": githubv4.Int(NumCommentEdits), - "commentEditBefore": (*githubv4.String)(nil), - } -} - -func newIssueEditVars() varmap { - return varmap{ - "issueEditLast": githubv4.Int(NumIssueEdits), - } + return &mm } -func newTimelineVars() varmap { - return varmap{ - "timelineFirst": githubv4.Int(NumTimelineItems), - "commentEditLast": githubv4.Int(NumCommentEdits), - "commentEditBefore": (*githubv4.String)(nil), - } +func (mm *importMediator) start(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + mm.fillImportEvents(ctx) + // Make sure we cancel everything when we are done, instead of relying on the parent context + // This should unblock pending send to the channel if the capacity was reached and avoid a panic/race when closing. + cancel() + close(mm.importEvents) } -func newCommentEditVars() varmap { - return varmap{ - "commentEditLast": githubv4.Int(NumCommentEdits), - } +// NextImportEvent returns the next ImportEvent, or nil if done. +func (mm *importMediator) NextImportEvent() ImportEvent { + return <-mm.importEvents } func (mm *importMediator) Error() error { @@ -138,7 +78,7 @@ func (mm *importMediator) Error() error { func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { return nil, err } return &query.User, nil @@ -200,7 +140,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, vars["issueEditBefore"] = cursor } query := issueEditQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -244,7 +184,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu vars["timelineAfter"] = cursor } query := timelineQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -294,7 +234,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -313,7 +253,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = cursor } query := issueQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -334,3 +274,41 @@ func reverse(eds []userContentEdit) chan userContentEdit { }() return ret } + +// varmap is a container for Github API's pagination variables +type varmap map[string]interface{} + +func newIssueVars(owner, project string, since time.Time) varmap { + return varmap{ + "owner": githubv4.String(owner), + "name": githubv4.String(project), + "issueSince": githubv4.DateTime{Time: since}, + "issueFirst": githubv4.Int(NumIssues), + "issueEditLast": githubv4.Int(NumIssueEdits), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NumTimelineItems), + "timelineAfter": (*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NumCommentEdits), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newIssueEditVars() varmap { + return varmap{ + "issueEditLast": githubv4.Int(NumIssueEdits), + } +} + +func newTimelineVars() varmap { + return varmap{ + "timelineFirst": githubv4.Int(NumTimelineItems), + "commentEditLast": githubv4.Int(NumCommentEdits), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newCommentEditVars() varmap { + return varmap{ + "commentEditLast": githubv4.Int(NumCommentEdits), + } +} |