From 247e1a865db29a3189acfd89cde776a52a7ebaac Mon Sep 17 00:00:00 2001 From: rng-dynamics <73444470+rng-dynamics@users.noreply.github.com> Date: Tue, 14 Sep 2021 22:22:28 +0200 Subject: feature: Github bridge mutation rate limit (#694) Unified handling of rate limiting of github graphql api --- bridge/core/export.go | 12 +++ bridge/github/client.go | 188 +++++++++++++++++++++++++++++++++++++++ bridge/github/config.go | 2 +- bridge/github/export.go | 119 ++++++++++++------------- bridge/github/github.go | 6 +- bridge/github/import.go | 3 +- bridge/github/import_mediator.go | 116 ++---------------------- bridge/github/import_query.go | 52 +++-------- 8 files changed, 278 insertions(+), 220 deletions(-) create mode 100644 bridge/github/client.go (limited to 'bridge') diff --git a/bridge/core/export.go b/bridge/core/export.go index fa531c5f..6e0612fa 100644 --- a/bridge/core/export.go +++ b/bridge/core/export.go @@ -31,6 +31,9 @@ const ( // but not severe enough to consider the export a failure. ExportEventWarning + // The export system (web API) has reached a rate limit + ExportEventRateLimiting + // Error happened during export ExportEventError ) @@ -74,6 +77,8 @@ func (er ExportResult) String() string { return fmt.Sprintf("warning at %s: %s", er.ID, er.Err.Error()) } return fmt.Sprintf("warning: %s", er.Err.Error()) + case ExportEventRateLimiting: + return fmt.Sprintf("rate limiting: %s", er.Reason) default: panic("unknown export result") @@ -145,3 +150,10 @@ func NewExportTitleEdition(id entity.Id) ExportResult { Event: ExportEventTitleEdition, } } + +func NewExportRateLimiting(msg string) ExportResult { + return ExportResult{ + Reason: msg, + Event: ExportEventRateLimiting, + } +} diff --git a/bridge/github/client.go b/bridge/github/client.go new file mode 100644 index 00000000..10f0a03c --- /dev/null +++ b/bridge/github/client.go @@ -0,0 +1,188 @@ +package github + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/MichaelMure/git-bug/bridge/core" + "github.com/shurcooL/githubv4" +) + +var _ Client = &githubv4.Client{} + +// Client is an interface conforming with githubv4.Client +type Client interface { + Mutate(context.Context, interface{}, githubv4.Input, map[string]interface{}) error + Query(context.Context, interface{}, map[string]interface{}) error +} + +// rateLimitHandlerClient wrapps the Github client and adds improved error handling and handling of +// Github's GraphQL rate limit. +type rateLimitHandlerClient struct { + sc Client +} + +func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient { + return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)} +} + +type RateLimitingEvent struct { + msg string +} + +// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an +// export result. +func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error { + // prepare a closure for the mutation + mutFun := func(ctx context.Context) error { + return c.sc.Mutate(ctx, m, input, vars) + } + limitEvents := make(chan RateLimitingEvent) + defer close(limitEvents) + go func() { + for e := range limitEvents { + select { + case <-ctx.Done(): + return + case out <- core.NewExportRateLimiting(e.msg): + } + } + }() + return c.callAPIAndRetry(mutFun, ctx, limitEvents) +} + +// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events +// to a given channel of type RateLimitingEvent. +func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error { + // prepare a closure fot the query + queryFun := func(ctx context.Context) error { + return c.sc.Query(ctx, query, vars) + } + return c.callAPIAndRetry(queryFun, ctx, limitEvents) +} + +// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events +// to a given channel of type ImportEvent. +func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error { + // forward rate limiting events to channel of import events + limitEvents := make(chan RateLimitingEvent) + defer close(limitEvents) + go func() { + for e := range limitEvents { + select { + case <-ctx.Done(): + return + case importEvents <- e: + } + } + }() + return c.queryWithLimitEvents(ctx, query, vars, limitEvents) +} + +// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting +// event a message to stdout. +func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error { + // print rate limiting events directly to stdout. + limitEvents := make(chan RateLimitingEvent) + defer close(limitEvents) + go func() { + for e := range limitEvents { + fmt.Println(e.msg) + } + }() + return c.queryWithLimitEvents(ctx, query, vars, limitEvents) +} + +// callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in +// case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be +// a closure containing a query or a mutation to the Github GraphQL API. +func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error { + var err error + if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil { + return nil + } + // failure; the reason may be temporary network problems or internal errors + // on the github servers. Internal errors on the github servers are quite common. + // Retry + retries := 3 + for i := 0; i < retries; i++ { + // wait a few seconds before retry + sleepTime := time.Duration(8*(i+1)) * time.Second + timer := time.NewTimer(sleepTime) + select { + case <-ctx.Done(): + stop(timer) + return ctx.Err() + case <-timer.C: + err = c.callAPIDealWithLimit(apiCall, ctx, events) + if err == nil { + return nil + } + } + } + return err +} + +// callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting +// error, then it waits until the rate limit is reset and it repeats the request to the API. The +// parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github +// GraphQL API. +func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error { + qctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + // call the function fun() + err := apiCall(qctx) + if err == nil { + return nil + } + // matching the error string + if strings.Contains(err.Error(), "API rate limit exceeded") { + // a rate limit error + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() + // Use a separate query to get Github rate limiting information. + limitQuery := rateLimitQuery{} + if err := c.sc.Query(qctx, &limitQuery, map[string]interface{}{}); err != nil { + return err + } + // Get the time when Github will reset the rate limit of their API. + resetTime := limitQuery.RateLimit.ResetAt.Time + msg := fmt.Sprintf( + "Github GraphQL API rate limit. This process will sleep until %s.", + resetTime.String(), + ) + // Send message about rate limiting event. + select { + case <-ctx.Done(): + return ctx.Err() + case events <- RateLimitingEvent{msg}: + } + // Pause current goroutine + timer := time.NewTimer(time.Until(resetTime)) + select { + case <-ctx.Done(): + stop(timer) + return ctx.Err() + case <-timer.C: + } + // call the function apiCall() again + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() + err = apiCall(qctx) + return err // might be nil + } else { + return err + } +} + +func stop(t *time.Timer) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} diff --git a/bridge/github/config.go b/bridge/github/config.go index 9889f403..3dfbd14b 100644 --- a/bridge/github/config.go +++ b/bridge/github/config.go @@ -563,7 +563,7 @@ func getLoginFromToken(token *auth.Token) (string, error) { var q loginQuery - err := client.Query(ctx, &q, nil) + err := client.queryPrintMsgs(ctx, &q, nil) if err != nil { return "", err } diff --git a/bridge/github/export.go b/bridge/github/export.go index 264f2a23..8c40eb74 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -32,12 +32,12 @@ type githubExporter struct { conf core.Configuration // cache identities clients - identityClient map[entity.Id]*githubv4.Client + identityClient map[entity.Id]*rateLimitHandlerClient // the client to use for non user-specific queries // it's the client associated to the "default-login" config // used for the github V4 API (graphql) - defaultClient *githubv4.Client + defaultClient *rateLimitHandlerClient // the token of the default user // it's the token associated to the "default-login" config @@ -53,12 +53,15 @@ type githubExporter struct { // cache labels used to speed up exporting labels events cachedLabels map[string]string + + // channel to send export results + out chan<- core.ExportResult } // Init . func (ge *githubExporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error { ge.conf = conf - ge.identityClient = make(map[entity.Id]*githubv4.Client) + ge.identityClient = make(map[entity.Id]*rateLimitHandlerClient) ge.cachedOperationIDs = make(map[entity.Id]string) ge.cachedLabels = make(map[string]string) @@ -114,7 +117,7 @@ func (ge *githubExporter) cacheAllClient(repo *cache.RepoCache) error { } // getClientForIdentity return a githubv4 API client configured with the access token of the given identity. -func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*githubv4.Client, error) { +func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*rateLimitHandlerClient, error) { client, ok := ge.identityClient[userId] if ok { return client, nil @@ -126,6 +129,7 @@ func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*githubv4.Clie // ExportAll export all event made by the current user to Github func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) { out := make(chan core.ExportResult) + ge.out = out var err error // get repository node id @@ -139,15 +143,16 @@ func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, return nil, err } - // query all labels - err = ge.cacheGithubLabels(ctx, ge.defaultClient) - if err != nil { - return nil, err - } - go func() { defer close(out) + // query all labels + err = ge.cacheGithubLabels(ctx, ge.defaultClient) + if err != nil { + out <- core.NewExportError(errors.Wrap(err, "can't obtain Github labels"), "") + return + } + allIdentitiesIds := make([]entity.Id, 0, len(ge.identityClient)) for id := range ge.identityClient { allIdentitiesIds = append(allIdentitiesIds, id) @@ -250,7 +255,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out } // create bug - id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message) + id, url, err := ge.createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message) if err != nil { err := errors.Wrap(err, "exporting github issue") out <- core.NewExportError(err, b.Id()) @@ -304,7 +309,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out switch op := op.(type) { case *bug.AddCommentOperation: // send operation to github - id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, op.Message) + id, url, err = ge.addCommentGithubIssue(ctx, client, bugGithubID, op.Message) if err != nil { err := errors.Wrap(err, "adding comment") out <- core.NewExportError(err, b.Id()) @@ -321,7 +326,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out if op.Target == createOp.Id() { // case bug creation operation: we need to edit the Github issue - if err := updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil { + if err := ge.updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil { err := errors.Wrap(err, "editing issue") out <- core.NewExportError(err, b.Id()) return @@ -340,7 +345,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out panic("unexpected error: comment id not found") } - eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, op.Message) + eid, eurl, err := ge.editCommentGithubIssue(ctx, client, commentID, op.Message) if err != nil { err := errors.Wrap(err, "editing comment") out <- core.NewExportError(err, b.Id()) @@ -355,7 +360,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out } case *bug.SetStatusOperation: - if err := updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil { + if err := ge.updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil { err := errors.Wrap(err, "editing status") out <- core.NewExportError(err, b.Id()) return @@ -367,7 +372,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out url = bugGithubURL case *bug.SetTitleOperation: - if err := updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil { + if err := ge.updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil { err := errors.Wrap(err, "editing title") out <- core.NewExportError(err, b.Id()) return @@ -472,7 +477,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith return err } -func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Client) error { +func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHandlerClient) error { variables := map[string]interface{}{ "owner": githubv4.String(ge.conf[confKeyOwner]), "name": githubv4.String(ge.conf[confKeyProject]), @@ -481,17 +486,25 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Cl } q := labelsQuery{} + // When performing the queries we have to forward rate limiting events to the + // current channel of export results. + events := make(chan RateLimitingEvent) + defer close(events) + go func() { + for e := range events { + select { + case <-ctx.Done(): + return + case ge.out <- core.NewExportRateLimiting(e.msg): + } + } + }() hasNextPage := true for hasNextPage { - // create a new timeout context at each iteration - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - - if err := gc.Query(ctx, &q, variables); err != nil { - cancel() + if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil { return err } - cancel() for _, label := range q.Repository.Labels.Nodes { ge.cachedLabels[label.Name] = label.ID @@ -584,11 +597,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC Color: githubv4.String(labelColor), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) - defer cancel() + ctx := context.Background() - if err := gc.Mutate(ctx, &m, input, nil); err != nil { + if err := gc.mutate(ctx, &m, input, nil); err != nil { return "", err } @@ -596,7 +607,7 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC } */ -func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) { +func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *rateLimitHandlerClient, repositoryID string, label bug.Label) (string, error) { // try to get label id from cache labelID, err := ge.getLabelID(string(label)) if err == nil { @@ -618,7 +629,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *gith return labelID, nil } -func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) { +func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *rateLimitHandlerClient, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) { ids := make([]githubv4.ID, 0, len(labels)) var err error @@ -643,7 +654,7 @@ func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, } // create a github issue and return it ID -func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) { +func (ge *githubExporter) createGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, repositoryID, title, body string) (string, string, error) { m := &createIssueMutation{} input := githubv4.CreateIssueInput{ RepositoryID: repositoryID, @@ -651,10 +662,7 @@ func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, t Body: (*githubv4.String)(&body), } - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - if err := gc.Mutate(ctx, m, input, nil); err != nil { + if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil { return "", "", err } @@ -663,17 +671,14 @@ func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, t } // add a comment to an issue and return it ID -func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) { +func (ge *githubExporter) addCommentGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, subjectID string, body string) (string, string, error) { m := &addCommentToIssueMutation{} input := githubv4.AddCommentInput{ SubjectID: subjectID, Body: githubv4.String(body), } - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - if err := gc.Mutate(ctx, m, input, nil); err != nil { + if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil { return "", "", err } @@ -681,24 +686,21 @@ func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID s return node.ID, node.URL, nil } -func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) { +func (ge *githubExporter) editCommentGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, commentID, body string) (string, string, error) { m := &updateIssueCommentMutation{} input := githubv4.UpdateIssueCommentInput{ ID: commentID, Body: githubv4.String(body), } - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - if err := gc.Mutate(ctx, m, input, nil); err != nil { + if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil { return "", "", err } return commentID, m.UpdateIssueComment.IssueComment.URL, nil } -func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error { +func (ge *githubExporter) updateGithubIssueStatus(ctx context.Context, gc *rateLimitHandlerClient, id string, status bug.Status) error { m := &updateIssueMutation{} // set state @@ -718,44 +720,35 @@ func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string State: &state, } - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - if err := gc.Mutate(ctx, m, input, nil); err != nil { + if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil { return err } return nil } -func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error { +func (ge *githubExporter) updateGithubIssueBody(ctx context.Context, gc *rateLimitHandlerClient, id string, body string) error { m := &updateIssueMutation{} input := githubv4.UpdateIssueInput{ ID: id, Body: (*githubv4.String)(&body), } - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - if err := gc.Mutate(ctx, m, input, nil); err != nil { + if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil { return err } return nil } -func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error { +func (ge *githubExporter) updateGithubIssueTitle(ctx context.Context, gc *rateLimitHandlerClient, id, title string) error { m := &updateIssueMutation{} input := githubv4.UpdateIssueInput{ ID: id, Title: (*githubv4.String)(&title), } - ctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - - if err := gc.Mutate(ctx, m, input, nil); err != nil { + if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil { return err } @@ -763,9 +756,7 @@ func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title } // update github issue labels -func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error { - reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() +func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *rateLimitHandlerClient, labelableID string, added, removed []bug.Label) error { wg, ctx := errgroup.WithContext(ctx) if len(added) > 0 { @@ -782,7 +773,7 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu } // add labels - if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil { + if err := gc.mutate(ctx, m, inputAdd, nil, ge.out); err != nil { return err } return nil @@ -803,7 +794,7 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu } // remove label labels - if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil { + if err := gc.mutate(ctx, m2, inputRemove, nil, ge.out); err != nil { return err } return nil diff --git a/bridge/github/github.go b/bridge/github/github.go index 1e85eb9a..cbeb03eb 100644 --- a/bridge/github/github.go +++ b/bridge/github/github.go @@ -5,7 +5,6 @@ import ( "context" "time" - "github.com/shurcooL/githubv4" "golang.org/x/oauth2" "github.com/MichaelMure/git-bug/bridge/core" @@ -47,11 +46,10 @@ func (*Github) NewExporter() core.Exporter { return &githubExporter{} } -func buildClient(token *auth.Token) *githubv4.Client { +func buildClient(token *auth.Token) *rateLimitHandlerClient { src := oauth2.StaticTokenSource( &oauth2.Token{AccessToken: token.Value}, ) httpClient := oauth2.NewClient(context.TODO(), src) - - return githubv4.NewClient(httpClient) + return newRateLimitHandlerClient(httpClient) } diff --git a/bridge/github/import.go b/bridge/github/import.go index f410cc65..a41083d2 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -22,7 +22,7 @@ type githubImporter struct { conf core.Configuration // default client - client *githubv4.Client + client *rateLimitHandlerClient // mediator to access the Github API mediator *importMediator @@ -73,6 +73,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, // Exactly the same is true for comments and comment edits. // As a consequence we need to look at the current event and one look ahead // event. + currEvent = nextEvent if currEvent == nil { currEvent = gi.getEventHandleMsgs() diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 873d5f62..db9f877c 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -2,8 +2,6 @@ package github import ( "context" - "fmt" - "strings" "time" "github.com/shurcooL/githubv4" @@ -22,7 +20,7 @@ const ( // importMediator provides a convenient interface to retrieve issues from the Github GraphQL API. type importMediator struct { // Github graphql client - gc *githubv4.Client + gh *rateLimitHandlerClient // name of the repository owner on Github owner string @@ -47,10 +45,6 @@ type ImportEvent interface { isImportEvent() } -type RateLimitingEvent struct { - msg string -} - func (RateLimitingEvent) isImportEvent() {} type IssueEvent struct { @@ -84,9 +78,9 @@ func (mm *importMediator) NextImportEvent() ImportEvent { return <-mm.importEvents } -func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { +func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, + gh: client, owner: owner, project: project, since: since, @@ -144,7 +138,7 @@ func (mm *importMediator) Error() error { func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { return nil, err } return &query.User, nil @@ -206,7 +200,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, vars["issueEditBefore"] = cursor } query := issueEditQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -250,7 +244,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu vars["timelineAfter"] = cursor } query := timelineQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -300,7 +294,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -319,7 +313,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = cursor } query := issueQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil { mm.err = err return nil, false } @@ -340,97 +334,3 @@ func reverse(eds []userContentEdit) chan userContentEdit { }() return ret } - -// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query -// and it is used to populate the response into it. It should be a pointer to a struct that -// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If -// there is a Github rate limiting error, then the function sleeps and retries after the rate limit -// is expired. If there is another error, then the method will retry before giving up. -func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - if err := mm.queryOnce(ctx, query, vars); err == nil { - // success: done - return nil - } - // failure: we will retry - // To retry is important for importing projects with a big number of issues, because - // there may be temporary network errors or momentary internal errors of the github servers. - retries := 3 - var err error - for i := 0; i < retries; i++ { - // wait a few seconds before retry - sleepTime := time.Duration(8*(i+1)) * time.Second - timer := time.NewTimer(sleepTime) - select { - case <-ctx.Done(): - stop(timer) - return ctx.Err() - case <-timer.C: - } - err = mm.queryOnce(ctx, query, vars) - if err == nil { - // success: done - return nil - } - } - return err -} - -func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - // first: just send the query to the graphql api - vars["dryRun"] = githubv4.Boolean(false) - qctx, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - err := mm.gc.Query(qctx, query, vars) - if err == nil { - // no error: done - return nil - } - // matching the error string - if !strings.Contains(err.Error(), "API rate limit exceeded") { - // an error, but not the API rate limit error: done - return err - } - // a rate limit error - // ask the graphql api for rate limiting information - vars["dryRun"] = githubv4.Boolean(true) - qctx, cancel = context.WithTimeout(ctx, defaultTimeout) - defer cancel() - if err := mm.gc.Query(qctx, query, vars); err != nil { - return err - } - rateLimit := query.rateLimit() - if rateLimit.Cost > rateLimit.Remaining { - // sleep - resetTime := rateLimit.ResetAt.Time - // Add a few seconds (8) for good measure - resetTime = resetTime.Add(8 * time.Second) - msg := fmt.Sprintf("Github GraphQL API: import will sleep until %s", resetTime.String()) - select { - case <-ctx.Done(): - return ctx.Err() - case mm.importEvents <- RateLimitingEvent{msg}: - } - timer := time.NewTimer(time.Until(resetTime)) - select { - case <-ctx.Done(): - stop(timer) - return ctx.Err() - case <-timer.C: - } - } - // run the original query again - vars["dryRun"] = githubv4.Boolean(false) - qctx, cancel = context.WithTimeout(ctx, defaultTimeout) - defer cancel() - err = mm.gc.Query(qctx, query, vars) - return err // might be nil -} - -func stop(t *time.Timer) { - if !t.Stop() { - select { - case <-t.C: - default: - } - } -} diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index 461daf94..1428e3fd 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -2,30 +2,19 @@ package github import "github.com/shurcooL/githubv4" -type rateLimit struct { - Cost githubv4.Int - Limit githubv4.Int - NodeCount githubv4.Int - Remaining githubv4.Int - ResetAt githubv4.DateTime - Used githubv4.Int -} - -type rateLimiter interface { - rateLimit() rateLimit +type rateLimitQuery struct { + RateLimit struct { + ResetAt githubv4.DateTime + //Limit githubv4.Int + //Remaining githubv4.Int + } } type userQuery struct { - RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` - User user `graphql:"user(login: $login)"` -} - -func (q *userQuery) rateLimit() rateLimit { - return q.RateLimit + User user `graphql:"user(login: $login)"` } type labelsQuery struct { - //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` Repository struct { Labels struct { Nodes []struct { @@ -40,26 +29,19 @@ type labelsQuery struct { } type loginQuery struct { - //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` Viewer struct { Login string `graphql:"login"` } `graphql:"viewer"` } type issueQuery struct { - RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` Repository struct { Issues issueConnection `graphql:"issues(first: $issueFirst, after: $issueAfter, orderBy: {field: CREATED_AT, direction: ASC}, filterBy: {since: $issueSince})"` } `graphql:"repository(owner: $owner, name: $name)"` } -func (q *issueQuery) rateLimit() rateLimit { - return q.RateLimit -} - type issueEditQuery struct { - RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` - Node struct { + Node struct { Typename githubv4.String `graphql:"__typename"` Issue struct { UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` @@ -67,13 +49,8 @@ type issueEditQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q *issueEditQuery) rateLimit() rateLimit { - return q.RateLimit -} - type timelineQuery struct { - RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` - Node struct { + Node struct { Typename githubv4.String `graphql:"__typename"` Issue struct { TimelineItems timelineItemsConnection `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` @@ -81,13 +58,8 @@ type timelineQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q *timelineQuery) rateLimit() rateLimit { - return q.RateLimit -} - type commentEditQuery struct { - RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` - Node struct { + Node struct { Typename githubv4.String `graphql:"__typename"` IssueComment struct { UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` @@ -95,10 +67,6 @@ type commentEditQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q *commentEditQuery) rateLimit() rateLimit { - return q.RateLimit -} - type user struct { Login githubv4.String AvatarUrl githubv4.String -- cgit