diff options
-rw-r--r-- | bridge/github/import.go | 81 | ||||
-rw-r--r-- | bridge/github/import_mediator.go | 191 |
2 files changed, 209 insertions, 63 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go index d492488b..5337c474 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -56,10 +56,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for bundle := range gi.mediator.Issues() { - issue := bundle.issue - issueEdits := bundle.issueEdits - timelineBundles := bundle.timelineBundles + for event := range gi.mediator.Issues { + var issue issue + var issueEdits <-chan userContentEditEvent + var timelineItems <-chan timelineEvent + switch e := event.(type) { + case messageEvent: + fmt.Println(e.msg) + continue + case issueData: + issue = e.issue + issueEdits = e.issueEdits + timelineItems = e.timelineItems + default: + panic(fmt.Sprint("Unknown event type")) + } // create issue b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits) if err != nil { @@ -69,9 +80,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } // loop over timeline items - for bundle := range timelineBundles { - item := bundle.timelineItem - edits := bundle.userContentEdits + for event := range timelineItems { + var item timelineItem + var edits <-chan userContentEditEvent + switch e := event.(type) { + case messageEvent: + fmt.Println(e.msg) + continue + case timelineData: + item = e.timelineItem + edits = e.userContentEdits + default: + panic(fmt.Sprint("Unknown event type")) + } err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) @@ -98,7 +119,27 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) { +// getNextUserContentEdit reads the input channel, handles messages, and returns the next +// userContentEditData. +func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) { + for { + event, hasEvent := <-in + if !hasEvent { + return nil, false + } + switch e := event.(type) { + case messageEvent: + fmt.Println(e.msg) + continue + case userContentEditData: + return &e, true + default: + panic(fmt.Sprint("Unknown event type")) + } + } +} + +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -115,7 +156,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache // get first issue edit // if it exists, then it holds the bug creation - firstEdit, hasEdit := <-issueEdits + firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents) // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . @@ -162,7 +203,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, fmt.Errorf("finding or creating issue") } // process remaining issue edits, if they exist - for edit := range issueEdits { + for { + edit, hasEdit := getNextUserContentEdit(issueEditEvents) + if !hasEdit { + break + } // other edits will be added as CommentEdit operations target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) if err == cache.ErrNoMatchingOp { @@ -174,7 +219,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, err } - err = gi.ensureCommentEdit(ctx, repo, b, target, &edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit) if err != nil { return nil, err } @@ -182,7 +227,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error { switch item.Typename { case "IssueComment": @@ -345,7 +390,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error { +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error { author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err @@ -356,7 +401,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac // real error return err } - firstEdit, hasEdit := <-commentEdits + firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents) if err == cache.ErrNoMatchingOp { var textInput string if hasEdit { @@ -393,14 +438,18 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return fmt.Errorf("finding or creating issue comment") } // process remaining comment edits, if they exist - for edit := range commentEdits { + for { + edit, hasEdit := getNextUserContentEdit(commentEditEvents) + if !hasEdit { + break + } // ensure editor identity _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit) if err != nil { return err } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 8d1796b0..7da62968 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -34,9 +34,10 @@ type importMediator struct { // given date should be imported. since time.Time - // issues is a channel holding bundles of issues to be imported. Each bundle holds the data - // associated with one issue. - issues chan issueBundle + // Issues is a channel holding bundles of Issues to be imported. Each issueEvent + // is either a message (type messageEvent) or a struct holding all the data associated with + // one issue (type issueData). + Issues chan issueEvent // Sticky error err error @@ -45,17 +46,46 @@ type importMediator struct { errMut sync.Mutex } -type issueBundle struct { - issue issue - issueEdits <-chan userContentEdit - timelineBundles <-chan timelineBundle +type issueEvent interface { + isIssueEvent() +} +type timelineEvent interface { + isTimelineEvent() +} +type userContentEditEvent interface { + isUserContentEditEvent() } -type timelineBundle struct { - timelineItem timelineItem - userContentEdits <-chan userContentEdit +type messageEvent struct { + msg string } +func (messageEvent) isIssueEvent() {} +func (messageEvent) isUserContentEditEvent() {} +func (messageEvent) isTimelineEvent() {} + +type issueData struct { + issue + issueEdits <-chan userContentEditEvent + timelineItems <-chan timelineEvent +} + +func (issueData) isIssueEvent() {} + +type timelineData struct { + timelineItem + userContentEdits <-chan userContentEditEvent +} + +func (timelineData) isTimelineEvent() {} + +type userContentEditData struct { + userContentEdit +} + +// func (userContentEditData) isEvent() +func (userContentEditData) isUserContentEditEvent() {} + func (mm *importMediator) setError(err error) { mm.errMut.Lock() mm.err = err @@ -68,12 +98,12 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj owner: owner, project: project, since: since, - issues: make(chan issueBundle, CHAN_CAPACITY), + Issues: make(chan issueEvent, CHAN_CAPACITY), err: nil, } go func() { mm.fillIssues(ctx) - close(mm.issues) + close(mm.Issues) }() return &mm } @@ -115,10 +145,6 @@ func newCommentEditVars() varmap { } } -func (mm *importMediator) Issues() <-chan issueBundle { - return mm.issues -} - func (mm *importMediator) Error() error { mm.errMut.Lock() err := mm.err @@ -129,25 +155,49 @@ func (mm *importMediator) Error() error { func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - if err := mm.mQuery(ctx, &query, vars); err != nil { + // handle message events localy + channel := make(chan messageEvent) + defer close(channel) + // print all messages immediately + go func() { + for event := range channel { + fmt.Println(event.msg) + } + }() + if err := mm.mQuery(ctx, &query, vars, channel); err != nil { return nil, err } return &query.User, nil } func (mm *importMediator) fillIssues(ctx context.Context) { + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the issue channel. The message will be queued after + // all the issues which has been completed. + go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case mm.Issues <- msg: + } + } + }() + // start processing the actual issues initialCursor := githubv4.String("") - issues, hasIssues := mm.queryIssue(ctx, initialCursor) + issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs) for hasIssues { for _, node := range issues.Nodes { // We need to send an issue-bundle over the issue channel before we start // filling the issue edits and the timeline items to avoid deadlocks. - issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) - timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY) + issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) + timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}: + case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}: } // We do not know whether the client reads from the issue edit channel @@ -166,11 +216,25 @@ func (mm *importMediator) fillIssues(ctx context.Context) { if !issues.PageInfo.HasNextPage { break } - issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) { +func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) { + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the issue-edit channel. The message will be queued after + // all the issue edits which have been completed. + go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case channel <- msg: + } + } + }() edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { @@ -184,17 +248,31 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo select { case <-ctx.Done(): return - case channel <- edit: + case channel <- userContentEditData{edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) { +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) { + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the timeline channel. The message will be queued after + // all the timeline items which have been completed. + go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case channel <- msg: + } + } + }() items := &issueNode.TimelineItems hasItems := true for hasItems { @@ -205,11 +283,11 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode // // Send over the timeline-channel before starting to fill the comment // edits. - commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) + commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case channel <- timelineBundle{item, commentEditChan}: + case channel <- timelineData{item, commentEditChan}: } // We need to create a new goroutine for filling the comment edit // channel. @@ -221,22 +299,36 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode select { case <-ctx.Done(): return - case channel <- timelineBundle{item, nil}: + case channel <- timelineData{item, nil}: } } } if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) { +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) { // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the user content edit channel. The message will be queued after + // all the user content edits which have been completed already. + go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case channel <- msg: + } + } + }() comment := &item.IssueComment edits := &comment.UserContentEdits hasEdits := true @@ -251,17 +343,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt select { case <-ctx.Done(): return - case channel <- edit: + case channel <- userContentEditData{edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { vars := newCommentEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -270,7 +362,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -281,7 +373,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) { vars := newTimelineVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -290,7 +382,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu vars["timelineAfter"] = cursor } query := timelineQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -301,7 +393,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu return connection, true } -func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { vars := newIssueEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -310,7 +402,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, vars["issueEditBefore"] = cursor } query := issueEditQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -321,7 +413,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, return connection, true } -func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) { vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) @@ -329,7 +421,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = githubv4.String(cursor) } query := issueQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -360,20 +452,20 @@ type rateLimiter interface { // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If // there is a Github rate limiting error, then the function sleeps and retries after the rate limit // is expired. If there is another error, then the method will retry before giving up. -func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - if err := mm.queryOnce(ctx, query, vars); err == nil { +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { + if err := mm.queryOnce(ctx, query, vars, msgs); err == nil { // success: done return nil } // failure: we will retry - // This is important for importing projects with a big number of issues. + // To retry is important for importing projects with a big number of issues. retries := 3 var err error for i := 0; i < retries; i++ { // wait a few seconds before retry sleepTime := 8 * (i + 1) time.Sleep(time.Duration(sleepTime) * time.Second) - err = mm.queryOnce(ctx, query, vars) + err = mm.queryOnce(ctx, query, vars, msgs) if err == nil { // success: done return nil @@ -382,7 +474,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma return err } -func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { +func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { // first: just send the query to the graphql api vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) @@ -411,7 +503,12 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars resetTime := rateLimit.ResetAt.Time // Add a few seconds (8) for good measure resetTime = resetTime.Add(8 * time.Second) - fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String()) + msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", resetTime.String()) + select { + case <-ctx.Done(): + return ctx.Err() + case msgs <- messageEvent{msg}: + } timer := time.NewTimer(time.Until(resetTime)) select { case <-ctx.Done(): |