author     Alexander Scharinger <rng.dynamics@gmail.com>   2021-03-22 19:26:59 +0100
committer  Alexander Scharinger <rng.dynamics@gmail.com>   2021-03-22 19:26:59 +0100
commit     2646c63213cb4d1fa04e1b61051f4ac97c1978f0 (patch)
tree       59506a5619ea4efee511a64096679baadb59ed7f /bridge/github
parent     52fba350d6d127d5c50aca34aabcca1ef0d26d75 (diff)
download   git-bug-2646c63213cb4d1fa04e1b61051f4ac97c1978f0.tar.gz
Github bridge: Refactor
Diffstat (limited to 'bridge/github')
-rw-r--r--   bridge/github/import.go            385
-rw-r--r--   bridge/github/import_mediator.go   331
-rw-r--r--   bridge/github/import_query.go       22
3 files changed, 303 insertions(+), 435 deletions(-)
diff --git a/bridge/github/import.go b/bridge/github/import.go
index 5337c474..ceb35ef0 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -54,64 +54,80 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
go func() {
defer close(gi.out)
-
- // Loop over all matching issues
- for event := range gi.mediator.Issues {
- var issue issue
- var issueEdits <-chan userContentEditEvent
- var timelineItems <-chan timelineEvent
- switch e := event.(type) {
- case messageEvent:
- fmt.Println(e.msg)
- continue
- case issueData:
- issue = e.issue
- issueEdits = e.issueEdits
- timelineItems = e.timelineItems
- default:
- panic(fmt.Sprint("Unknown event type"))
+ var currBug *cache.BugCache
+ var currEvent ImportEvent
+ var nextEvent ImportEvent
+ var err error
+ for {
+ // We need the current event and one look ahead event.
+ currEvent = nextEvent
+ if currEvent == nil {
+ currEvent = gi.mediator.NextImportEvent()
}
- // create issue
- b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits)
- if err != nil {
- err := fmt.Errorf("issue creation: %v", err)
- out <- core.NewImportError(err, "")
- return
+ if currEvent == nil {
+ break
}
-
- // loop over timeline items
- for event := range timelineItems {
- var item timelineItem
- var edits <-chan userContentEditEvent
- switch e := event.(type) {
- case messageEvent:
- fmt.Println(e.msg)
- continue
- case timelineData:
- item = e.timelineItem
- edits = e.userContentEdits
+ nextEvent = gi.mediator.NextImportEvent()
+
+ switch event := currEvent.(type) {
+ case MessageEvent:
+ fmt.Println(event.msg)
+ case IssueEvent:
+ // first: commit what is being held in currBug
+ if err = gi.commit(currBug, out); err != nil {
+ out <- core.NewImportError(err, "")
+ return
+ }
+ // second: create new issue
+ switch next := nextEvent.(type) {
+ case IssueEditEvent:
+ // consuming and using next event
+ nextEvent = nil
+ currBug, err = gi.ensureIssue(ctx, repo, &event.issue, &next.userContentEdit)
default:
- panic(fmt.Sprint("Unknown event type"))
+ currBug, err = gi.ensureIssue(ctx, repo, &event.issue, nil)
+ }
+ if err != nil {
+ err := fmt.Errorf("issue creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
+ case IssueEditEvent:
+ err = gi.ensureIssueEdit(ctx, repo, currBug, event.issueId, &event.userContentEdit)
+ if err != nil {
+ err = fmt.Errorf("issue edit: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
+ case TimelineEvent:
+ if next, ok := nextEvent.(CommentEditEvent); ok && event.Typename == "IssueComment" {
+ // consuming and using next event
+ nextEvent = nil
+ err = gi.ensureComment(ctx, repo, currBug, &event.timelineItem.IssueComment, &next.userContentEdit)
+ } else {
+ err = gi.ensureTimelineItem(ctx, repo, currBug, &event.timelineItem)
}
- err := gi.ensureTimelineItem(ctx, repo, b, &item, edits)
if err != nil {
err = fmt.Errorf("timeline item creation: %v", err)
out <- core.NewImportError(err, "")
return
}
- }
-
- if !b.NeedCommit() {
- out <- core.NewImportNothing(b.Id(), "no imported operation")
- } else if err := b.Commit(); err != nil {
- // commit bug state
- err = fmt.Errorf("bug commit: %v", err)
- out <- core.NewImportError(err, "")
- return
+ case CommentEditEvent:
+ err = gi.ensureCommentEdit(ctx, repo, currBug, event.commentId, &event.userContentEdit)
+ if err != nil {
+ err = fmt.Errorf("comment edit: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
+ default:
+ panic("Unknown event type")
}
}
-
- if err := gi.mediator.Error(); err != nil {
+ // commit what is being held in currBug before returning
+ if err = gi.commit(currBug, out); err != nil {
+ out <- core.NewImportError(err, "")
+ }
+ if err = gi.mediator.Error(); err != nil {
gi.out <- core.NewImportError(err, "")
}
}()
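
The refactored loop above drains a single flat event stream while keeping one event of look-ahead, so that an issue (or an "IssueComment" timeline item) can be paired with the edit event that immediately follows it. A minimal standalone sketch of that consumption pattern, using invented event types rather than the bridge's own:

    package main

    import "fmt"

    // Invented event types standing in for ImportEvent and its variants.
    type event interface{ isEvent() }

    type issueEvent struct{ title string }
    type editEvent struct{ diff string }

    func (issueEvent) isEvent() {}
    func (editEvent) isEvent()  {}

    // consume reads the stream with one event of look-ahead: when an issue is
    // immediately followed by an edit, that first edit supplies the creation
    // text; later edits are applied on their own.
    func consume(next func() event) {
        var curr, ahead event
        for {
            curr = ahead
            if curr == nil {
                curr = next()
            }
            if curr == nil {
                return // stream exhausted
            }
            ahead = next()

            switch e := curr.(type) {
            case issueEvent:
                if edit, ok := ahead.(editEvent); ok {
                    ahead = nil // the look-ahead event is consumed as well
                    fmt.Printf("create %q from first edit %q\n", e.title, edit.diff)
                } else {
                    fmt.Printf("create %q from the issue body\n", e.title)
                }
            case editEvent:
                fmt.Printf("apply later edit %q\n", e.diff)
            }
        }
    }

    func main() {
        stream := []event{issueEvent{"bug A"}, editEvent{"v1"}, editEvent{"v2"}, issueEvent{"bug B"}}
        i := 0
        next := func() event {
            if i == len(stream) {
                return nil
            }
            i++
            return stream[i-1]
        }
        consume(next)
    }

Running it creates "bug A" from its first edit, applies the second edit separately, and creates "bug B" from the issue body because no edit follows it.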
@@ -119,27 +135,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
return out, nil
}
-// getNextUserContentEdit reads the input channel, handles messages, and returns the next
-// userContentEditData.
-func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) {
- for {
- event, hasEvent := <-in
- if !hasEvent {
- return nil, false
- }
- switch e := event.(type) {
- case messageEvent:
- fmt.Println(e.msg)
- continue
- case userContentEditData:
- return &e, true
- default:
- panic(fmt.Sprint("Unknown event type"))
- }
+func (gi *githubImporter) commit(b *cache.BugCache, out chan<- core.ImportResult) error {
+ if b == nil {
+ return nil
}
+ if !b.NeedCommit() {
+ out <- core.NewImportNothing(b.Id(), "no imported operation")
+ return nil
+ } else if err := b.Commit(); err != nil {
+ // commit bug state
+ return fmt.Errorf("bug commit: %v", err)
+ }
+ return nil
}
-func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) {
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdit *userContentEdit) (*cache.BugCache, error) {
author, err := gi.ensurePerson(ctx, repo, issue.Author)
if err != nil {
return nil, err
@@ -150,14 +160,13 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return excerpt.CreateMetadata[core.MetaKeyOrigin] == target &&
excerpt.CreateMetadata[metaKeyGithubId] == parseId(issue.Id)
})
- if err != nil && err != bug.ErrBugNotExist {
+ if err == nil {
+ return b, nil
+ }
+ if err != bug.ErrBugNotExist {
return nil, err
}
- // get first issue edit
- // if it exists, then it holds the bug creation
- firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents)
-
// At Github there exist issues with seemingly empty titles. An example is
// https://github.com/NixOS/nixpkgs/issues/72730 .
// The title provided by the GraphQL API actually consists of a space followed by a
@@ -168,70 +177,49 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
title = EMPTY_TITLE_PLACEHOLDER
}
- if err == bug.ErrBugNotExist {
- var textInput string
- if hasEdit {
- // use the first issue edit: it represents the bug creation itself
- textInput = string(*firstEdit.Diff)
- } else {
- // if there are no issue edits then the issue struct holds the bug creation
- textInput = string(issue.Body)
- }
- cleanText, err := text.Cleanup(textInput)
- if err != nil {
- return nil, err
- }
- // create bug
- b, _, err = repo.NewBugRaw(
- author,
- issue.CreatedAt.Unix(),
- title, // TODO: this is the *current* title, not the original one
- cleanText,
- nil,
- map[string]string{
- core.MetaKeyOrigin: target,
- metaKeyGithubId: parseId(issue.Id),
- metaKeyGithubUrl: issue.Url.String(),
- })
- if err != nil {
- return nil, err
- }
- // importing a new bug
- gi.out <- core.NewImportBug(b.Id())
+ var textInput string
+ if issueEdit != nil {
+ // use the first issue edit: it represents the bug creation itself
+ textInput = string(*issueEdit.Diff)
+ } else {
+ // if there are no issue edits then the issue struct holds the bug creation
+ textInput = string(issue.Body)
}
- if b == nil {
- return nil, fmt.Errorf("finding or creating issue")
+ cleanText, err := text.Cleanup(textInput)
+ if err != nil {
+ return nil, err
}
- // process remaining issue edits, if they exist
- for {
- edit, hasEdit := getNextUserContentEdit(issueEditEvents)
- if !hasEdit {
- break
- }
- // other edits will be added as CommentEdit operations
- target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id))
- if err == cache.ErrNoMatchingOp {
- // original comment is missing somehow, issuing a warning
- gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id())
- continue
- }
- if err != nil {
- return nil, err
- }
- err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit)
- if err != nil {
- return nil, err
- }
+ // create bug
+ b, _, err = repo.NewBugRaw(
+ author,
+ issue.CreatedAt.Unix(),
+ title, // TODO: this is the *current* title, not the original one
+ cleanText,
+ nil,
+ map[string]string{
+ core.MetaKeyOrigin: target,
+ metaKeyGithubId: parseId(issue.Id),
+ metaKeyGithubUrl: issue.Url.String(),
+ })
+ if err != nil {
+ return nil, err
}
+ // importing a new bug
+ gi.out <- core.NewImportBug(b.Id())
+
return b, nil
}
-func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error {
+func (gi *githubImporter) ensureIssueEdit(ctx context.Context, repo *cache.RepoCache, bug *cache.BugCache, ghIssueId githubv4.ID, edit *userContentEdit) error {
+ return gi.ensureCommentEdit(ctx, repo, bug, ghIssueId, edit)
+}
+
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem) error {
switch item.Typename {
case "IssueComment":
- err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits)
+ err := gi.ensureComment(ctx, repo, b, &item.IssueComment, nil)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
}
@@ -390,75 +378,62 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re
return nil
}
-func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error {
- author, err := gi.ensurePerson(ctx, repo, comment.Author)
+func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, ghTargetId githubv4.ID, edit *userContentEdit) error {
+ // find comment
+ target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(ghTargetId))
if err != nil {
return err
}
-
- targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id))
- if err != nil && err != cache.ErrNoMatchingOp {
+ _, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
+ if err == nil {
+ return nil
+ }
+ if err != cache.ErrNoMatchingOp {
// real error
return err
}
- firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents)
- if err == cache.ErrNoMatchingOp {
- var textInput string
- if hasEdit {
- // use the first comment edit: it represents the comment creation itself
- textInput = string(*firstEdit.Diff)
- } else {
- // if there are not comment edits, then the comment struct holds the comment creation
- textInput = string(comment.Body)
- }
- cleanText, err := text.Cleanup(textInput)
- if err != nil {
- return err
- }
- // add comment operation
- op, err := b.AddCommentRaw(
- author,
- comment.CreatedAt.Unix(),
- cleanText,
- nil,
- map[string]string{
- metaKeyGithubId: parseId(comment.Id),
- metaKeyGithubUrl: comment.Url.String(),
- },
- )
- if err != nil {
- return err
- }
+ editor, err := gi.ensurePerson(ctx, repo, edit.Editor)
+ if err != nil {
+ return err
+ }
- gi.out <- core.NewImportComment(op.Id())
- targetOpID = op.Id()
+ if edit.DeletedAt != nil {
+ // comment deletion, not supported yet
+ return nil
}
- if targetOpID == "" {
- return fmt.Errorf("finding or creating issue comment")
+
+ cleanText, err := text.Cleanup(string(*edit.Diff))
+ if err != nil {
+ return err
}
- // process remaining comment edits, if they exist
- for {
- edit, hasEdit := getNextUserContentEdit(commentEditEvents)
- if !hasEdit {
- break
- }
- // ensure editor identity
- _, err := gi.ensurePerson(ctx, repo, edit.Editor)
- if err != nil {
- return err
- }
- err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit)
- if err != nil {
- return err
- }
+ // comment edition
+ op, err := b.EditCommentRaw(
+ editor,
+ edit.CreatedAt.Unix(),
+ target,
+ cleanText,
+ map[string]string{
+ metaKeyGithubId: parseId(edit.Id),
+ },
+ )
+
+ if err != nil {
+ return err
}
+
+ gi.out <- core.NewImportCommentEdition(op.Id())
return nil
}
-func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error {
- _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, firstEdit *userContentEdit) error {
+ author, err := gi.ensurePerson(ctx, repo, comment.Author)
+ if err != nil {
+ return err
+ }
+
+ _, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id))
if err == nil {
return nil
}
@@ -467,41 +442,35 @@ func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.Rep
return err
}
- editor, err := gi.ensurePerson(ctx, repo, edit.Editor)
+ var textInput string
+ if firstEdit != nil {
+ // use the first comment edit: it represents the comment creation itself
+ textInput = string(*firstEdit.Diff)
+ } else {
+ // if there are not comment edits, then the comment struct holds the comment creation
+ textInput = string(comment.Body)
+ }
+ cleanText, err := text.Cleanup(textInput)
if err != nil {
return err
}
- switch {
- case edit.DeletedAt != nil:
- // comment deletion, not supported yet
- return nil
-
- case edit.DeletedAt == nil:
-
- cleanText, err := text.Cleanup(string(*edit.Diff))
- if err != nil {
- return err
- }
-
- // comment edition
- op, err := b.EditCommentRaw(
- editor,
- edit.CreatedAt.Unix(),
- target,
- cleanText,
- map[string]string{
- metaKeyGithubId: parseId(edit.Id),
- },
- )
-
- if err != nil {
- return err
- }
-
- gi.out <- core.NewImportCommentEdition(op.Id())
- return nil
+ // add comment operation
+ op, err := b.AddCommentRaw(
+ author,
+ comment.CreatedAt.Unix(),
+ cleanText,
+ nil,
+ map[string]string{
+ metaKeyGithubId: parseId(comment.Id),
+ metaKeyGithubUrl: comment.Url.String(),
+ },
+ )
+ if err != nil {
+ return err
}
+
+ gi.out <- core.NewImportComment(op.Id())
return nil
}
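
ensureIssue, ensureComment and ensureCommentEdit above all share the same find-or-create shape: resolve the entity through the GitHub id stored in its operation metadata, return early if it was already imported, and only create it (tagged with that id) when the lookup misses. A toy sketch of that idempotent shape, with an invented in-memory store standing in for the bug cache:

    package main

    import (
        "errors"
        "fmt"
    )

    // errNoMatch stands in for cache.ErrNoMatchingOp / bug.ErrBugNotExist.
    var errNoMatch = errors.New("no matching operation")

    // store is a toy stand-in for the bug cache: operation ids indexed by the
    // GitHub id kept in their metadata.
    type store struct{ ops map[string]string }

    func (s *store) resolve(githubId string) (string, error) {
        if op, ok := s.ops[githubId]; ok {
            return op, nil
        }
        return "", errNoMatch
    }

    // ensureComment shows the "ensure" shape: look the entity up by its GitHub
    // id, return early if it was already imported, and only create it (tagged
    // with that id) when the lookup misses.
    func (s *store) ensureComment(githubId, body string) error {
        _, err := s.resolve(githubId)
        if err == nil {
            return nil // already imported, nothing to do
        }
        if !errors.Is(err, errNoMatch) {
            return err // real error
        }
        opId := fmt.Sprintf("op-%d", len(s.ops)+1)
        s.ops[githubId] = opId
        fmt.Printf("imported comment %s as %s: %q\n", githubId, opId, body)
        return nil
    }

    func main() {
        s := &store{ops: map[string]string{}}
        _ = s.ensureComment("C_1", "first comment")
        _ = s.ensureComment("C_1", "first comment") // second run is a no-op
    }

Calling ensureComment twice with the same id imports the comment once and turns the second call into a no-op, which is what makes repeated imports safe.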
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 7da62968..25d9c312 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -4,13 +4,13 @@ import (
"context"
"fmt"
"strings"
- "sync"
"time"
"github.com/shurcooL/githubv4"
)
-const ( // These values influence how fast the github graphql rate limit is exhausted.
+const (
+ // These values influence how fast the github graphql rate limit is exhausted.
NUM_ISSUES = 40
NUM_ISSUE_EDITS = 100
NUM_TIMELINE_ITEMS = 100
@@ -34,76 +34,68 @@ type importMediator struct {
// given date should be imported.
since time.Time
- // Issues is a channel holding bundles of Issues to be imported. Each issueEvent
- // is either a message (type messageEvent) or a struct holding all the data associated with
- // one issue (type issueData).
- Issues chan issueEvent
+ // importEvents holds events representing issues, comments, edits, ...
+ // In this channel issues are immediately followed by their issue edits and comments are
+ // immediately followed by their comment edits.
+ importEvents chan ImportEvent
// Sticky error
err error
-
- // errMut is a mutex to synchronize access to the sticky error variable err.
- errMut sync.Mutex
}
-type issueEvent interface {
- isIssueEvent()
-}
-type timelineEvent interface {
- isTimelineEvent()
-}
-type userContentEditEvent interface {
- isUserContentEditEvent()
+type ImportEvent interface {
+ isImportEvent()
}
-type messageEvent struct {
+type MessageEvent struct {
msg string
}
-func (messageEvent) isIssueEvent() {}
-func (messageEvent) isUserContentEditEvent() {}
-func (messageEvent) isTimelineEvent() {}
+func (MessageEvent) isImportEvent() {}
-type issueData struct {
+type IssueEvent struct {
issue
- issueEdits <-chan userContentEditEvent
- timelineItems <-chan timelineEvent
}
-func (issueData) isIssueEvent() {}
+func (IssueEvent) isImportEvent() {}
+
+type IssueEditEvent struct {
+ issueId githubv4.ID
+ userContentEdit
+}
+
+func (IssueEditEvent) isImportEvent() {}
-type timelineData struct {
+type TimelineEvent struct {
+ issueId githubv4.ID
timelineItem
- userContentEdits <-chan userContentEditEvent
}
-func (timelineData) isTimelineEvent() {}
+func (TimelineEvent) isImportEvent() {}
-type userContentEditData struct {
+type CommentEditEvent struct {
+ commentId githubv4.ID
userContentEdit
}
-// func (userContentEditData) isEvent()
-func (userContentEditData) isUserContentEditEvent() {}
+func (CommentEditEvent) isImportEvent() {}
-func (mm *importMediator) setError(err error) {
- mm.errMut.Lock()
- mm.err = err
- mm.errMut.Unlock()
+func (mm *importMediator) NextImportEvent() ImportEvent {
+ return <-mm.importEvents
}
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
mm := importMediator{
- gc: client,
- owner: owner,
- project: project,
- since: since,
- Issues: make(chan issueEvent, CHAN_CAPACITY),
- err: nil,
+ gc: client,
+ owner: owner,
+ project: project,
+ since: since,
+ importEvents: make(chan ImportEvent, CHAN_CAPACITY),
+ err: nil,
}
go func() {
- mm.fillIssues(ctx)
- close(mm.Issues)
+ mm.fillImportEvents(ctx)
+ close(mm.importEvents)
}()
return &mm
}
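
With this refactor the mediator exposes one importEvents channel, filled by the single goroutine started in NewImportMediator and closed when filling finishes; the importer pulls with NextImportEvent until it gets nil and only then consults the sticky error. A compact sketch of that producer/consumer shape, with invented names and plain string events:

    package main

    import (
        "context"
        "fmt"
    )

    type mediator struct {
        events chan string
        err    error // sticky error, written by the producer only
    }

    // newMediator starts a single producer goroutine that fills the channel
    // and closes it when it is done (or when the context is cancelled).
    func newMediator(ctx context.Context, pages [][]string) *mediator {
        m := &mediator{events: make(chan string, 8)}
        go func() {
            defer close(m.events)
            for _, page := range pages {
                for _, ev := range page {
                    select {
                    case <-ctx.Done():
                        m.err = ctx.Err()
                        return
                    case m.events <- ev:
                    }
                }
            }
        }()
        return m
    }

    // next returns the next event, or "" once the stream is closed and drained.
    func (m *mediator) next() string { return <-m.events }

    // error must only be consulted after next has returned "".
    func (m *mediator) error() error { return m.err }

    func main() {
        m := newMediator(context.Background(), [][]string{{"issue 1", "edit 1a"}, {"issue 2"}})
        for ev := m.next(); ev != ""; ev = m.next() {
            fmt.Println("got:", ev)
        }
        if err := m.error(); err != nil {
            fmt.Println("import failed:", err)
        }
    }

In this sketch a mutex around err is unnecessary: the producer's write to err happens before close(events), and the consumer reads err only after a receive that observed the closed channel, which is presumably why the refactor can drop errMut.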
@@ -146,95 +138,42 @@ func newCommentEditVars() varmap {
}
func (mm *importMediator) Error() error {
- mm.errMut.Lock()
- err := mm.err
- mm.errMut.Unlock()
- return err
+ return mm.err
}
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- // handle message events localy
- channel := make(chan messageEvent)
- defer close(channel)
- // print all messages immediately
- go func() {
- for event := range channel {
- fmt.Println(event.msg)
- }
- }()
- if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
return nil, err
}
return &query.User, nil
}
-func (mm *importMediator) fillIssues(ctx context.Context) {
- // First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the issue channel. The message will be queued after
- // all the issues which has been completed.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case mm.Issues <- msg:
- }
- }
- }()
- // start processing the actual issues
+func (mm *importMediator) fillImportEvents(ctx context.Context) {
initialCursor := githubv4.String("")
- issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
+ issues, hasIssues := mm.queryIssue(ctx, initialCursor)
for hasIssues {
for _, node := range issues.Nodes {
- // We need to send an issue-bundle over the issue channel before we start
- // filling the issue edits and the timeline items to avoid deadlocks.
- issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
- timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
+ case mm.importEvents <- IssueEvent{node.issue}:
}
- // We do not know whether the client reads from the issue edit channel
- // or the timeline channel first. Since the capacity of any channel is limited
- // any send operation may block. Hence, in order to avoid deadlocks we need
- // to send over both these channels concurrently.
- go func(node issueNode) {
- mm.fillIssueEdits(ctx, &node, issueEditChan)
- close(issueEditChan)
- }(node)
- go func(node issueNode) {
- mm.fillTimeline(ctx, &node, timelineBundleChan)
- close(timelineBundleChan)
- }(node)
+ // issue edit events follow the issue event
+ mm.fillIssueEditEvents(ctx, &node)
+ // last come the timeline events
+ mm.fillTimelineEvents(ctx, &node)
}
if !issues.PageInfo.HasNextPage {
break
}
- issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
+ issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
}
}
-func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
- // First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the issue-edit channel. The message will be queued after
- // all the issue edits which have been completed.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case channel <- msg:
- }
- }
- }()
+func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) {
edits := &issueNode.UserContentEdits
hasEdits := true
for hasEdits {
@@ -248,87 +187,86 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
select {
case <-ctx.Done():
return
- case channel <- userContentEditData{edit}:
+ case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
+ edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
}
}
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
- // First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the timeline channel. The message will be queued after
- // all the timeline items which have been completed.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case channel <- msg:
- }
- }
- }()
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+ vars := newIssueEditVars()
+ vars["gqlNodeId"] = nid
+ if cursor == "" {
+ vars["issueEditBefore"] = (*githubv4.String)(nil)
+ } else {
+ vars["issueEditBefore"] = cursor
+ }
+ query := issueEditQuery{}
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.Issue.UserContentEdits
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
+}
+
+func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) {
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
for _, item := range items.Nodes {
+ select {
+ case <-ctx.Done():
+ return
+ case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}:
+ }
if item.Typename == "IssueComment" {
// Issue comments are different than other timeline items in that
// they may have associated user content edits.
- //
- // Send over the timeline-channel before starting to fill the comment
- // edits.
- commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
- select {
- case <-ctx.Done():
- return
- case channel <- timelineData{item, commentEditChan}:
- }
- // We need to create a new goroutine for filling the comment edit
- // channel.
- go func(item timelineItem) {
- mm.fillCommentEdits(ctx, &item, commentEditChan)
- close(commentEditChan)
- }(item)
- } else {
- select {
- case <-ctx.Done():
- return
- case channel <- timelineData{item, nil}:
- }
+ // Right after the comment we send the comment edits.
+ mm.fillCommentEdits(ctx, &item)
}
}
if !items.PageInfo.HasNextPage {
break
}
- items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
+ items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+ }
+}
+
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+ vars := newTimelineVars()
+ vars["gqlNodeId"] = nid
+ if cursor == "" {
+ vars["timelineAfter"] = (*githubv4.String)(nil)
+ } else {
+ vars["timelineAfter"] = cursor
}
+ query := timelineQuery{}
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.Issue.TimelineItems
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
}
-func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) {
// Here we are only concerned with timeline items of type issueComment.
if item.Typename != "IssueComment" {
return
}
// First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the user content edit channel. The message will be queued after
- // all the user content edits which have been completed already.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case channel <- msg:
- }
- }
- }()
comment := &item.IssueComment
edits := &comment.UserContentEdits
hasEdits := true
@@ -343,17 +281,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt
select {
case <-ctx.Done():
return
- case channel <- userContentEditData{edit}:
+ case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
+ edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
}
}
-func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
vars := newCommentEditVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -362,8 +300,8 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
return nil, false
}
connection := &query.Node.IssueComment.UserContentEdits
@@ -373,47 +311,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
return connection, true
}
-func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
- vars := newTimelineVars()
- vars["gqlNodeId"] = nid
- if cursor == "" {
- vars["timelineAfter"] = (*githubv4.String)(nil)
- } else {
- vars["timelineAfter"] = cursor
- }
- query := timelineQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
- return nil, false
- }
- connection := &query.Node.Issue.TimelineItems
- if len(connection.Nodes) <= 0 {
- return nil, false
- }
- return connection, true
-}
-
-func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
- vars := newIssueEditVars()
- vars["gqlNodeId"] = nid
- if cursor == "" {
- vars["issueEditBefore"] = (*githubv4.String)(nil)
- } else {
- vars["issueEditBefore"] = cursor
- }
- query := issueEditQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
- return nil, false
- }
- connection := &query.Node.Issue.UserContentEdits
- if len(connection.Nodes) <= 0 {
- return nil, false
- }
- return connection, true
-}
-
-func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
vars := newIssueVars(mm.owner, mm.project, mm.since)
if cursor == "" {
vars["issueAfter"] = (*githubv4.String)(nil)
@@ -421,8 +319,8 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = githubv4.String(cursor)
}
query := issueQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
return nil, false
}
connection := &query.Repository.Issues
@@ -443,29 +341,26 @@ func reverse(eds []userContentEdit) chan userContentEdit {
return ret
}
-type rateLimiter interface {
- rateLimit() rateLimit
-}
-
// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
// and it is used to populate the response into it. It should be a pointer to a struct that
// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
// is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
- if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+ if err := mm.queryOnce(ctx, query, vars); err == nil {
// success: done
return nil
}
// failure: we will retry
- // To retry is important for importing projects with a big number of issues.
+ // To retry is important for importing projects with a big number of issues, because
+ // there may be temporary network errors or momentary internal errors of the github servers.
retries := 3
var err error
for i := 0; i < retries; i++ {
// wait a few seconds before retry
sleepTime := 8 * (i + 1)
time.Sleep(time.Duration(sleepTime) * time.Second)
- err = mm.queryOnce(ctx, query, vars, msgs)
+ err = mm.queryOnce(ctx, query, vars)
if err == nil {
// success: done
return nil
@@ -474,7 +369,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
return err
}
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
+func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
// first: just send the query to the graphql api
vars["dryRun"] = githubv4.Boolean(false)
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
@@ -507,7 +402,7 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars
select {
case <-ctx.Done():
return ctx.Err()
- case msgs <- messageEvent{msg}:
+ case mm.importEvents <- MessageEvent{msg}:
}
timer := time.NewTimer(time.Until(resetTime))
select {
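
As the comment on mQuery above explains, failed queries are retried a few times with a growing pause before giving up, because large imports run long enough to hit transient network or server errors. A standalone sketch of that retry discipline, with the delay made a parameter (the bridge itself sleeps 8, 16 and then 24 seconds):

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // queryWithRetry sketches the retry discipline described for mQuery: one
    // initial attempt, then a fixed number of retries with a growing pause,
    // returning the last error if every attempt fails.
    func queryWithRetry(query func() error, step time.Duration) error {
        if err := query(); err == nil {
            return nil
        }
        const retries = 3
        var err error
        for i := 0; i < retries; i++ {
            time.Sleep(time.Duration(i+1) * step)
            if err = query(); err == nil {
                return nil
            }
        }
        return err
    }

    func main() {
        attempts := 0
        flaky := func() error {
            attempts++
            if attempts < 3 {
                return errors.New("temporary network error")
            }
            return nil
        }
        // A short step keeps the demo fast; the real code waits seconds.
        err := queryWithRetry(flaky, 10*time.Millisecond)
        fmt.Println("attempts:", attempts, "err:", err)
    }

The demo's flaky query succeeds on the third attempt, so queryWithRetry returns nil after two short pauses.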
diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go
index c4ab2aa9..461daf94 100644
--- a/bridge/github/import_query.go
+++ b/bridge/github/import_query.go
@@ -2,6 +2,19 @@ package github
import "github.com/shurcooL/githubv4"
+type rateLimit struct {
+ Cost githubv4.Int
+ Limit githubv4.Int
+ NodeCount githubv4.Int
+ Remaining githubv4.Int
+ ResetAt githubv4.DateTime
+ Used githubv4.Int
+}
+
+type rateLimiter interface {
+ rateLimit() rateLimit
+}
+
type userQuery struct {
RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"`
User user `graphql:"user(login: $login)"`
@@ -211,12 +224,3 @@ type pageInfo struct {
StartCursor githubv4.String
HasPreviousPage bool
}
-
-type rateLimit struct {
- Cost githubv4.Int
- Limit githubv4.Int
- NodeCount githubv4.Int
- Remaining githubv4.Int
- ResetAt githubv4.DateTime
- Used githubv4.Int
-}
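
rateLimit and the rateLimiter interface now live at the top of import_query.go, next to the query structs that carry the rate-limit selection mQuery inspects. A self-contained illustration of how a query type can satisfy that interface; the exampleQuery type and its accessor are assumptions for the sketch, not code from the bridge:

    package main

    import "fmt"

    // Trimmed-down stand-ins for the rateLimit struct and rateLimiter
    // interface declared in import_query.go.
    type rateLimit struct {
        Remaining int
        Cost      int
    }

    type rateLimiter interface {
        rateLimit() rateLimit
    }

    // exampleQuery is a hypothetical query type: it carries the rate-limit
    // selection alongside its payload and hands it back through the accessor,
    // which is what lets a helper like mQuery inspect the remaining budget of
    // any query it is given.
    type exampleQuery struct {
        RateLimit rateLimit // `graphql:"rateLimit(dryRun: $dryRun)"` in the real query
        Login     string
    }

    func (q *exampleQuery) rateLimit() rateLimit { return q.RateLimit }

    func remainingBudget(q rateLimiter) int { return q.rateLimit().Remaining }

    func main() {
        q := &exampleQuery{RateLimit: rateLimit{Remaining: 4999, Cost: 1}}
        fmt.Println("remaining:", remainingBudget(q))
    }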