aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github/client.go
diff options
context:
space:
mode:
authorMichael Muré <batolettre@gmail.com>2022-06-05 15:01:08 +0200
committerMichael Muré <batolettre@gmail.com>2022-06-05 15:13:49 +0200
commit7348fb9edb68ca9142f5d87673da48cef733b3d3 (patch)
tree9d4667ecff1f40491d5825e60293e5c85a725104 /bridge/github/client.go
parent96327c3371ca762d906209c6114092bbf552c0f4 (diff)
downloadgit-bug-7348fb9edb68ca9142f5d87673da48cef733b3d3.tar.gz
github: fix data race when closing event channel
I believe the issue was twofold: When done importing, the calling context is likely still valid, so if the output channel is not read enough and reach capacity, some event producer down the line can be blocked trying to send in that channel. When closing it, this send is still trying to proceed, which is illegal in go. In rateLimitHandlerClient, there was a need to 2 different type of output channel: core.ExportResult and ImportEvent. To do so, the previous code was using a single channel type RateLimitingEvent and a series of goroutines to read/cast/send to the final channel. This could result in more async goroutine being stuck trying to send in an at-capacity channel. Instead, the code now use a simple synchronous callback to directly push to the final output channel. No concurrency needed anymore and the code is simpler. Any of those fixes could have resolved the data race, but both fixes is more correct.
Diffstat (limited to 'bridge/github/client.go')
-rw-r--r--bridge/github/client.go106
1 files changed, 47 insertions, 59 deletions
diff --git a/bridge/github/client.go b/bridge/github/client.go
index 10f0a03c..00981a56 100644
--- a/bridge/github/client.go
+++ b/bridge/github/client.go
@@ -7,8 +7,9 @@ import (
"strings"
"time"
- "github.com/MichaelMure/git-bug/bridge/core"
"github.com/shurcooL/githubv4"
+
+ "github.com/MichaelMure/git-bug/bridge/core"
)
var _ Client = &githubv4.Client{}
@@ -29,79 +30,69 @@ func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient
return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)}
}
-type RateLimitingEvent struct {
- msg string
-}
-
-// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an
-// export result.
+// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event
func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error {
// prepare a closure for the mutation
mutFun := func(ctx context.Context) error {
return c.sc.Mutate(ctx, m, input, vars)
}
- limitEvents := make(chan RateLimitingEvent)
- defer close(limitEvents)
- go func() {
- for e := range limitEvents {
- select {
- case <-ctx.Done():
- return
- case out <- core.NewExportRateLimiting(e.msg):
- }
+ callback := func(msg string) {
+ select {
+ case <-ctx.Done():
+ case out <- core.NewExportRateLimiting(msg):
}
- }()
- return c.callAPIAndRetry(mutFun, ctx, limitEvents)
+ }
+ return c.callAPIAndRetry(ctx, mutFun, callback)
}
-// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events
-// to a given channel of type RateLimitingEvent.
-func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error {
- // prepare a closure fot the query
+// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event
+func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
+ // prepare a closure for the query
queryFun := func(ctx context.Context) error {
return c.sc.Query(ctx, query, vars)
}
- return c.callAPIAndRetry(queryFun, ctx, limitEvents)
+ callback := func(msg string) {
+ select {
+ case <-ctx.Done():
+ case importEvents <- RateLimitingEvent{msg}:
+ }
+ }
+ return c.callAPIAndRetry(ctx, queryFun, callback)
}
-// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events
-// to a given channel of type ImportEvent.
-func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
- // forward rate limiting events to channel of import events
- limitEvents := make(chan RateLimitingEvent)
- defer close(limitEvents)
- go func() {
- for e := range limitEvents {
- select {
- case <-ctx.Done():
- return
- case importEvents <- e:
- }
+// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event
+func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error {
+ // prepare a closure for the query
+ queryFun := func(ctx context.Context) error {
+ return c.sc.Query(ctx, query, vars)
+ }
+ callback := func(msg string) {
+ select {
+ case <-ctx.Done():
+ case out <- core.NewExportRateLimiting(msg):
}
- }()
- return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+ }
+ return c.callAPIAndRetry(ctx, queryFun, callback)
}
-// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting
-// event a message to stdout.
+// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event .
func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
- // print rate limiting events directly to stdout.
- limitEvents := make(chan RateLimitingEvent)
- defer close(limitEvents)
- go func() {
- for e := range limitEvents {
- fmt.Println(e.msg)
- }
- }()
- return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+ // prepare a closure for the query
+ queryFun := func(ctx context.Context) error {
+ return c.sc.Query(ctx, query, vars)
+ }
+ callback := func(msg string) {
+ fmt.Println(msg)
+ }
+ return c.callAPIAndRetry(ctx, queryFun, callback)
}
// callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in
// case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be
// a closure containing a query or a mutation to the Github GraphQL API.
-func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error {
var err error
- if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil {
+ if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil {
return nil
}
// failure; the reason may be temporary network problems or internal errors
@@ -117,7 +108,7 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
stop(timer)
return ctx.Err()
case <-timer.C:
- err = c.callAPIDealWithLimit(apiCall, ctx, events)
+ err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent)
if err == nil {
return nil
}
@@ -127,10 +118,10 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
}
// callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting
-// error, then it waits until the rate limit is reset and it repeats the request to the API. The
+// error, then it waits until the rate limit is reset, and it repeats the request to the API. The
// parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github
// GraphQL API.
-func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error {
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
// call the function fun()
@@ -155,11 +146,8 @@ func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Conte
resetTime.String(),
)
// Send message about rate limiting event.
- select {
- case <-ctx.Done():
- return ctx.Err()
- case events <- RateLimitingEvent{msg}:
- }
+ rateLimitCallback(msg)
+
// Pause current goroutine
timer := time.NewTimer(time.Until(resetTime))
select {