aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github/import_mediator.go
diff options
context:
space:
mode:
authorAlexander Scharinger <rng.dynamics@gmail.com>2021-03-17 19:29:39 +0100
committerAlexander Scharinger <rng.dynamics@gmail.com>2021-03-18 20:59:53 +0100
commit52fba350d6d127d5c50aca34aabcca1ef0d26d75 (patch)
tree24ad5ab613dc675f9bfe4da45cbd97e8dac4f06d /bridge/github/import_mediator.go
parentd7f555b4374eee2ecdc144283a73327c931f09f1 (diff)
downloadgit-bug-52fba350d6d127d5c50aca34aabcca1ef0d26d75.tar.gz
Github bridge: send message to user when waiting
When the Github GraphQL API rate limit is exhausted print a message at the bottom of the terminal so the user knows why the import has been paused.
Diffstat (limited to 'bridge/github/import_mediator.go')
-rw-r--r--bridge/github/import_mediator.go191
1 files changed, 144 insertions, 47 deletions
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 8d1796b0..7da62968 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -34,9 +34,10 @@ type importMediator struct {
// given date should be imported.
since time.Time
- // issues is a channel holding bundles of issues to be imported. Each bundle holds the data
- // associated with one issue.
- issues chan issueBundle
+ // Issues is a channel holding bundles of Issues to be imported. Each issueEvent
+ // is either a message (type messageEvent) or a struct holding all the data associated with
+ // one issue (type issueData).
+ Issues chan issueEvent
// Sticky error
err error
@@ -45,17 +46,46 @@ type importMediator struct {
errMut sync.Mutex
}
-type issueBundle struct {
- issue issue
- issueEdits <-chan userContentEdit
- timelineBundles <-chan timelineBundle
+type issueEvent interface {
+ isIssueEvent()
+}
+type timelineEvent interface {
+ isTimelineEvent()
+}
+type userContentEditEvent interface {
+ isUserContentEditEvent()
}
-type timelineBundle struct {
- timelineItem timelineItem
- userContentEdits <-chan userContentEdit
+type messageEvent struct {
+ msg string
}
+func (messageEvent) isIssueEvent() {}
+func (messageEvent) isUserContentEditEvent() {}
+func (messageEvent) isTimelineEvent() {}
+
+type issueData struct {
+ issue
+ issueEdits <-chan userContentEditEvent
+ timelineItems <-chan timelineEvent
+}
+
+func (issueData) isIssueEvent() {}
+
+type timelineData struct {
+ timelineItem
+ userContentEdits <-chan userContentEditEvent
+}
+
+func (timelineData) isTimelineEvent() {}
+
+type userContentEditData struct {
+ userContentEdit
+}
+
+// func (userContentEditData) isEvent()
+func (userContentEditData) isUserContentEditEvent() {}
+
func (mm *importMediator) setError(err error) {
mm.errMut.Lock()
mm.err = err
@@ -68,12 +98,12 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj
owner: owner,
project: project,
since: since,
- issues: make(chan issueBundle, CHAN_CAPACITY),
+ Issues: make(chan issueEvent, CHAN_CAPACITY),
err: nil,
}
go func() {
mm.fillIssues(ctx)
- close(mm.issues)
+ close(mm.Issues)
}()
return &mm
}
@@ -115,10 +145,6 @@ func newCommentEditVars() varmap {
}
}
-func (mm *importMediator) Issues() <-chan issueBundle {
- return mm.issues
-}
-
func (mm *importMediator) Error() error {
mm.errMut.Lock()
err := mm.err
@@ -129,25 +155,49 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ // handle message events localy
+ channel := make(chan messageEvent)
+ defer close(channel)
+ // print all messages immediately
+ go func() {
+ for event := range channel {
+ fmt.Println(event.msg)
+ }
+ }()
+ if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
return nil, err
}
return &query.User, nil
}
func (mm *importMediator) fillIssues(ctx context.Context) {
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the issue channel. The message will be queued after
+ // all the issues which has been completed.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case mm.Issues <- msg:
+ }
+ }
+ }()
+ // start processing the actual issues
initialCursor := githubv4.String("")
- issues, hasIssues := mm.queryIssue(ctx, initialCursor)
+ issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
for hasIssues {
for _, node := range issues.Nodes {
// We need to send an issue-bundle over the issue channel before we start
// filling the issue edits and the timeline items to avoid deadlocks.
- issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
- timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
+ issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
+ timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
+ case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
}
// We do not know whether the client reads from the issue edit channel
@@ -166,11 +216,25 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
if !issues.PageInfo.HasNextPage {
break
}
- issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
+ issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
+func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the issue-edit channel. The message will be queued after
+ // all the issue edits which have been completed.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- msg:
+ }
+ }
+ }()
edits := &issueNode.UserContentEdits
hasEdits := true
for hasEdits {
@@ -184,17 +248,31 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
select {
case <-ctx.Done():
return
- case channel <- edit:
+ case channel <- userContentEditData{edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
+ edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
+func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the timeline channel. The message will be queued after
+ // all the timeline items which have been completed.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- msg:
+ }
+ }
+ }()
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
@@ -205,11 +283,11 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
//
// Send over the timeline-channel before starting to fill the comment
// edits.
- commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
+ commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case channel <- timelineBundle{item, commentEditChan}:
+ case channel <- timelineData{item, commentEditChan}:
}
// We need to create a new goroutine for filling the comment edit
// channel.
@@ -221,22 +299,36 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
select {
case <-ctx.Done():
return
- case channel <- timelineBundle{item, nil}:
+ case channel <- timelineData{item, nil}:
}
}
}
if !items.PageInfo.HasNextPage {
break
}
- items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+ items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
// Here we are only concerned with timeline items of type issueComment.
if item.Typename != "IssueComment" {
return
}
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the user content edit channel. The message will be queued after
+ // all the user content edits which have been completed already.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- msg:
+ }
+ }
+ }()
comment := &item.IssueComment
edits := &comment.UserContentEdits
hasEdits := true
@@ -251,17 +343,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt
select {
case <-ctx.Done():
return
- case channel <- edit:
+ case channel <- userContentEditData{edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
+ edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
vars := newCommentEditVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -270,7 +362,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -281,7 +373,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
return connection, true
}
-func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
vars := newTimelineVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -290,7 +382,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -301,7 +393,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
return connection, true
}
-func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
vars := newIssueEditVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -310,7 +402,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -321,7 +413,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
return connection, true
}
-func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
vars := newIssueVars(mm.owner, mm.project, mm.since)
if cursor == "" {
vars["issueAfter"] = (*githubv4.String)(nil)
@@ -329,7 +421,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = githubv4.String(cursor)
}
query := issueQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -360,20 +452,20 @@ type rateLimiter interface {
// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
// is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- if err := mm.queryOnce(ctx, query, vars); err == nil {
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
+ if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
// success: done
return nil
}
// failure: we will retry
- // This is important for importing projects with a big number of issues.
+ // To retry is important for importing projects with a big number of issues.
retries := 3
var err error
for i := 0; i < retries; i++ {
// wait a few seconds before retry
sleepTime := 8 * (i + 1)
time.Sleep(time.Duration(sleepTime) * time.Second)
- err = mm.queryOnce(ctx, query, vars)
+ err = mm.queryOnce(ctx, query, vars, msgs)
if err == nil {
// success: done
return nil
@@ -382,7 +474,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
return err
}
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
// first: just send the query to the graphql api
vars["dryRun"] = githubv4.Boolean(false)
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
@@ -411,7 +503,12 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars
resetTime := rateLimit.ResetAt.Time
// Add a few seconds (8) for good measure
resetTime = resetTime.Add(8 * time.Second)
- fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
+ msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", resetTime.String())
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case msgs <- messageEvent{msg}:
+ }
timer := time.NewTimer(time.Until(resetTime))
select {
case <-ctx.Done():