diff options
Diffstat (limited to 'bridge/github/import_mediator.go')
-rw-r--r-- | bridge/github/import_mediator.go | 331 |
1 files changed, 113 insertions, 218 deletions
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 7da62968..25d9c312 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/shurcooL/githubv4" ) -const ( // These values influence how fast the github graphql rate limit is exhausted. +const ( + // These values influence how fast the github graphql rate limit is exhausted. NUM_ISSUES = 40 NUM_ISSUE_EDITS = 100 NUM_TIMELINE_ITEMS = 100 @@ -34,76 +34,68 @@ type importMediator struct { // given date should be imported. since time.Time - // Issues is a channel holding bundles of Issues to be imported. Each issueEvent - // is either a message (type messageEvent) or a struct holding all the data associated with - // one issue (type issueData). - Issues chan issueEvent + // importEvents holds events representing issues, comments, edits, ... + // In this channel issues are immediately followed by their issue edits and comments are + // immediately followed by their comment edits. + importEvents chan ImportEvent // Sticky error err error - - // errMut is a mutex to synchronize access to the sticky error variable err. - errMut sync.Mutex } -type issueEvent interface { - isIssueEvent() -} -type timelineEvent interface { - isTimelineEvent() -} -type userContentEditEvent interface { - isUserContentEditEvent() +type ImportEvent interface { + isImportEvent() } -type messageEvent struct { +type MessageEvent struct { msg string } -func (messageEvent) isIssueEvent() {} -func (messageEvent) isUserContentEditEvent() {} -func (messageEvent) isTimelineEvent() {} +func (MessageEvent) isImportEvent() {} -type issueData struct { +type IssueEvent struct { issue - issueEdits <-chan userContentEditEvent - timelineItems <-chan timelineEvent } -func (issueData) isIssueEvent() {} +func (IssueEvent) isImportEvent() {} + +type IssueEditEvent struct { + issueId githubv4.ID + userContentEdit +} + +func (IssueEditEvent) isImportEvent() {} -type timelineData struct { +type TimelineEvent struct { + issueId githubv4.ID timelineItem - userContentEdits <-chan userContentEditEvent } -func (timelineData) isTimelineEvent() {} +func (TimelineEvent) isImportEvent() {} -type userContentEditData struct { +type CommentEditEvent struct { + commentId githubv4.ID userContentEdit } -// func (userContentEditData) isEvent() -func (userContentEditData) isUserContentEditEvent() {} +func (CommentEditEvent) isImportEvent() {} -func (mm *importMediator) setError(err error) { - mm.errMut.Lock() - mm.err = err - mm.errMut.Unlock() +func (mm *importMediator) NextImportEvent() ImportEvent { + return <-mm.importEvents } func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, - owner: owner, - project: project, - since: since, - Issues: make(chan issueEvent, CHAN_CAPACITY), - err: nil, + gc: client, + owner: owner, + project: project, + since: since, + importEvents: make(chan ImportEvent, CHAN_CAPACITY), + err: nil, } go func() { - mm.fillIssues(ctx) - close(mm.Issues) + mm.fillImportEvents(ctx) + close(mm.importEvents) }() return &mm } @@ -146,95 +138,42 @@ func newCommentEditVars() varmap { } func (mm *importMediator) Error() error { - mm.errMut.Lock() - err := mm.err - mm.errMut.Unlock() - return err + return mm.err } func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - // handle message events localy - channel := make(chan messageEvent) - defer close(channel) - // print all messages immediately - go func() { - for event := range channel { - fmt.Println(event.msg) - } - }() - if err := mm.mQuery(ctx, &query, vars, channel); err != nil { + if err := mm.mQuery(ctx, &query, vars); err != nil { return nil, err } return &query.User, nil } -func (mm *importMediator) fillIssues(ctx context.Context) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the issue channel. The message will be queued after - // all the issues which has been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case mm.Issues <- msg: - } - } - }() - // start processing the actual issues +func (mm *importMediator) fillImportEvents(ctx context.Context) { initialCursor := githubv4.String("") - issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs) + issues, hasIssues := mm.queryIssue(ctx, initialCursor) for hasIssues { for _, node := range issues.Nodes { - // We need to send an issue-bundle over the issue channel before we start - // filling the issue edits and the timeline items to avoid deadlocks. - issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) - timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}: + case mm.importEvents <- IssueEvent{node.issue}: } - // We do not know whether the client reads from the issue edit channel - // or the timeline channel first. Since the capacity of any channel is limited - // any send operation may block. Hence, in order to avoid deadlocks we need - // to send over both these channels concurrently. - go func(node issueNode) { - mm.fillIssueEdits(ctx, &node, issueEditChan) - close(issueEditChan) - }(node) - go func(node issueNode) { - mm.fillTimeline(ctx, &node, timelineBundleChan) - close(timelineBundleChan) - }(node) + // issue edit events follow the issue event + mm.fillIssueEditEvents(ctx, &node) + // last come the timeline events + mm.fillTimelineEvents(ctx, &node) } if !issues.PageInfo.HasNextPage { break } - issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs) + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) } } -func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the issue-edit channel. The message will be queued after - // all the issue edits which have been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() +func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) { edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { @@ -248,87 +187,86 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo select { case <-ctx.Done(): return - case channel <- userContentEditData{edit}: + case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs) + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the timeline channel. The message will be queued after - // all the timeline items which have been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + vars := newIssueEditVars() + vars["gqlNodeId"] = nid + if cursor == "" { + vars["issueEditBefore"] = (*githubv4.String)(nil) + } else { + vars["issueEditBefore"] = cursor + } + query := issueEditQuery{} + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { + select { + case <-ctx.Done(): + return + case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}: + } if item.Typename == "IssueComment" { // Issue comments are different than other timeline items in that // they may have associated user content edits. - // - // Send over the timeline-channel before starting to fill the comment - // edits. - commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) - select { - case <-ctx.Done(): - return - case channel <- timelineData{item, commentEditChan}: - } - // We need to create a new goroutine for filling the comment edit - // channel. - go func(item timelineItem) { - mm.fillCommentEdits(ctx, &item, commentEditChan) - close(commentEditChan) - }(item) - } else { - select { - case <-ctx.Done(): - return - case channel <- timelineData{item, nil}: - } + // Right after the comment we send the comment edits. + mm.fillCommentEdits(ctx, &item) } } if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + } +} + +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + vars := newTimelineVars() + vars["gqlNodeId"] = nid + if cursor == "" { + vars["timelineAfter"] = (*githubv4.String)(nil) + } else { + vars["timelineAfter"] = cursor } + query := timelineQuery{} + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.TimelineItems + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true } -func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) { +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) { // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the user content edit channel. The message will be queued after - // all the user content edits which have been completed already. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() comment := &item.IssueComment edits := &comment.UserContentEdits hasEdits := true @@ -343,17 +281,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt select { case <-ctx.Done(): return - case channel <- userContentEditData{edit}: + case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs) + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) } } -func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { vars := newCommentEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -362,8 +300,8 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err return nil, false } connection := &query.Node.IssueComment.UserContentEdits @@ -373,47 +311,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) { - vars := newTimelineVars() - vars["gqlNodeId"] = nid - if cursor == "" { - vars["timelineAfter"] = (*githubv4.String)(nil) - } else { - vars["timelineAfter"] = cursor - } - query := timelineQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) - return nil, false - } - connection := &query.Node.Issue.TimelineItems - if len(connection.Nodes) <= 0 { - return nil, false - } - return connection, true -} - -func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { - vars := newIssueEditVars() - vars["gqlNodeId"] = nid - if cursor == "" { - vars["issueEditBefore"] = (*githubv4.String)(nil) - } else { - vars["issueEditBefore"] = cursor - } - query := issueEditQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) - return nil, false - } - connection := &query.Node.Issue.UserContentEdits - if len(connection.Nodes) <= 0 { - return nil, false - } - return connection, true -} - -func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) { +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) @@ -421,8 +319,8 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = githubv4.String(cursor) } query := issueQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err return nil, false } connection := &query.Repository.Issues @@ -443,29 +341,26 @@ func reverse(eds []userContentEdit) chan userContentEdit { return ret } -type rateLimiter interface { - rateLimit() rateLimit -} - // mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query // and it is used to populate the response into it. It should be a pointer to a struct that // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If // there is a Github rate limiting error, then the function sleeps and retries after the rate limit // is expired. If there is another error, then the method will retry before giving up. -func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { - if err := mm.queryOnce(ctx, query, vars, msgs); err == nil { +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { + if err := mm.queryOnce(ctx, query, vars); err == nil { // success: done return nil } // failure: we will retry - // To retry is important for importing projects with a big number of issues. + // To retry is important for importing projects with a big number of issues, because + // there may be temporary network errors or momentary internal errors of the github servers. retries := 3 var err error for i := 0; i < retries; i++ { // wait a few seconds before retry sleepTime := 8 * (i + 1) time.Sleep(time.Duration(sleepTime) * time.Second) - err = mm.queryOnce(ctx, query, vars, msgs) + err = mm.queryOnce(ctx, query, vars) if err == nil { // success: done return nil @@ -474,7 +369,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma return err } -func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { +func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { // first: just send the query to the graphql api vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) @@ -507,7 +402,7 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars select { case <-ctx.Done(): return ctx.Err() - case msgs <- messageEvent{msg}: + case mm.importEvents <- MessageEvent{msg}: } timer := time.NewTimer(time.Until(resetTime)) select { |