diff options
author | Michael Muré <batolettre@gmail.com> | 2022-06-06 12:54:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-06 12:54:20 +0200 |
commit | ee0bac6b54e28bb52cbe18332fec84bdefb37194 (patch) | |
tree | 9425ff5ef9a54f2dc1ba23149b73d70d84c70923 | |
parent | c2d7b1271f9dc2df3a752b73bc7fd26dbeeb10c0 (diff) | |
parent | 7348fb9edb68ca9142f5d87673da48cef733b3d3 (diff) | |
download | git-bug-ee0bac6b54e28bb52cbe18332fec84bdefb37194.tar.gz |
Merge pull request #813 from MichaelMure/fix-data-race2
Github: fix data race
-rw-r--r-- | bridge/github/client.go | 106 | ||||
-rw-r--r-- | bridge/github/export.go | 15 | ||||
-rw-r--r-- | bridge/github/import_events.go | 40 | ||||
-rw-r--r-- | bridge/github/import_mediator.go | 134 |
4 files changed, 144 insertions, 151 deletions
diff --git a/bridge/github/client.go b/bridge/github/client.go index 10f0a03c..00981a56 100644 --- a/bridge/github/client.go +++ b/bridge/github/client.go @@ -7,8 +7,9 @@ import ( "strings" "time" - "github.com/MichaelMure/git-bug/bridge/core" "github.com/shurcooL/githubv4" + + "github.com/MichaelMure/git-bug/bridge/core" ) var _ Client = &githubv4.Client{} @@ -29,79 +30,69 @@ func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)} } -type RateLimitingEvent struct { - msg string -} - -// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an -// export result. +// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error { // prepare a closure for the mutation mutFun := func(ctx context.Context) error { return c.sc.Mutate(ctx, m, input, vars) } - limitEvents := make(chan RateLimitingEvent) - defer close(limitEvents) - go func() { - for e := range limitEvents { - select { - case <-ctx.Done(): - return - case out <- core.NewExportRateLimiting(e.msg): - } + callback := func(msg string) { + select { + case <-ctx.Done(): + case out <- core.NewExportRateLimiting(msg): } - }() - return c.callAPIAndRetry(mutFun, ctx, limitEvents) + } + return c.callAPIAndRetry(ctx, mutFun, callback) } -// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events -// to a given channel of type RateLimitingEvent. -func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error { - // prepare a closure fot the query +// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event +func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error { + // prepare a closure for the query queryFun := func(ctx context.Context) error { return c.sc.Query(ctx, query, vars) } - return c.callAPIAndRetry(queryFun, ctx, limitEvents) + callback := func(msg string) { + select { + case <-ctx.Done(): + case importEvents <- RateLimitingEvent{msg}: + } + } + return c.callAPIAndRetry(ctx, queryFun, callback) } -// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events -// to a given channel of type ImportEvent. -func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error { - // forward rate limiting events to channel of import events - limitEvents := make(chan RateLimitingEvent) - defer close(limitEvents) - go func() { - for e := range limitEvents { - select { - case <-ctx.Done(): - return - case importEvents <- e: - } +// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event +func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error { + // prepare a closure for the query + queryFun := func(ctx context.Context) error { + return c.sc.Query(ctx, query, vars) + } + callback := func(msg string) { + select { + case <-ctx.Done(): + case out <- core.NewExportRateLimiting(msg): } - }() - return c.queryWithLimitEvents(ctx, query, vars, limitEvents) + } + return c.callAPIAndRetry(ctx, queryFun, callback) } -// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting -// event a message to stdout. +// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event . func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error { - // print rate limiting events directly to stdout. - limitEvents := make(chan RateLimitingEvent) - defer close(limitEvents) - go func() { - for e := range limitEvents { - fmt.Println(e.msg) - } - }() - return c.queryWithLimitEvents(ctx, query, vars, limitEvents) + // prepare a closure for the query + queryFun := func(ctx context.Context) error { + return c.sc.Query(ctx, query, vars) + } + callback := func(msg string) { + fmt.Println(msg) + } + return c.callAPIAndRetry(ctx, queryFun, callback) } // callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in // case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be // a closure containing a query or a mutation to the Github GraphQL API. -func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error { +func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error { var err error - if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil { + if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil { return nil } // failure; the reason may be temporary network problems or internal errors @@ -117,7 +108,7 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e stop(timer) return ctx.Err() case <-timer.C: - err = c.callAPIDealWithLimit(apiCall, ctx, events) + err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent) if err == nil { return nil } @@ -127,10 +118,10 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e } // callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting -// error, then it waits until the rate limit is reset and it repeats the request to the API. The +// error, then it waits until the rate limit is reset, and it repeats the request to the API. The // parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github // GraphQL API. -func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error { +func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error { qctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() // call the function fun() @@ -155,11 +146,8 @@ func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Conte resetTime.String(), ) // Send message about rate limiting event. - select { - case <-ctx.Done(): - return ctx.Err() - case events <- RateLimitingEvent{msg}: - } + rateLimitCallback(msg) + // Pause current goroutine timer := time.NewTimer(time.Until(resetTime)) select { diff --git a/bridge/github/export.go b/bridge/github/export.go index 8c40eb74..35d456c2 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -486,23 +486,10 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHa } q := labelsQuery{} - // When performing the queries we have to forward rate limiting events to the - // current channel of export results. - events := make(chan RateLimitingEvent) - defer close(events) - go func() { - for e := range events { - select { - case <-ctx.Done(): - return - case ge.out <- core.NewExportRateLimiting(e.msg): - } - } - }() hasNextPage := true for hasNextPage { - if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil { + if err := gc.queryExport(ctx, &q, variables, ge.out); err != nil { return err } diff --git a/bridge/github/import_events.go b/bridge/github/import_events.go new file mode 100644 index 00000000..7ae86d75 --- /dev/null +++ b/bridge/github/import_events.go @@ -0,0 +1,40 @@ +package github + +import "github.com/shurcooL/githubv4" + +type ImportEvent interface { + isImportEvent() +} + +type RateLimitingEvent struct { + msg string +} + +func (RateLimitingEvent) isImportEvent() {} + +type IssueEvent struct { + issue +} + +func (IssueEvent) isImportEvent() {} + +type IssueEditEvent struct { + issueId githubv4.ID + userContentEdit +} + +func (IssueEditEvent) isImportEvent() {} + +type TimelineEvent struct { + issueId githubv4.ID + timelineItem +} + +func (TimelineEvent) isImportEvent() {} + +type CommentEditEvent struct { + commentId githubv4.ID + userContentEdit +} + +func (CommentEditEvent) isImportEvent() {} diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index db9f877c..be4e3880 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -9,6 +9,7 @@ import ( const ( // These values influence how fast the github graphql rate limit is exhausted. + NumIssues = 40 NumIssueEdits = 100 NumTimelineItems = 100 @@ -41,43 +42,6 @@ type importMediator struct { err error } -type ImportEvent interface { - isImportEvent() -} - -func (RateLimitingEvent) isImportEvent() {} - -type IssueEvent struct { - issue -} - -func (IssueEvent) isImportEvent() {} - -type IssueEditEvent struct { - issueId githubv4.ID - userContentEdit -} - -func (IssueEditEvent) isImportEvent() {} - -type TimelineEvent struct { - issueId githubv4.ID - timelineItem -} - -func (TimelineEvent) isImportEvent() {} - -type CommentEditEvent struct { - commentId githubv4.ID - userContentEdit -} - -func (CommentEditEvent) isImportEvent() {} - -func (mm *importMediator) NextImportEvent() ImportEvent { - return <-mm.importEvents -} - func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator { mm := importMediator{ gh: client, @@ -87,48 +51,24 @@ func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owne importEvents: make(chan ImportEvent, ChanCapacity), err: nil, } - go func() { - mm.fillImportEvents(ctx) - close(mm.importEvents) - }() - return &mm -} -type varmap map[string]interface{} + go mm.start(ctx) -func newIssueVars(owner, project string, since time.Time) varmap { - return varmap{ - "owner": githubv4.String(owner), - "name": githubv4.String(project), - "issueSince": githubv4.DateTime{Time: since}, - "issueFirst": githubv4.Int(NumIssues), - "issueEditLast": githubv4.Int(NumIssueEdits), - "issueEditBefore": (*githubv4.String)(nil), - "timelineFirst": githubv4.Int(NumTimelineItems), - "timelineAfter": (*githubv4.String)(nil), - "commentEditLast": githubv4.Int(NumCommentEdits), - "commentEditBefore": (*githubv4.String)(nil), - } -} - -func newIssueEditVars() varmap { - return varmap{ - "issueEditLast": githubv4.Int(NumIssueEdits), - } + return &mm } -func newTimelineVars() varmap { - return varmap{ - "timelineFirst": githubv4.Int(NumTimelineItems), - "commentEditLast": githubv4.Int(NumCommentEdits), - "commentEditBefore": (*githubv4.String)(nil), - } +func (mm *importMediator) start(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + mm.fillImportEvents(ctx) + // Make sure we cancel everything when we are done, instead of relying on the parent context + // This should unblock pending send to the channel if the capacity was reached and avoid a panic/race when closing. + cancel() + close(mm.importEvents) } -func newCommentEditVars() varmap { - return varmap{ - "commentEditLast": githubv4.Int(NumCommentEdits), - } +// NextImportEvent returns the next ImportEvent, or nil if done. +func (mm *importMediator) NextImportEvent() ImportEvent { + return <-mm.importEvents } func (mm *importMediator) Error() error { @@ -138,7 +78,7 @@ func (mm *importMediator) Error() error { func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { return nil, err } return &query.User, nil @@ -200,7 +140,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, vars["issueEditBefore"] = cursor } query := issueEditQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -244,7 +184,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu vars["timelineAfter"] = cursor } query := timelineQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -294,7 +234,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -313,7 +253,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = cursor } query := issueQuery{} - if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { + if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -334,3 +274,41 @@ func reverse(eds []userContentEdit) chan userContentEdit { }() return ret } + +// varmap is a container for Github API's pagination variables +type varmap map[string]interface{} + +func newIssueVars(owner, project string, since time.Time) varmap { + return varmap{ + "owner": githubv4.String(owner), + "name": githubv4.String(project), + "issueSince": githubv4.DateTime{Time: since}, + "issueFirst": githubv4.Int(NumIssues), + "issueEditLast": githubv4.Int(NumIssueEdits), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NumTimelineItems), + "timelineAfter": (*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NumCommentEdits), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newIssueEditVars() varmap { + return varmap{ + "issueEditLast": githubv4.Int(NumIssueEdits), + } +} + +func newTimelineVars() varmap { + return varmap{ + "timelineFirst": githubv4.Int(NumTimelineItems), + "commentEditLast": githubv4.Int(NumCommentEdits), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newCommentEditVars() varmap { + return varmap{ + "commentEditLast": githubv4.Int(NumCommentEdits), + } +} |