aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bridge/github/client.go106
-rw-r--r--bridge/github/export.go15
-rw-r--r--bridge/github/import_events.go40
-rw-r--r--bridge/github/import_mediator.go134
4 files changed, 144 insertions, 151 deletions
diff --git a/bridge/github/client.go b/bridge/github/client.go
index 10f0a03c..00981a56 100644
--- a/bridge/github/client.go
+++ b/bridge/github/client.go
@@ -7,8 +7,9 @@ import (
"strings"
"time"
- "github.com/MichaelMure/git-bug/bridge/core"
"github.com/shurcooL/githubv4"
+
+ "github.com/MichaelMure/git-bug/bridge/core"
)
var _ Client = &githubv4.Client{}
@@ -29,79 +30,69 @@ func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient
return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)}
}
-type RateLimitingEvent struct {
- msg string
-}
-
-// mutate calls the github api with a graphql mutation and for each rate limiting event it sends an
-// export result.
+// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event
func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error {
// prepare a closure for the mutation
mutFun := func(ctx context.Context) error {
return c.sc.Mutate(ctx, m, input, vars)
}
- limitEvents := make(chan RateLimitingEvent)
- defer close(limitEvents)
- go func() {
- for e := range limitEvents {
- select {
- case <-ctx.Done():
- return
- case out <- core.NewExportRateLimiting(e.msg):
- }
+ callback := func(msg string) {
+ select {
+ case <-ctx.Done():
+ case out <- core.NewExportRateLimiting(msg):
}
- }()
- return c.callAPIAndRetry(mutFun, ctx, limitEvents)
+ }
+ return c.callAPIAndRetry(ctx, mutFun, callback)
}
-// queryWithLimitEvents calls the github api with a graphql query and it sends rate limiting events
-// to a given channel of type RateLimitingEvent.
-func (c *rateLimitHandlerClient) queryWithLimitEvents(ctx context.Context, query interface{}, vars map[string]interface{}, limitEvents chan<- RateLimitingEvent) error {
- // prepare a closure fot the query
+// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event
+func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
+ // prepare a closure for the query
queryFun := func(ctx context.Context) error {
return c.sc.Query(ctx, query, vars)
}
- return c.callAPIAndRetry(queryFun, ctx, limitEvents)
+ callback := func(msg string) {
+ select {
+ case <-ctx.Done():
+ case importEvents <- RateLimitingEvent{msg}:
+ }
+ }
+ return c.callAPIAndRetry(ctx, queryFun, callback)
}
-// queryWithImportEvents calls the github api with a graphql query and it sends rate limiting events
-// to a given channel of type ImportEvent.
-func (c *rateLimitHandlerClient) queryWithImportEvents(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
- // forward rate limiting events to channel of import events
- limitEvents := make(chan RateLimitingEvent)
- defer close(limitEvents)
- go func() {
- for e := range limitEvents {
- select {
- case <-ctx.Done():
- return
- case importEvents <- e:
- }
+// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event
+func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error {
+ // prepare a closure for the query
+ queryFun := func(ctx context.Context) error {
+ return c.sc.Query(ctx, query, vars)
+ }
+ callback := func(msg string) {
+ select {
+ case <-ctx.Done():
+ case out <- core.NewExportRateLimiting(msg):
}
- }()
- return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+ }
+ return c.callAPIAndRetry(ctx, queryFun, callback)
}
-// queryPrintMsgs calls the github api with a graphql query and it prints for ever rate limiting
-// event a message to stdout.
+// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event .
func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
- // print rate limiting events directly to stdout.
- limitEvents := make(chan RateLimitingEvent)
- defer close(limitEvents)
- go func() {
- for e := range limitEvents {
- fmt.Println(e.msg)
- }
- }()
- return c.queryWithLimitEvents(ctx, query, vars, limitEvents)
+ // prepare a closure for the query
+ queryFun := func(ctx context.Context) error {
+ return c.sc.Query(ctx, query, vars)
+ }
+ callback := func(msg string) {
+ fmt.Println(msg)
+ }
+ return c.callAPIAndRetry(ctx, queryFun, callback)
}
// callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in
// case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be
// a closure containing a query or a mutation to the Github GraphQL API.
-func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error {
var err error
- if err = c.callAPIDealWithLimit(apiCall, ctx, events); err == nil {
+ if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil {
return nil
}
// failure; the reason may be temporary network problems or internal errors
@@ -117,7 +108,7 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
stop(timer)
return ctx.Err()
case <-timer.C:
- err = c.callAPIDealWithLimit(apiCall, ctx, events)
+ err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent)
if err == nil {
return nil
}
@@ -127,10 +118,10 @@ func (c *rateLimitHandlerClient) callAPIAndRetry(apiCall func(context.Context) e
}
// callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting
-// error, then it waits until the rate limit is reset and it repeats the request to the API. The
+// error, then it waits until the rate limit is reset, and it repeats the request to the API. The
// parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github
// GraphQL API.
-func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Context) error, ctx context.Context, events chan<- RateLimitingEvent) error {
+func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error {
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
// call the function fun()
@@ -155,11 +146,8 @@ func (c *rateLimitHandlerClient) callAPIDealWithLimit(apiCall func(context.Conte
resetTime.String(),
)
// Send message about rate limiting event.
- select {
- case <-ctx.Done():
- return ctx.Err()
- case events <- RateLimitingEvent{msg}:
- }
+ rateLimitCallback(msg)
+
// Pause current goroutine
timer := time.NewTimer(time.Until(resetTime))
select {
diff --git a/bridge/github/export.go b/bridge/github/export.go
index 8c40eb74..35d456c2 100644
--- a/bridge/github/export.go
+++ b/bridge/github/export.go
@@ -486,23 +486,10 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *rateLimitHa
}
q := labelsQuery{}
- // When performing the queries we have to forward rate limiting events to the
- // current channel of export results.
- events := make(chan RateLimitingEvent)
- defer close(events)
- go func() {
- for e := range events {
- select {
- case <-ctx.Done():
- return
- case ge.out <- core.NewExportRateLimiting(e.msg):
- }
- }
- }()
hasNextPage := true
for hasNextPage {
- if err := gc.queryWithLimitEvents(ctx, &q, variables, events); err != nil {
+ if err := gc.queryExport(ctx, &q, variables, ge.out); err != nil {
return err
}
diff --git a/bridge/github/import_events.go b/bridge/github/import_events.go
new file mode 100644
index 00000000..7ae86d75
--- /dev/null
+++ b/bridge/github/import_events.go
@@ -0,0 +1,40 @@
+package github
+
+import "github.com/shurcooL/githubv4"
+
+type ImportEvent interface {
+ isImportEvent()
+}
+
+type RateLimitingEvent struct {
+ msg string
+}
+
+func (RateLimitingEvent) isImportEvent() {}
+
+type IssueEvent struct {
+ issue
+}
+
+func (IssueEvent) isImportEvent() {}
+
+type IssueEditEvent struct {
+ issueId githubv4.ID
+ userContentEdit
+}
+
+func (IssueEditEvent) isImportEvent() {}
+
+type TimelineEvent struct {
+ issueId githubv4.ID
+ timelineItem
+}
+
+func (TimelineEvent) isImportEvent() {}
+
+type CommentEditEvent struct {
+ commentId githubv4.ID
+ userContentEdit
+}
+
+func (CommentEditEvent) isImportEvent() {}
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index db9f877c..be4e3880 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -9,6 +9,7 @@ import (
const (
// These values influence how fast the github graphql rate limit is exhausted.
+
NumIssues = 40
NumIssueEdits = 100
NumTimelineItems = 100
@@ -41,43 +42,6 @@ type importMediator struct {
err error
}
-type ImportEvent interface {
- isImportEvent()
-}
-
-func (RateLimitingEvent) isImportEvent() {}
-
-type IssueEvent struct {
- issue
-}
-
-func (IssueEvent) isImportEvent() {}
-
-type IssueEditEvent struct {
- issueId githubv4.ID
- userContentEdit
-}
-
-func (IssueEditEvent) isImportEvent() {}
-
-type TimelineEvent struct {
- issueId githubv4.ID
- timelineItem
-}
-
-func (TimelineEvent) isImportEvent() {}
-
-type CommentEditEvent struct {
- commentId githubv4.ID
- userContentEdit
-}
-
-func (CommentEditEvent) isImportEvent() {}
-
-func (mm *importMediator) NextImportEvent() ImportEvent {
- return <-mm.importEvents
-}
-
func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owner, project string, since time.Time) *importMediator {
mm := importMediator{
gh: client,
@@ -87,48 +51,24 @@ func NewImportMediator(ctx context.Context, client *rateLimitHandlerClient, owne
importEvents: make(chan ImportEvent, ChanCapacity),
err: nil,
}
- go func() {
- mm.fillImportEvents(ctx)
- close(mm.importEvents)
- }()
- return &mm
-}
-type varmap map[string]interface{}
+ go mm.start(ctx)
-func newIssueVars(owner, project string, since time.Time) varmap {
- return varmap{
- "owner": githubv4.String(owner),
- "name": githubv4.String(project),
- "issueSince": githubv4.DateTime{Time: since},
- "issueFirst": githubv4.Int(NumIssues),
- "issueEditLast": githubv4.Int(NumIssueEdits),
- "issueEditBefore": (*githubv4.String)(nil),
- "timelineFirst": githubv4.Int(NumTimelineItems),
- "timelineAfter": (*githubv4.String)(nil),
- "commentEditLast": githubv4.Int(NumCommentEdits),
- "commentEditBefore": (*githubv4.String)(nil),
- }
-}
-
-func newIssueEditVars() varmap {
- return varmap{
- "issueEditLast": githubv4.Int(NumIssueEdits),
- }
+ return &mm
}
-func newTimelineVars() varmap {
- return varmap{
- "timelineFirst": githubv4.Int(NumTimelineItems),
- "commentEditLast": githubv4.Int(NumCommentEdits),
- "commentEditBefore": (*githubv4.String)(nil),
- }
+func (mm *importMediator) start(ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ mm.fillImportEvents(ctx)
+ // Make sure we cancel everything when we are done, instead of relying on the parent context
+ // This should unblock pending send to the channel if the capacity was reached and avoid a panic/race when closing.
+ cancel()
+ close(mm.importEvents)
}
-func newCommentEditVars() varmap {
- return varmap{
- "commentEditLast": githubv4.Int(NumCommentEdits),
- }
+// NextImportEvent returns the next ImportEvent, or nil if done.
+func (mm *importMediator) NextImportEvent() ImportEvent {
+ return <-mm.importEvents
}
func (mm *importMediator) Error() error {
@@ -138,7 +78,7 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
return nil, err
}
return &query.User, nil
@@ -200,7 +140,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -244,7 +184,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -294,7 +234,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -313,7 +253,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = cursor
}
query := issueQuery{}
- if err := mm.gh.queryWithImportEvents(ctx, &query, vars, mm.importEvents); err != nil {
+ if err := mm.gh.queryImport(ctx, &query, vars, mm.importEvents); err != nil {
mm.err = err
return nil, false
}
@@ -334,3 +274,41 @@ func reverse(eds []userContentEdit) chan userContentEdit {
}()
return ret
}
+
+// varmap is a container for Github API's pagination variables
+type varmap map[string]interface{}
+
+func newIssueVars(owner, project string, since time.Time) varmap {
+ return varmap{
+ "owner": githubv4.String(owner),
+ "name": githubv4.String(project),
+ "issueSince": githubv4.DateTime{Time: since},
+ "issueFirst": githubv4.Int(NumIssues),
+ "issueEditLast": githubv4.Int(NumIssueEdits),
+ "issueEditBefore": (*githubv4.String)(nil),
+ "timelineFirst": githubv4.Int(NumTimelineItems),
+ "timelineAfter": (*githubv4.String)(nil),
+ "commentEditLast": githubv4.Int(NumCommentEdits),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+}
+
+func newIssueEditVars() varmap {
+ return varmap{
+ "issueEditLast": githubv4.Int(NumIssueEdits),
+ }
+}
+
+func newTimelineVars() varmap {
+ return varmap{
+ "timelineFirst": githubv4.Int(NumTimelineItems),
+ "commentEditLast": githubv4.Int(NumCommentEdits),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+}
+
+func newCommentEditVars() varmap {
+ return varmap{
+ "commentEditLast": githubv4.Int(NumCommentEdits),
+ }
+}