about | summary | refs | log | tree | commit | diff | stats
path: root/bridge/github/import_mediator.go
diff options
context:
space:
mode:
Diffstat (limited to 'bridge/github/import_mediator.go')
-rw-r--r-- bridge/github/import_mediator.go 331
1 files changed, 113 insertions, 218 deletions
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 7da62968..25d9c312 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -4,13 +4,13 @@ import (
"context"
"fmt"
"strings"
- "sync"
"time"
"github.com/shurcooL/githubv4"
)
-const ( // These values influence how fast the github graphql rate limit is exhausted.
+const (
+ // These values influence how fast the github graphql rate limit is exhausted.
NUM_ISSUES = 40
NUM_ISSUE_EDITS = 100
NUM_TIMELINE_ITEMS = 100
@@ -34,76 +34,68 @@ type importMediator struct {
// given date should be imported.
since time.Time
- // Issues is a channel holding bundles of Issues to be imported. Each issueEvent
- // is either a message (type messageEvent) or a struct holding all the data associated with
- // one issue (type issueData).
- Issues chan issueEvent
+ // importEvents holds events representing issues, comments, edits, ...
+ // In this channel issues are immediately followed by their issue edits and comments are
+ // immediately followed by their comment edits.
+ importEvents chan ImportEvent
// Sticky error
err error
-
- // errMut is a mutex to synchronize access to the sticky error variable err.
- errMut sync.Mutex
}
-type issueEvent interface {
- isIssueEvent()
-}
-type timelineEvent interface {
- isTimelineEvent()
-}
-type userContentEditEvent interface {
- isUserContentEditEvent()
+type ImportEvent interface {
+ isImportEvent()
}
-type messageEvent struct {
+type MessageEvent struct {
msg string
}
-func (messageEvent) isIssueEvent() {}
-func (messageEvent) isUserContentEditEvent() {}
-func (messageEvent) isTimelineEvent() {}
+func (MessageEvent) isImportEvent() {}
-type issueData struct {
+type IssueEvent struct {
issue
- issueEdits <-chan userContentEditEvent
- timelineItems <-chan timelineEvent
}
-func (issueData) isIssueEvent() {}
+func (IssueEvent) isImportEvent() {}
+
+type IssueEditEvent struct {
+ issueId githubv4.ID
+ userContentEdit
+}
+
+func (IssueEditEvent) isImportEvent() {}
-type timelineData struct {
+type TimelineEvent struct {
+ issueId githubv4.ID
timelineItem
- userContentEdits <-chan userContentEditEvent
}
-func (timelineData) isTimelineEvent() {}
+func (TimelineEvent) isImportEvent() {}
-type userContentEditData struct {
+type CommentEditEvent struct {
+ commentId githubv4.ID
userContentEdit
}
-// func (userContentEditData) isEvent()
-func (userContentEditData) isUserContentEditEvent() {}
+func (CommentEditEvent) isImportEvent() {}
-func (mm *importMediator) setError(err error) {
- mm.errMut.Lock()
- mm.err = err
- mm.errMut.Unlock()
+func (mm *importMediator) NextImportEvent() ImportEvent {
+ return <-mm.importEvents
}
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
mm := importMediator{
- gc: client,
- owner: owner,
- project: project,
- since: since,
- Issues: make(chan issueEvent, CHAN_CAPACITY),
- err: nil,
+ gc: client,
+ owner: owner,
+ project: project,
+ since: since,
+ importEvents: make(chan ImportEvent, CHAN_CAPACITY),
+ err: nil,
}
go func() {
- mm.fillIssues(ctx)
- close(mm.Issues)
+ mm.fillImportEvents(ctx)
+ close(mm.importEvents)
}()
return &mm
}
@@ -146,95 +138,42 @@ func newCommentEditVars() varmap {
}
func (mm *importMediator) Error() error {
- mm.errMut.Lock()
- err := mm.err
- mm.errMut.Unlock()
- return err
+ return mm.err
}
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- // handle message events localy
- channel := make(chan messageEvent)
- defer close(channel)
- // print all messages immediately
- go func() {
- for event := range channel {
- fmt.Println(event.msg)
- }
- }()
- if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
return nil, err
}
return &query.User, nil
}
-func (mm *importMediator) fillIssues(ctx context.Context) {
- // First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the issue channel. The message will be queued after
- // all the issues which has been completed.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case mm.Issues <- msg:
- }
- }
- }()
- // start processing the actual issues
+func (mm *importMediator) fillImportEvents(ctx context.Context) {
initialCursor := githubv4.String("")
- issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
+ issues, hasIssues := mm.queryIssue(ctx, initialCursor)
for hasIssues {
for _, node := range issues.Nodes {
- // We need to send an issue-bundle over the issue channel before we start
- // filling the issue edits and the timeline items to avoid deadlocks.
- issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
- timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
+ case mm.importEvents <- IssueEvent{node.issue}:
}
- // We do not know whether the client reads from the issue edit channel
- // or the timeline channel first. Since the capacity of any channel is limited
- // any send operation may block. Hence, in order to avoid deadlocks we need
- // to send over both these channels concurrently.
- go func(node issueNode) {
- mm.fillIssueEdits(ctx, &node, issueEditChan)
- close(issueEditChan)
- }(node)
- go func(node issueNode) {
- mm.fillTimeline(ctx, &node, timelineBundleChan)
- close(timelineBundleChan)
- }(node)
+ // issue edit events follow the issue event
+ mm.fillIssueEditEvents(ctx, &node)
+ // last come the timeline events
+ mm.fillTimelineEvents(ctx, &node)
}
if !issues.PageInfo.HasNextPage {
break
}
- issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
+ issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
}
}
-func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
- // First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the issue-edit channel. The message will be queued after
- // all the issue edits which have been completed.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case channel <- msg:
- }
- }
- }()
+func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) {
edits := &issueNode.UserContentEdits
hasEdits := true
for hasEdits {
@@ -248,87 +187,86 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
select {
case <-ctx.Done():
return
- case channel <- userContentEditData{edit}:
+ case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
+ edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
}
}
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
- // First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the timeline channel. The message will be queued after
- // all the timeline items which have been completed.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case channel <- msg:
- }
- }
- }()
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+ vars := newIssueEditVars()
+ vars["gqlNodeId"] = nid
+ if cursor == "" {
+ vars["issueEditBefore"] = (*githubv4.String)(nil)
+ } else {
+ vars["issueEditBefore"] = cursor
+ }
+ query := issueEditQuery{}
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.Issue.UserContentEdits
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
+}
+
+func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) {
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
for _, item := range items.Nodes {
+ select {
+ case <-ctx.Done():
+ return
+ case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}:
+ }
if item.Typename == "IssueComment" {
// Issue comments are different than other timeline items in that
// they may have associated user content edits.
- //
- // Send over the timeline-channel before starting to fill the comment
- // edits.
- commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
- select {
- case <-ctx.Done():
- return
- case channel <- timelineData{item, commentEditChan}:
- }
- // We need to create a new goroutine for filling the comment edit
- // channel.
- go func(item timelineItem) {
- mm.fillCommentEdits(ctx, &item, commentEditChan)
- close(commentEditChan)
- }(item)
- } else {
- select {
- case <-ctx.Done():
- return
- case channel <- timelineData{item, nil}:
- }
+ // Right after the comment we send the comment edits.
+ mm.fillCommentEdits(ctx, &item)
}
}
if !items.PageInfo.HasNextPage {
break
}
- items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
+ items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+ }
+}
+
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+ vars := newTimelineVars()
+ vars["gqlNodeId"] = nid
+ if cursor == "" {
+ vars["timelineAfter"] = (*githubv4.String)(nil)
+ } else {
+ vars["timelineAfter"] = cursor
}
+ query := timelineQuery{}
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.Issue.TimelineItems
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
}
-func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) {
// Here we are only concerned with timeline items of type issueComment.
if item.Typename != "IssueComment" {
return
}
// First: setup message handling while submitting GraphQL queries.
- msgs := make(chan messageEvent)
- defer close(msgs)
- // forward all the messages to the user content edit channel. The message will be queued after
- // all the user content edits which have been completed already.
- go func() {
- for msg := range msgs {
- select {
- case <-ctx.Done():
- return
- case channel <- msg:
- }
- }
- }()
comment := &item.IssueComment
edits := &comment.UserContentEdits
hasEdits := true
@@ -343,17 +281,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt
select {
case <-ctx.Done():
return
- case channel <- userContentEditData{edit}:
+ case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
+ edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
}
}
-func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
vars := newCommentEditVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -362,8 +300,8 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
return nil, false
}
connection := &query.Node.IssueComment.UserContentEdits
@@ -373,47 +311,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
return connection, true
}
-func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
- vars := newTimelineVars()
- vars["gqlNodeId"] = nid
- if cursor == "" {
- vars["timelineAfter"] = (*githubv4.String)(nil)
- } else {
- vars["timelineAfter"] = cursor
- }
- query := timelineQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
- return nil, false
- }
- connection := &query.Node.Issue.TimelineItems
- if len(connection.Nodes) <= 0 {
- return nil, false
- }
- return connection, true
-}
-
-func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
- vars := newIssueEditVars()
- vars["gqlNodeId"] = nid
- if cursor == "" {
- vars["issueEditBefore"] = (*githubv4.String)(nil)
- } else {
- vars["issueEditBefore"] = cursor
- }
- query := issueEditQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
- return nil, false
- }
- connection := &query.Node.Issue.UserContentEdits
- if len(connection.Nodes) <= 0 {
- return nil, false
- }
- return connection, true
-}
-
-func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
vars := newIssueVars(mm.owner, mm.project, mm.since)
if cursor == "" {
vars["issueAfter"] = (*githubv4.String)(nil)
@@ -421,8 +319,8 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = githubv4.String(cursor)
}
query := issueQuery{}
- if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
- mm.setError(err)
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.err = err
return nil, false
}
connection := &query.Repository.Issues
@@ -443,29 +341,26 @@ func reverse(eds []userContentEdit) chan userContentEdit {
return ret
}
-type rateLimiter interface {
- rateLimit() rateLimit
-}
-
// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
// and it is used to populate the response into it. It should be a pointer to a struct that
// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
// is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
- if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+ if err := mm.queryOnce(ctx, query, vars); err == nil {
// success: done
return nil
}
// failure: we will retry
- // To retry is important for importing projects with a big number of issues.
+ // Retrying is important when importing projects with a large number of issues, because
+ // there may be temporary network errors or momentary internal errors on the GitHub servers.
retries := 3
var err error
for i := 0; i < retries; i++ {
// wait a few seconds before retry
sleepTime := 8 * (i + 1)
time.Sleep(time.Duration(sleepTime) * time.Second)
- err = mm.queryOnce(ctx, query, vars, msgs)
+ err = mm.queryOnce(ctx, query, vars)
if err == nil {
// success: done
return nil
@@ -474,7 +369,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
return err
}
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
+func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
// first: just send the query to the graphql api
vars["dryRun"] = githubv4.Boolean(false)
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
@@ -507,7 +402,7 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars
select {
case <-ctx.Done():
return ctx.Err()
- case msgs <- messageEvent{msg}:
+ case mm.importEvents <- MessageEvent{msg}:
}
timer := time.NewTimer(time.Until(resetTime))
select {