From 2646c63213cb4d1fa04e1b61051f4ac97c1978f0 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Mon, 22 Mar 2021 19:26:59 +0100 Subject: Github bridge: Refactor --- bridge/github/import.go | 385 ++++++++++++++++++--------------------- bridge/github/import_mediator.go | 331 ++++++++++++--------------------- bridge/github/import_query.go | 22 ++- 3 files changed, 303 insertions(+), 435 deletions(-) diff --git a/bridge/github/import.go b/bridge/github/import.go index 5337c474..ceb35ef0 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -54,64 +54,80 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, go func() { defer close(gi.out) - - // Loop over all matching issues - for event := range gi.mediator.Issues { - var issue issue - var issueEdits <-chan userContentEditEvent - var timelineItems <-chan timelineEvent - switch e := event.(type) { - case messageEvent: - fmt.Println(e.msg) - continue - case issueData: - issue = e.issue - issueEdits = e.issueEdits - timelineItems = e.timelineItems - default: - panic(fmt.Sprint("Unknown event type")) + var currBug *cache.BugCache + var currEvent ImportEvent + var nextEvent ImportEvent + var err error + for { + // We need the current event and one look ahead event. + currEvent = nextEvent + if currEvent == nil { + currEvent = gi.mediator.NextImportEvent() } - // create issue - b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits) - if err != nil { - err := fmt.Errorf("issue creation: %v", err) - out <- core.NewImportError(err, "") - return + if currEvent == nil { + break } - - // loop over timeline items - for event := range timelineItems { - var item timelineItem - var edits <-chan userContentEditEvent - switch e := event.(type) { - case messageEvent: - fmt.Println(e.msg) - continue - case timelineData: - item = e.timelineItem - edits = e.userContentEdits + nextEvent = gi.mediator.NextImportEvent() + + switch event := currEvent.(type) { + case MessageEvent: + fmt.Println(event.msg) + case IssueEvent: + // first: commit what is being held in currBug + if err = gi.commit(currBug, out); err != nil { + out <- core.NewImportError(err, "") + return + } + // second: create new issue + switch next := nextEvent.(type) { + case IssueEditEvent: + // consuming and using next event + nextEvent = nil + currBug, err = gi.ensureIssue(ctx, repo, &event.issue, &next.userContentEdit) default: - panic(fmt.Sprint("Unknown event type")) + currBug, err = gi.ensureIssue(ctx, repo, &event.issue, nil) + } + if err != nil { + err := fmt.Errorf("issue creation: %v", err) + out <- core.NewImportError(err, "") + return + } + case IssueEditEvent: + err = gi.ensureIssueEdit(ctx, repo, currBug, event.issueId, &event.userContentEdit) + if err != nil { + err = fmt.Errorf("issue edit: %v", err) + out <- core.NewImportError(err, "") + return + } + case TimelineEvent: + if next, ok := nextEvent.(CommentEditEvent); ok && event.Typename == "IssueComment" { + // consuming and using next event + nextEvent = nil + err = gi.ensureComment(ctx, repo, currBug, &event.timelineItem.IssueComment, &next.userContentEdit) + } else { + err = gi.ensureTimelineItem(ctx, repo, currBug, &event.timelineItem) } - err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") return } - } - - if !b.NeedCommit() { - out <- core.NewImportNothing(b.Id(), "no imported operation") - } else if err := b.Commit(); err != nil { - // commit bug state - err = fmt.Errorf("bug commit: %v", err) - out <- core.NewImportError(err, "") - return + case CommentEditEvent: + err = gi.ensureCommentEdit(ctx, repo, currBug, event.commentId, &event.userContentEdit) + if err != nil { + err = fmt.Errorf("comment edit: %v", err) + out <- core.NewImportError(err, "") + return + } + default: + panic("Unknown event type") } } - - if err := gi.mediator.Error(); err != nil { + // commit what is being held in currBug before returning + if err = gi.commit(currBug, out); err != nil { + out <- core.NewImportError(err, "") + } + if err = gi.mediator.Error(); err != nil { gi.out <- core.NewImportError(err, "") } }() @@ -119,27 +135,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -// getNextUserContentEdit reads the input channel, handles messages, and returns the next -// userContentEditData. -func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) { - for { - event, hasEvent := <-in - if !hasEvent { - return nil, false - } - switch e := event.(type) { - case messageEvent: - fmt.Println(e.msg) - continue - case userContentEditData: - return &e, true - default: - panic(fmt.Sprint("Unknown event type")) - } +func (gi *githubImporter) commit(b *cache.BugCache, out chan<- core.ImportResult) error { + if b == nil { + return nil } + if !b.NeedCommit() { + out <- core.NewImportNothing(b.Id(), "no imported operation") + return nil + } else if err := b.Commit(); err != nil { + // commit bug state + return fmt.Errorf("bug commit: %v", err) + } + return nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) { +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdit *userContentEdit) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -150,14 +160,13 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return excerpt.CreateMetadata[core.MetaKeyOrigin] == target && excerpt.CreateMetadata[metaKeyGithubId] == parseId(issue.Id) }) - if err != nil && err != bug.ErrBugNotExist { + if err == nil { + return b, nil + } + if err != bug.ErrBugNotExist { return nil, err } - // get first issue edit - // if it exists, then it holds the bug creation - firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents) - // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . // The title provided by the GraphQL API actually consists of a space followed by a @@ -168,70 +177,49 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache title = EMPTY_TITLE_PLACEHOLDER } - if err == bug.ErrBugNotExist { - var textInput string - if hasEdit { - // use the first issue edit: it represents the bug creation itself - textInput = string(*firstEdit.Diff) - } else { - // if there are no issue edits then the issue struct holds the bug creation - textInput = string(issue.Body) - } - cleanText, err := text.Cleanup(textInput) - if err != nil { - return nil, err - } - // create bug - b, _, err = repo.NewBugRaw( - author, - issue.CreatedAt.Unix(), - title, // TODO: this is the *current* title, not the original one - cleanText, - nil, - map[string]string{ - core.MetaKeyOrigin: target, - metaKeyGithubId: parseId(issue.Id), - metaKeyGithubUrl: issue.Url.String(), - }) - if err != nil { - return nil, err - } - // importing a new bug - gi.out <- core.NewImportBug(b.Id()) + var textInput string + if issueEdit != nil { + // use the first issue edit: it represents the bug creation itself + textInput = string(*issueEdit.Diff) + } else { + // if there are no issue edits then the issue struct holds the bug creation + textInput = string(issue.Body) } - if b == nil { - return nil, fmt.Errorf("finding or creating issue") + cleanText, err := text.Cleanup(textInput) + if err != nil { + return nil, err } - // process remaining issue edits, if they exist - for { - edit, hasEdit := getNextUserContentEdit(issueEditEvents) - if !hasEdit { - break - } - // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) - if err == cache.ErrNoMatchingOp { - // original comment is missing somehow, issuing a warning - gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id()) - continue - } - if err != nil { - return nil, err - } - err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit) - if err != nil { - return nil, err - } + // create bug + b, _, err = repo.NewBugRaw( + author, + issue.CreatedAt.Unix(), + title, // TODO: this is the *current* title, not the original one + cleanText, + nil, + map[string]string{ + core.MetaKeyOrigin: target, + metaKeyGithubId: parseId(issue.Id), + metaKeyGithubUrl: issue.Url.String(), + }) + if err != nil { + return nil, err } + // importing a new bug + gi.out <- core.NewImportBug(b.Id()) + return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error { +func (gi *githubImporter) ensureIssueEdit(ctx context.Context, repo *cache.RepoCache, bug *cache.BugCache, ghIssueId githubv4.ID, edit *userContentEdit) error { + return gi.ensureCommentEdit(ctx, repo, bug, ghIssueId, edit) +} + +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits) + err := gi.ensureComment(ctx, repo, b, &item.IssueComment, nil) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -390,75 +378,62 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error { - author, err := gi.ensurePerson(ctx, repo, comment.Author) +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, ghTargetId githubv4.ID, edit *userContentEdit) error { + // find comment + target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(ghTargetId)) if err != nil { return err } - - targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id)) - if err != nil && err != cache.ErrNoMatchingOp { + _, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) + if err == nil { + return nil + } + if err != cache.ErrNoMatchingOp { // real error return err } - firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents) - if err == cache.ErrNoMatchingOp { - var textInput string - if hasEdit { - // use the first comment edit: it represents the comment creation itself - textInput = string(*firstEdit.Diff) - } else { - // if there are not comment edits, then the comment struct holds the comment creation - textInput = string(comment.Body) - } - cleanText, err := text.Cleanup(textInput) - if err != nil { - return err - } - // add comment operation - op, err := b.AddCommentRaw( - author, - comment.CreatedAt.Unix(), - cleanText, - nil, - map[string]string{ - metaKeyGithubId: parseId(comment.Id), - metaKeyGithubUrl: comment.Url.String(), - }, - ) - if err != nil { - return err - } + editor, err := gi.ensurePerson(ctx, repo, edit.Editor) + if err != nil { + return err + } - gi.out <- core.NewImportComment(op.Id()) - targetOpID = op.Id() + if edit.DeletedAt != nil { + // comment deletion, not supported yet + return nil } - if targetOpID == "" { - return fmt.Errorf("finding or creating issue comment") + + cleanText, err := text.Cleanup(string(*edit.Diff)) + if err != nil { + return err } - // process remaining comment edits, if they exist - for { - edit, hasEdit := getNextUserContentEdit(commentEditEvents) - if !hasEdit { - break - } - // ensure editor identity - _, err := gi.ensurePerson(ctx, repo, edit.Editor) - if err != nil { - return err - } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit) - if err != nil { - return err - } + // comment edition + op, err := b.EditCommentRaw( + editor, + edit.CreatedAt.Unix(), + target, + cleanText, + map[string]string{ + metaKeyGithubId: parseId(edit.Id), + }, + ) + + if err != nil { + return err } + + gi.out <- core.NewImportCommentEdition(op.Id()) return nil } -func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error { - _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, firstEdit *userContentEdit) error { + author, err := gi.ensurePerson(ctx, repo, comment.Author) + if err != nil { + return err + } + + _, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id)) if err == nil { return nil } @@ -467,41 +442,35 @@ func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.Rep return err } - editor, err := gi.ensurePerson(ctx, repo, edit.Editor) + var textInput string + if firstEdit != nil { + // use the first comment edit: it represents the comment creation itself + textInput = string(*firstEdit.Diff) + } else { + // if there are not comment edits, then the comment struct holds the comment creation + textInput = string(comment.Body) + } + cleanText, err := text.Cleanup(textInput) if err != nil { return err } - switch { - case edit.DeletedAt != nil: - // comment deletion, not supported yet - return nil - - case edit.DeletedAt == nil: - - cleanText, err := text.Cleanup(string(*edit.Diff)) - if err != nil { - return err - } - - // comment edition - op, err := b.EditCommentRaw( - editor, - edit.CreatedAt.Unix(), - target, - cleanText, - map[string]string{ - metaKeyGithubId: parseId(edit.Id), - }, - ) - - if err != nil { - return err - } - - gi.out <- core.NewImportCommentEdition(op.Id()) - return nil + // add comment operation + op, err := b.AddCommentRaw( + author, + comment.CreatedAt.Unix(), + cleanText, + nil, + map[string]string{ + metaKeyGithubId: parseId(comment.Id), + metaKeyGithubUrl: comment.Url.String(), + }, + ) + if err != nil { + return err } + + gi.out <- core.NewImportComment(op.Id()) return nil } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 7da62968..25d9c312 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/shurcooL/githubv4" ) -const ( // These values influence how fast the github graphql rate limit is exhausted. +const ( + // These values influence how fast the github graphql rate limit is exhausted. NUM_ISSUES = 40 NUM_ISSUE_EDITS = 100 NUM_TIMELINE_ITEMS = 100 @@ -34,76 +34,68 @@ type importMediator struct { // given date should be imported. since time.Time - // Issues is a channel holding bundles of Issues to be imported. Each issueEvent - // is either a message (type messageEvent) or a struct holding all the data associated with - // one issue (type issueData). - Issues chan issueEvent + // importEvents holds events representing issues, comments, edits, ... + // In this channel issues are immediately followed by their issue edits and comments are + // immediately followed by their comment edits. + importEvents chan ImportEvent // Sticky error err error - - // errMut is a mutex to synchronize access to the sticky error variable err. - errMut sync.Mutex } -type issueEvent interface { - isIssueEvent() -} -type timelineEvent interface { - isTimelineEvent() -} -type userContentEditEvent interface { - isUserContentEditEvent() +type ImportEvent interface { + isImportEvent() } -type messageEvent struct { +type MessageEvent struct { msg string } -func (messageEvent) isIssueEvent() {} -func (messageEvent) isUserContentEditEvent() {} -func (messageEvent) isTimelineEvent() {} +func (MessageEvent) isImportEvent() {} -type issueData struct { +type IssueEvent struct { issue - issueEdits <-chan userContentEditEvent - timelineItems <-chan timelineEvent } -func (issueData) isIssueEvent() {} +func (IssueEvent) isImportEvent() {} + +type IssueEditEvent struct { + issueId githubv4.ID + userContentEdit +} + +func (IssueEditEvent) isImportEvent() {} -type timelineData struct { +type TimelineEvent struct { + issueId githubv4.ID timelineItem - userContentEdits <-chan userContentEditEvent } -func (timelineData) isTimelineEvent() {} +func (TimelineEvent) isImportEvent() {} -type userContentEditData struct { +type CommentEditEvent struct { + commentId githubv4.ID userContentEdit } -// func (userContentEditData) isEvent() -func (userContentEditData) isUserContentEditEvent() {} +func (CommentEditEvent) isImportEvent() {} -func (mm *importMediator) setError(err error) { - mm.errMut.Lock() - mm.err = err - mm.errMut.Unlock() +func (mm *importMediator) NextImportEvent() ImportEvent { + return <-mm.importEvents } func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, - owner: owner, - project: project, - since: since, - Issues: make(chan issueEvent, CHAN_CAPACITY), - err: nil, + gc: client, + owner: owner, + project: project, + since: since, + importEvents: make(chan ImportEvent, CHAN_CAPACITY), + err: nil, } go func() { - mm.fillIssues(ctx) - close(mm.Issues) + mm.fillImportEvents(ctx) + close(mm.importEvents) }() return &mm } @@ -146,95 +138,42 @@ func newCommentEditVars() varmap { } func (mm *importMediator) Error() error { - mm.errMut.Lock() - err := mm.err - mm.errMut.Unlock() - return err + return mm.err } func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - // handle message events localy - channel := make(chan messageEvent) - defer close(channel) - // print all messages immediately - go func() { - for event := range channel { - fmt.Println(event.msg) - } - }() - if err := mm.mQuery(ctx, &query, vars, channel); err != nil { + if err := mm.mQuery(ctx, &query, vars); err != nil { return nil, err } return &query.User, nil } -func (mm *importMediator) fillIssues(ctx context.Context) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the issue channel. The message will be queued after - // all the issues which has been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case mm.Issues <- msg: - } - } - }() - // start processing the actual issues +func (mm *importMediator) fillImportEvents(ctx context.Context) { initialCursor := githubv4.String("") - issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs) + issues, hasIssues := mm.queryIssue(ctx, initialCursor) for hasIssues { for _, node := range issues.Nodes { - // We need to send an issue-bundle over the issue channel before we start - // filling the issue edits and the timeline items to avoid deadlocks. - issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) - timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}: + case mm.importEvents <- IssueEvent{node.issue}: } - // We do not know whether the client reads from the issue edit channel - // or the timeline channel first. Since the capacity of any channel is limited - // any send operation may block. Hence, in order to avoid deadlocks we need - // to send over both these channels concurrently. - go func(node issueNode) { - mm.fillIssueEdits(ctx, &node, issueEditChan) - close(issueEditChan) - }(node) - go func(node issueNode) { - mm.fillTimeline(ctx, &node, timelineBundleChan) - close(timelineBundleChan) - }(node) + // issue edit events follow the issue event + mm.fillIssueEditEvents(ctx, &node) + // last come the timeline events + mm.fillTimelineEvents(ctx, &node) } if !issues.PageInfo.HasNextPage { break } - issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs) + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) } } -func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the issue-edit channel. The message will be queued after - // all the issue edits which have been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() +func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) { edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { @@ -248,87 +187,86 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo select { case <-ctx.Done(): return - case channel <- userContentEditData{edit}: + case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs) + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the timeline channel. The message will be queued after - // all the timeline items which have been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + vars := newIssueEditVars() + vars["gqlNodeId"] = nid + if cursor == "" { + vars["issueEditBefore"] = (*githubv4.String)(nil) + } else { + vars["issueEditBefore"] = cursor + } + query := issueEditQuery{} + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { + select { + case <-ctx.Done(): + return + case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}: + } if item.Typename == "IssueComment" { // Issue comments are different than other timeline items in that // they may have associated user content edits. - // - // Send over the timeline-channel before starting to fill the comment - // edits. - commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) - select { - case <-ctx.Done(): - return - case channel <- timelineData{item, commentEditChan}: - } - // We need to create a new goroutine for filling the comment edit - // channel. - go func(item timelineItem) { - mm.fillCommentEdits(ctx, &item, commentEditChan) - close(commentEditChan) - }(item) - } else { - select { - case <-ctx.Done(): - return - case channel <- timelineData{item, nil}: - } + // Right after the comment we send the comment edits. + mm.fillCommentEdits(ctx, &item) } } if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + } +} + +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + vars := newTimelineVars() + vars["gqlNodeId"] = nid + if cursor == "" { + vars["timelineAfter"] = (*githubv4.String)(nil) + } else { + vars["timelineAfter"] = cursor } + query := timelineQuery{} + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.TimelineItems + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true } -func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) { +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) { // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the user content edit channel. The message will be queued after - // all the user content edits which have been completed already. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() comment := &item.IssueComment edits := &comment.UserContentEdits hasEdits := true @@ -343,17 +281,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt select { case <-ctx.Done(): return - case channel <- userContentEditData{edit}: + case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs) + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) } } -func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { vars := newCommentEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -362,8 +300,8 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err return nil, false } connection := &query.Node.IssueComment.UserContentEdits @@ -373,47 +311,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) { - vars := newTimelineVars() - vars["gqlNodeId"] = nid - if cursor == "" { - vars["timelineAfter"] = (*githubv4.String)(nil) - } else { - vars["timelineAfter"] = cursor - } - query := timelineQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) - return nil, false - } - connection := &query.Node.Issue.TimelineItems - if len(connection.Nodes) <= 0 { - return nil, false - } - return connection, true -} - -func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { - vars := newIssueEditVars() - vars["gqlNodeId"] = nid - if cursor == "" { - vars["issueEditBefore"] = (*githubv4.String)(nil) - } else { - vars["issueEditBefore"] = cursor - } - query := issueEditQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) - return nil, false - } - connection := &query.Node.Issue.UserContentEdits - if len(connection.Nodes) <= 0 { - return nil, false - } - return connection, true -} - -func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) { +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) @@ -421,8 +319,8 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = githubv4.String(cursor) } query := issueQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err return nil, false } connection := &query.Repository.Issues @@ -443,29 +341,26 @@ func reverse(eds []userContentEdit) chan userContentEdit { return ret } -type rateLimiter interface { - rateLimit() rateLimit -} - // mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query // and it is used to populate the response into it. It should be a pointer to a struct that // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If // there is a Github rate limiting error, then the function sleeps and retries after the rate limit // is expired. If there is another error, then the method will retry before giving up. -func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { - if err := mm.queryOnce(ctx, query, vars, msgs); err == nil { +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { + if err := mm.queryOnce(ctx, query, vars); err == nil { // success: done return nil } // failure: we will retry - // To retry is important for importing projects with a big number of issues. + // To retry is important for importing projects with a big number of issues, because + // there may be temporary network errors or momentary internal errors of the github servers. retries := 3 var err error for i := 0; i < retries; i++ { // wait a few seconds before retry sleepTime := 8 * (i + 1) time.Sleep(time.Duration(sleepTime) * time.Second) - err = mm.queryOnce(ctx, query, vars, msgs) + err = mm.queryOnce(ctx, query, vars) if err == nil { // success: done return nil @@ -474,7 +369,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma return err } -func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { +func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { // first: just send the query to the graphql api vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) @@ -507,7 +402,7 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars select { case <-ctx.Done(): return ctx.Err() - case msgs <- messageEvent{msg}: + case mm.importEvents <- MessageEvent{msg}: } timer := time.NewTimer(time.Until(resetTime)) select { diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index c4ab2aa9..461daf94 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -2,6 +2,19 @@ package github import "github.com/shurcooL/githubv4" +type rateLimit struct { + Cost githubv4.Int + Limit githubv4.Int + NodeCount githubv4.Int + Remaining githubv4.Int + ResetAt githubv4.DateTime + Used githubv4.Int +} + +type rateLimiter interface { + rateLimit() rateLimit +} + type userQuery struct { RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` User user `graphql:"user(login: $login)"` @@ -211,12 +224,3 @@ type pageInfo struct { StartCursor githubv4.String HasPreviousPage bool } - -type rateLimit struct { - Cost githubv4.Int - Limit githubv4.Int - NodeCount githubv4.Int - Remaining githubv4.Int - ResetAt githubv4.DateTime - Used githubv4.Int -} -- cgit