aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github
diff options
context:
space:
mode:
authorrng-dynamics <73444470+rng-dynamics@users.noreply.github.com>2021-09-14 22:22:28 +0200
committerGitHub <noreply@github.com>2021-09-14 22:22:28 +0200
commit247e1a865db29a3189acfd89cde776a52a7ebaac (patch)
treea214a05c91c3a859eaaae621665492b24f776991 /bridge/github
parent58e6aec77f27214d7ba457ad709eedb0fc2f907b (diff)
downloadgit-bug-247e1a865db29a3189acfd89cde776a52a7ebaac.tar.gz
feature: Github bridge mutation rate limit (#694)
Unified handling of rate limiting of github graphql api
Diffstat (limited to 'bridge/github')
-rw-r--r--bridge/github/client.go188
-rw-r--r--bridge/github/config.go2
-rw-r--r--bridge/github/export.go119
-rw-r--r--bridge/github/github.go6
-rw-r--r--bridge/github/import.go3
-rw-r--r--bridge/github/import_mediator.go116
-rw-r--r--bridge/github/import_query.go52
7 files changed, 266 insertions, 220 deletions
diff --git a/bridge/github/client.go b/bridge/github/client.go
new file mode 100644
index 00000000..10f0a03c
--- /dev/null
+++ b/bridge/github/client.go
@@ -0,0 +1,188 @@
+package github
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/MichaelMure/git-bug/bridge/core"
+ "github.com/shurcooL/githubv4"
+)
+
+var _ Client = &githubv4.Client{}
+
+// Client is an interface conforming with githubv4.Client
+type Client interface {
+ Mutate(context.Context, interface{}, githubv4.Input, map[string]interface{}) error
+ Query(context.Context, interface{}, map[string]interface{}) error
+}
+
+// rateLimitHandlerClient wrapps the Github client and adds improved error handling and handling of
+// Github's GraphQL rate limit.
+type rateLimitHandlerClient struct {
+ sc Client
+}
+
+func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient {
+ return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)}
+}
+
+type RateLimitingEvent struct {
+ msg string
+}
+
+// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an
+// export result.
+func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error {
+ // prepare a closure for the mutation
+ mutFun := func(ctx context.Context) error {
+ return c.sc.Mutate(ctx, m, input, vars)
+ }
+ limitEvents := make(chan RateLimitingEvent)
+ defer close(limitEvents)
+ go func() {
+ for e := range limitEvents {
+ select {
+ case <-ctx.Done():
+ return
+ case out <- core.NewExportRateLimiting(e.msg):
+ }
+ }
+ }()
+ return c.callAPIAndRetry(mutFun, ctx, limitEvents)
+}
+
+// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events
+// to a given channel of type RateLimitingEvent.
+func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error {
+ // prepare a closure fot the query
+ queryFun := func(ctx context.Context) error {
+ return c.sc.Query(ctx, query, vars)
+ }
+ return c.callAPIAndRetry(queryFun, ctx, limitEvents)
+}
+
+// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events
+// to a given channel of type ImportEvent.
+func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
+ // forward rate limiting events to channel of import events
+ limitEvents := make(chan RateLimitingEvent)
+ defer close(limitEvents)
+ go func() {
+ for e := range limitEvents {
+ select {
+ case <-ctx.Done():
+ return
+ case importEvents <- e:
+ }
+ }
+ }()
+ return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+}
+
+// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting
+// event a message to stdout.
+func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
+ // print rate limiting events directly to stdout.
+ limitEvents := make(chan RateLimitingEvent)
+ defer close(limitEvents)
+ go func() {
+ for e := range limitEvents {
+ fmt.Println(e.msg)
+ }
+ }()
+ return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+}
+
+// callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in
+// case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be
+// a closure containing a query or a mutation to the Github GraphQL API.
+func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+ var err error
+ if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil {
+ return nil
+ }
+ // failure; the reason may be temporary network problems or internal errors
+ // on the github servers. Internal errors on the github servers are quite common.
+ // Retry
+ retries := 3
+ for i := 0; i < retries; i++ {
+ // wait a few seconds before retry
+ sleepTime := time.Duration(8*(i+1)) * time.Second
+ timer := time.NewTimer(sleepTime)
+ select {
+ case <-ctx.Done():
+ stop(timer)
+ return ctx.Err()
+ case <-timer.C:
+ err = c.callAPIDealWithLimit(apiCall, ctx, events)
+ if err == nil {
+ return nil
+ }
+ }
+ }
+ return err
+}
+
+// callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting
+// error, then it waits until the rate limit is reset and it repeats the request to the API. The
+// parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github
+// GraphQL API.
+func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+ qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ // call the function fun()
+ err := apiCall(qctx)
+ if err == nil {
+ return nil
+ }
+ // matching the error string
+ if strings.Contains(err.Error(), "API rate limit exceeded") {
+ // a rate limit error
+ qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ // Use a separate query to get Github rate limiting information.
+ limitQuery := rateLimitQuery{}
+ if err := c.sc.Query(qctx, &limitQuery, map[string]interface{}{}); err != nil {
+ return err
+ }
+ // Get the time when Github will reset the rate limit of their API.
+ resetTime := limitQuery.RateLimit.ResetAt.Time
+ msg := fmt.Sprintf(
+ "Github GraphQL API rate limit. This process will sleep until %s.",
+ resetTime.String(),
+ )
+ // Send message about rate limiting event.
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case events <- RateLimitingEvent{msg}:
+ }
+ // Pause current goroutine
+ timer := time.NewTimer(time.Until(resetTime))
+ select {
+ case <-ctx.Done():
+ stop(timer)
+ return ctx.Err()
+ case <-timer.C:
+ }
+ // call the function apiCall() again
+ qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ err = apiCall(qctx)
+ return err // might be nil
+ } else {
+ return err
+ }
+}
+
+func stop(t *time.Timer) {
+ if !t.Stop() {
+ select {
+ case <-t.C:
+ default:
+ }
+ }
+}
diff --git a/bridge/github/config.go b/bridge/github/config.go
index 9889f403..3dfbd14b 100644
--- a/bridge/github/config.go
+++ b/bridge/github/config.go
@@ -563,7 +563,7 @@ func getLoginFromToken(token *auth.Token) (string, error) {
var q loginQuery
- err := client.Query(ctx, &q, nil)
+ err := client.queryPrintMsgs(ctx, &q, nil)
if err != nil {
return "", err
}
diff --git a/bridge/github/export.go b/bridge/github/export.go
index 264f2a23..8c40eb74 100644
--- a/bridge/github/export.go
+++ b/bridge/github/export.go
@@ -32,12 +32,12 @@ type githubExporter struct {
conf core.Configuration
// cache identities clients
- identityClient map[entity.Id]*githubv4.Client
+ identityClient map[entity.Id]*rateLimitHandlerClient
// the client to use for non user-specific queries
// it's the client associated to the "default-login" config
// used for the github V4 API (graphql)
- defaultClient *githubv4.Client
+ defaultClient *rateLimitHandlerClient
// the token of the default user
// it's the token associated to the "default-login" config
@@ -53,12 +53,15 @@ type githubExporter struct {
// cache labels used to speed up exporting labels events
cachedLabels map[string]string
+
+ // channel to send export results
+ out chan<- core.ExportResult
}
// Init .
func (ge *githubExporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error {
ge.conf = conf
- ge.identityClient = make(map[entity.Id]*githubv4.Client)
+ ge.identityClient = make(map[entity.Id]*rateLimitHandlerClient)
ge.cachedOperationIDs = make(map[entity.Id]string)
ge.cachedLabels = make(map[string]string)
@@ -114,7 +117,7 @@ func (ge *githubExporter) cacheAllClient(repo *cache.RepoCache) error {
}
// getClientForIdentity return a githubv4 API client configured with the access token of the given identity.
-func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*githubv4.Client, error) {
+func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*rateLimitHandlerClient, error) {
client, ok := ge.identityClient[userId]
if ok {
return client, nil
@@ -126,6 +129,7 @@ func (ge *githubExporter) getClientForIdentity(userId entity.Id) (*githubv4.Clie
// ExportAll export all event made by the current user to Github
func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
out := make(chan core.ExportResult)
+ ge.out = out
var err error
// get repository node id
@@ -139,15 +143,16 @@ func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache,
return nil, err
}
- // query all labels
- err = ge.cacheGithubLabels(ctx, ge.defaultClient)
- if err != nil {
- return nil, err
- }
-
go func() {
defer close(out)
+ // query all labels
+ err = ge.cacheGithubLabels(ctx, ge.defaultClient)
+ if err != nil {
+ out <- core.NewExportError(errors.Wrap(err, "can't obtain Github labels"), "")
+ return
+ }
+
allIdentitiesIds := make([]entity.Id, 0, len(ge.identityClient))
for id := range ge.identityClient {
allIdentitiesIds = append(allIdentitiesIds, id)
@@ -250,7 +255,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
}
// create bug
- id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
+ id, url, err := ge.createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
if err != nil {
err := errors.Wrap(err, "exporting github issue")
out <- core.NewExportError(err, b.Id())
@@ -304,7 +309,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
switch op := op.(type) {
case *bug.AddCommentOperation:
// send operation to github
- id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, op.Message)
+ id, url, err = ge.addCommentGithubIssue(ctx, client, bugGithubID, op.Message)
if err != nil {
err := errors.Wrap(err, "adding comment")
out <- core.NewExportError(err, b.Id())
@@ -321,7 +326,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
if op.Target == createOp.Id() {
// case bug creation operation: we need to edit the Github issue
- if err := updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil {
+ if err := ge.updateGithubIssueBody(ctx, client, bugGithubID, op.Message); err != nil {
err := errors.Wrap(err, "editing issue")
out <- core.NewExportError(err, b.Id())
return
@@ -340,7 +345,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
panic("unexpected error: comment id not found")
}
- eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, op.Message)
+ eid, eurl, err := ge.editCommentGithubIssue(ctx, client, commentID, op.Message)
if err != nil {
err := errors.Wrap(err, "editing comment")
out <- core.NewExportError(err, b.Id())
@@ -355,7 +360,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
}
case *bug.SetStatusOperation:
- if err := updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil {
+ if err := ge.updateGithubIssueStatus(ctx, client, bugGithubID, op.Status); err != nil {
err := errors.Wrap(err, "editing status")
out <- core.NewExportError(err, b.Id())
return
@@ -367,7 +372,7 @@ func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, out
url = bugGithubURL
case *bug.SetTitleOperation:
- if err := updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil {
+ if err := ge.updateGithubIssueTitle(ctx, client, bugGithubID, op.Title); err != nil {
err := errors.Wrap(err, "editing title")
out <- core.NewExportError(err, b.Id())
return
@@ -472,7 +477,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith
return err
}
-func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Client) error {
+func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHandlerClient) error {
variables := map[string]interface{}{
"owner": githubv4.String(ge.conf[confKeyOwner]),
"name": githubv4.String(ge.conf[confKeyProject]),
@@ -481,17 +486,25 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Cl
}
q := labelsQuery{}
+ // When performing the queries we have to forward rate limiting events to the
+ // current channel of export results.
+ events := make(chan RateLimitingEvent)
+ defer close(events)
+ go func() {
+ for e := range events {
+ select {
+ case <-ctx.Done():
+ return
+ case ge.out <- core.NewExportRateLimiting(e.msg):
+ }
+ }
+ }()
hasNextPage := true
for hasNextPage {
- // create a new timeout context at each iteration
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
-
- if err := gc.Query(ctx, &q, variables); err != nil {
- cancel()
+ if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil {
return err
}
- cancel()
for _, label := range q.Repository.Labels.Nodes {
ge.cachedLabels[label.Name] = label.ID
@@ -584,11 +597,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
Color: githubv4.String(labelColor),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
- defer cancel()
+ ctx := context.Background()
- if err := gc.Mutate(ctx, &m, input, nil); err != nil {
+ if err := gc.mutate(ctx, &m, input, nil); err != nil {
return "", err
}
@@ -596,7 +607,7 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
}
*/
-func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
+func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *rateLimitHandlerClient, repositoryID string, label bug.Label) (string, error) {
// try to get label id from cache
labelID, err := ge.getLabelID(string(label))
if err == nil {
@@ -618,7 +629,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *gith
return labelID, nil
}
-func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
+func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *rateLimitHandlerClient, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
ids := make([]githubv4.ID, 0, len(labels))
var err error
@@ -643,7 +654,7 @@ func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client,
}
// create a github issue and return it ID
-func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
+func (ge *githubExporter) createGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, repositoryID, title, body string) (string, string, error) {
m := &createIssueMutation{}
input := githubv4.CreateIssueInput{
RepositoryID: repositoryID,
@@ -651,10 +662,7 @@ func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, t
Body: (*githubv4.String)(&body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return "", "", err
}
@@ -663,17 +671,14 @@ func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, t
}
// add a comment to an issue and return it ID
-func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) {
+func (ge *githubExporter) addCommentGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, subjectID string, body string) (string, string, error) {
m := &addCommentToIssueMutation{}
input := githubv4.AddCommentInput{
SubjectID: subjectID,
Body: githubv4.String(body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return "", "", err
}
@@ -681,24 +686,21 @@ func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID s
return node.ID, node.URL, nil
}
-func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) {
+func (ge *githubExporter) editCommentGithubIssue(ctx context.Context, gc *rateLimitHandlerClient, commentID, body string) (string, string, error) {
m := &updateIssueCommentMutation{}
input := githubv4.UpdateIssueCommentInput{
ID: commentID,
Body: githubv4.String(body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return "", "", err
}
return commentID, m.UpdateIssueComment.IssueComment.URL, nil
}
-func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error {
+func (ge *githubExporter) updateGithubIssueStatus(ctx context.Context, gc *rateLimitHandlerClient, id string, status bug.Status) error {
m := &updateIssueMutation{}
// set state
@@ -718,44 +720,35 @@ func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string
State: &state,
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return err
}
return nil
}
-func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error {
+func (ge *githubExporter) updateGithubIssueBody(ctx context.Context, gc *rateLimitHandlerClient, id string, body string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Body: (*githubv4.String)(&body),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return err
}
return nil
}
-func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error {
+func (ge *githubExporter) updateGithubIssueTitle(ctx context.Context, gc *rateLimitHandlerClient, id, title string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Title: (*githubv4.String)(&title),
}
- ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
-
- if err := gc.Mutate(ctx, m, input, nil); err != nil {
+ if err := gc.mutate(ctx, m, input, nil, ge.out); err != nil {
return err
}
@@ -763,9 +756,7 @@ func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title
}
// update github issue labels
-func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
- reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
+func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *rateLimitHandlerClient, labelableID string, added, removed []bug.Label) error {
wg, ctx := errgroup.WithContext(ctx)
if len(added) > 0 {
@@ -782,7 +773,7 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu
}
// add labels
- if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil {
+ if err := gc.mutate(ctx, m, inputAdd, nil, ge.out); err != nil {
return err
}
return nil
@@ -803,7 +794,7 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu
}
// remove label labels
- if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil {
+ if err := gc.mutate(ctx, m2, inputRemove, nil, ge.out); err != nil {
return err
}
return nil
diff --git a/bridge/github/github.go b/bridge/github/github.go
index 1e85eb9a..cbeb03eb 100644
--- a/bridge/github/github.go
+++ b/bridge/github/github.go
@@ -5,7 +5,6 @@ import (
"context"
"time"
- "github.com/shurcooL/githubv4"
"golang.org/x/oauth2"
"github.com/MichaelMure/git-bug/bridge/core"
@@ -47,11 +46,10 @@ func (*Github) NewExporter() core.Exporter {
return &githubExporter{}
}
-func buildClient(token *auth.Token) *githubv4.Client {
+func buildClient(token *auth.Token) *rateLimitHandlerClient {
src := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: token.Value},
)
httpClient := oauth2.NewClient(context.TODO(), src)
-
- return githubv4.NewClient(httpClient)
+ return newRateLimitHandlerClient(httpClient)
}
diff --git a/bridge/github/import.go b/bridge/github/import.go
index f410cc65..a41083d2 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -22,7 +22,7 @@ type githubImporter struct {
conf core.Configuration
// default client
- client *githubv4.Client
+ client *rateLimitHandlerClient
// mediator to access the Github API
mediator *importMediator
@@ -73,6 +73,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
// Exactly the same is true for comments and comment edits.
// As a consequence we need to look at the current event and one look ahead
// event.
+
currEvent = nextEvent
if currEvent == nil {
currEvent = gi.getEventHandleMsgs()
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 873d5f62..db9f877c 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -2,8 +2,6 @@ package github
import (
"context"
- "fmt"
- "strings"
"time"
"github.com/shurcooL/githubv4"
@@ -22,7 +20,7 @@ const (
// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
type importMediator struct {
// Github graphql client
- gc *githubv4.Client
+ gh *rateLimitHandlerClient
// name of the repository owner on Github
owner string
@@ -47,10 +45,6 @@ type ImportEvent interface {
isImportEvent()
}
-type RateLimitingEvent struct {
- msg string
-}
-
func (RateLimitingEvent) isImportEvent() {}
type IssueEvent struct {
@@ -84,9 +78,9 @@ func (mm *importMediator) NextImportEvent() ImportEvent {
return <-mm.importEvents
}
-func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
+func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
mm := importMediator{
- gc: client,
+ gh: client,
owner: owner,
project: project,
since: since,
@@ -144,7 +138,7 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
return nil, err
}
return &query.User, nil
@@ -206,7 +200,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -250,7 +244,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -300,7 +294,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -319,7 +313,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = cursor
}
query := issueQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -340,97 +334,3 @@ func reverse(eds []userContentEdit) chan userContentEdit {
}()
return ret
}
-
-// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
-// and it is used to populate the response into it. It should be a pointer to a struct that
-// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
-// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
-// is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- if err := mm.queryOnce(ctx, query, vars); err == nil {
- // success: done
- return nil
- }
- // failure: we will retry
- // To retry is important for importing projects with a big number of issues, because
- // there may be temporary network errors or momentary internal errors of the github servers.
- retries := 3
- var err error
- for i := 0; i < retries; i++ {
- // wait a few seconds before retry
- sleepTime := time.Duration(8*(i+1)) * time.Second
- timer := time.NewTimer(sleepTime)
- select {
- case <-ctx.Done():
- stop(timer)
- return ctx.Err()
- case <-timer.C:
- }
- err = mm.queryOnce(ctx, query, vars)
- if err == nil {
- // success: done
- return nil
- }
- }
- return err
-}
-
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- // first: just send the query to the graphql api
- vars["dryRun"] = githubv4.Boolean(false)
- qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- err := mm.gc.Query(qctx, query, vars)
- if err == nil {
- // no error: done
- return nil
- }
- // matching the error string
- if !strings.Contains(err.Error(), "API rate limit exceeded") {
- // an error, but not the API rate limit error: done
- return err
- }
- // a rate limit error
- // ask the graphql api for rate limiting information
- vars["dryRun"] = githubv4.Boolean(true)
- qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- if err := mm.gc.Query(qctx, query, vars); err != nil {
- return err
- }
- rateLimit := query.rateLimit()
- if rateLimit.Cost > rateLimit.Remaining {
- // sleep
- resetTime := rateLimit.ResetAt.Time
- // Add a few seconds (8) for good measure
- resetTime = resetTime.Add(8 * time.Second)
- msg := fmt.Sprintf("Github GraphQL API: import will sleep until %s", resetTime.String())
- select {
- case <-ctx.Done():
- return ctx.Err()
- case mm.importEvents <- RateLimitingEvent{msg}:
- }
- timer := time.NewTimer(time.Until(resetTime))
- select {
- case <-ctx.Done():
- stop(timer)
- return ctx.Err()
- case <-timer.C:
- }
- }
- // run the original query again
- vars["dryRun"] = githubv4.Boolean(false)
- qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- err = mm.gc.Query(qctx, query, vars)
- return err // might be nil
-}
-
-func stop(t *time.Timer) {
- if !t.Stop() {
- select {
- case <-t.C:
- default:
- }
- }
-}
diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go
index 461daf94..1428e3fd 100644
--- a/bridge/github/import_query.go
+++ b/bridge/github/import_query.go
@@ -2,30 +2,19 @@ package github
import "github.com/shurcooL/githubv4"
-type rateLimit struct {
- Cost githubv4.Int
- Limit githubv4.Int
- NodeCount githubv4.Int
- Remaining githubv4.Int
- ResetAt githubv4.DateTime
- Used githubv4.Int
-}
-
-type rateLimiter interface {
- rateLimit() rateLimit
+type rateLimitQuery struct {
+ RateLimit struct {
+ ResetAt githubv4.DateTime
+ //Limit githubv4.Int
+ //Remaining githubv4.Int
+ }
}
type userQuery struct {
- RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
- User user `graphql:"user(login: $login)"`
-}
-
-func (q *userQuery) rateLimit() rateLimit {
- return q.RateLimit
+ User user `graphql:"user(login: $login)"`
}
type labelsQuery struct {
- //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
Repository struct {
Labels struct {
Nodes []struct {
@@ -40,26 +29,19 @@ type labelsQuery struct {
}
type loginQuery struct {
- //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
Viewer struct {
Login string `graphql:"login"`
} `graphql:"viewer"`
}
type issueQuery struct {
- RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
Repository struct {
Issues issueConnection `graphql:"issues(first: $issueFirst, after: $issueAfter, orderBy: {field: CREATED_AT, direction: ASC}, filterBy: {since: $issueSince})"`
} `graphql:"repository(owner: $owner, name: $name)"`
}
-func (q *issueQuery) rateLimit() rateLimit {
- return q.RateLimit
-}
-
type issueEditQuery struct {
- RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
- Node struct {
+ Node struct {
Typename githubv4.String `graphql:"__typename"`
Issue struct {
UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"`
@@ -67,13 +49,8 @@ type issueEditQuery struct {
} `graphql:"node(id: $gqlNodeId)"`
}
-func (q *issueEditQuery) rateLimit() rateLimit {
- return q.RateLimit
-}
-
type timelineQuery struct {
- RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
- Node struct {
+ Node struct {
Typename githubv4.String `graphql:"__typename"`
Issue struct {
TimelineItems timelineItemsConnection `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"`
@@ -81,13 +58,8 @@ type timelineQuery struct {
} `graphql:"node(id: $gqlNodeId)"`
}
-func (q *timelineQuery) rateLimit() rateLimit {
- return q.RateLimit
-}
-
type commentEditQuery struct {
- RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
- Node struct {
+ Node struct {
Typename githubv4.String `graphql:"__typename"`
IssueComment struct {
UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"`
@@ -95,10 +67,6 @@ type commentEditQuery struct {
} `graphql:"node(id: $gqlNodeId)"`
}
-func (q *commentEditQuery) rateLimit() rateLimit {
- return q.RateLimit
-}
-
type user struct {
Login githubv4.String
AvatarUrl githubv4.String