aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bridge/github/import.go81
-rw-r--r--bridge/github/import_mediator.go191
2 files changed, 209 insertions, 63 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go
index d492488b..5337c474 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -56,10 +56,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
defer close(gi.out)
// Loop over all matching issues
- for bundle := range gi.mediator.Issues() {
- issue := bundle.issue
- issueEdits := bundle.issueEdits
- timelineBundles := bundle.timelineBundles
+ for event := range gi.mediator.Issues {
+ var issue issue
+ var issueEdits <-chan userContentEditEvent
+ var timelineItems <-chan timelineEvent
+ switch e := event.(type) {
+ case messageEvent:
+ fmt.Println(e.msg)
+ continue
+ case issueData:
+ issue = e.issue
+ issueEdits = e.issueEdits
+ timelineItems = e.timelineItems
+ default:
+ panic(fmt.Sprint("Unknown event type"))
+ }
// create issue
b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits)
if err != nil {
@@ -69,9 +80,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
}
// loop over timeline items
- for bundle := range timelineBundles {
- item := bundle.timelineItem
- edits := bundle.userContentEdits
+ for event := range timelineItems {
+ var item timelineItem
+ var edits <-chan userContentEditEvent
+ switch e := event.(type) {
+ case messageEvent:
+ fmt.Println(e.msg)
+ continue
+ case timelineData:
+ item = e.timelineItem
+ edits = e.userContentEdits
+ default:
+ panic(fmt.Sprint("Unknown event type"))
+ }
err := gi.ensureTimelineItem(ctx, repo, b, &item, edits)
if err != nil {
err = fmt.Errorf("timeline item creation: %v", err)
@@ -98,7 +119,27 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
return out, nil
}
-func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) {
+// getNextUserContentEdit reads the input channel, handles messages, and returns the next
+// userContentEditData.
+func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) {
+ for {
+ event, hasEvent := <-in
+ if !hasEvent {
+ return nil, false
+ }
+ switch e := event.(type) {
+ case messageEvent:
+ fmt.Println(e.msg)
+ continue
+ case userContentEditData:
+ return &e, true
+ default:
+ panic(fmt.Sprint("Unknown event type"))
+ }
+ }
+}
+
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) {
author, err := gi.ensurePerson(ctx, repo, issue.Author)
if err != nil {
return nil, err
@@ -115,7 +156,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
// get first issue edit
// if it exists, then it holds the bug creation
- firstEdit, hasEdit := <-issueEdits
+ firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents)
// At Github there exist issues with seemingly empty titles. An example is
// https://github.com/NixOS/nixpkgs/issues/72730 .
@@ -162,7 +203,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return nil, fmt.Errorf("finding or creating issue")
}
// process remaining issue edits, if they exist
- for edit := range issueEdits {
+ for {
+ edit, hasEdit := getNextUserContentEdit(issueEditEvents)
+ if !hasEdit {
+ break
+ }
// other edits will be added as CommentEdit operations
target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id))
if err == cache.ErrNoMatchingOp {
@@ -174,7 +219,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return nil, err
}
- err = gi.ensureCommentEdit(ctx, repo, b, target, &edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit)
if err != nil {
return nil, err
}
@@ -182,7 +227,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return b, nil
}
-func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error {
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error {
switch item.Typename {
case "IssueComment":
@@ -345,7 +390,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re
return nil
}
-func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error {
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error {
author, err := gi.ensurePerson(ctx, repo, comment.Author)
if err != nil {
return err
@@ -356,7 +401,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
// real error
return err
}
- firstEdit, hasEdit := <-commentEdits
+ firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents)
if err == cache.ErrNoMatchingOp {
var textInput string
if hasEdit {
@@ -393,14 +438,18 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
return fmt.Errorf("finding or creating issue comment")
}
// process remaining comment edits, if they exist
- for edit := range commentEdits {
+ for {
+ edit, hasEdit := getNextUserContentEdit(commentEditEvents)
+ if !hasEdit {
+ break
+ }
// ensure editor identity
_, err := gi.ensurePerson(ctx, repo, edit.Editor)
if err != nil {
return err
}
- err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit)
if err != nil {
return err
}
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 8d1796b0..7da62968 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -34,9 +34,10 @@ type importMediator struct {
// given date should be imported.
since time.Time
- // issues is a channel holding bundles of issues to be imported. Each bundle holds the data
- // associated with one issue.
- issues chan issueBundle
+ // Issues is a channel holding bundles of Issues to be imported. Each issueEvent
+ // is either a message (type messageEvent) or a struct holding all the data associated with
+ // one issue (type issueData).
+ Issues chan issueEvent
// Sticky error
err error
@@ -45,17 +46,46 @@ type importMediator struct {
errMut sync.Mutex
}
-type issueBundle struct {
- issue issue
- issueEdits <-chan userContentEdit
- timelineBundles <-chan timelineBundle
+type issueEvent interface {
+ isIssueEvent()
+}
+type timelineEvent interface {
+ isTimelineEvent()
+}
+type userContentEditEvent interface {
+ isUserContentEditEvent()
}
-type timelineBundle struct {
- timelineItem timelineItem
- userContentEdits <-chan userContentEdit
+type messageEvent struct {
+ msg string
}
+func (messageEvent) isIssueEvent() {}
+func (messageEvent) isUserContentEditEvent() {}
+func (messageEvent) isTimelineEvent() {}
+
+type issueData struct {
+ issue
+ issueEdits <-chan userContentEditEvent
+ timelineItems <-chan timelineEvent
+}
+
+func (issueData) isIssueEvent() {}
+
+type timelineData struct {
+ timelineItem
+ userContentEdits <-chan userContentEditEvent
+}
+
+func (timelineData) isTimelineEvent() {}
+
+type userContentEditData struct {
+ userContentEdit
+}
+
+// func (userContentEditData) isEvent()
+func (userContentEditData) isUserContentEditEvent() {}
+
func (mm *importMediator) setError(err error) {
mm.errMut.Lock()
mm.err = err
@@ -68,12 +98,12 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj
owner: owner,
project: project,
since: since,
- issues: make(chan issueBundle, CHAN_CAPACITY),
+ Issues: make(chan issueEvent, CHAN_CAPACITY),
err: nil,
}
go func() {
mm.fillIssues(ctx)
- close(mm.issues)
+ close(mm.Issues)
}()
return &mm
}
@@ -115,10 +145,6 @@ func newCommentEditVars() varmap {
}
}
-func (mm *importMediator) Issues() <-chan issueBundle {
- return mm.issues
-}
-
func (mm *importMediator) Error() error {
mm.errMut.Lock()
err := mm.err
@@ -129,25 +155,49 @@ func (mm *importMediator) Error() error {
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ // handle message events localy
+ channel := make(chan messageEvent)
+ defer close(channel)
+ // print all messages immediately
+ go func() {
+ for event := range channel {
+ fmt.Println(event.msg)
+ }
+ }()
+ if err := mm.mQuery(ctx, &query, vars, channel); err != nil {
return nil, err
}
return &query.User, nil
}
func (mm *importMediator) fillIssues(ctx context.Context) {
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the issue channel. The message will be queued after
+ // all the issues which has been completed.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case mm.Issues <- msg:
+ }
+ }
+ }()
+ // start processing the actual issues
initialCursor := githubv4.String("")
- issues, hasIssues := mm.queryIssue(ctx, initialCursor)
+ issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs)
for hasIssues {
for _, node := range issues.Nodes {
// We need to send an issue-bundle over the issue channel before we start
// filling the issue edits and the timeline items to avoid deadlocks.
- issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
- timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
+ issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
+ timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
+ case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}:
}
// We do not know whether the client reads from the issue edit channel
@@ -166,11 +216,25 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
if !issues.PageInfo.HasNextPage {
break
}
- issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
+ issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
+func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) {
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the issue-edit channel. The message will be queued after
+ // all the issue edits which have been completed.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- msg:
+ }
+ }
+ }()
edits := &issueNode.UserContentEdits
hasEdits := true
for hasEdits {
@@ -184,17 +248,31 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
select {
case <-ctx.Done():
return
- case channel <- edit:
+ case channel <- userContentEditData{edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
+ edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
+func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) {
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the timeline channel. The message will be queued after
+ // all the timeline items which have been completed.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- msg:
+ }
+ }
+ }()
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
@@ -205,11 +283,11 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
//
// Send over the timeline-channel before starting to fill the comment
// edits.
- commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
+ commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case channel <- timelineBundle{item, commentEditChan}:
+ case channel <- timelineData{item, commentEditChan}:
}
// We need to create a new goroutine for filling the comment edit
// channel.
@@ -221,22 +299,36 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
select {
case <-ctx.Done():
return
- case channel <- timelineBundle{item, nil}:
+ case channel <- timelineData{item, nil}:
}
}
}
if !items.PageInfo.HasNextPage {
break
}
- items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+ items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) {
// Here we are only concerned with timeline items of type issueComment.
if item.Typename != "IssueComment" {
return
}
+ // First: setup message handling while submitting GraphQL queries.
+ msgs := make(chan messageEvent)
+ defer close(msgs)
+ // forward all the messages to the user content edit channel. The message will be queued after
+ // all the user content edits which have been completed already.
+ go func() {
+ for msg := range msgs {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- msg:
+ }
+ }
+ }()
comment := &item.IssueComment
edits := &comment.UserContentEdits
hasEdits := true
@@ -251,17 +343,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt
select {
case <-ctx.Done():
return
- case channel <- edit:
+ case channel <- userContentEditData{edit}:
}
}
if !edits.PageInfo.HasPreviousPage {
break
}
- edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
+ edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs)
}
}
-func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
vars := newCommentEditVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -270,7 +362,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
vars["commentEditBefore"] = cursor
}
query := commentEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -281,7 +373,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
return connection, true
}
-func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) {
vars := newTimelineVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -290,7 +382,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
vars["timelineAfter"] = cursor
}
query := timelineQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -301,7 +393,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu
return connection, true
}
-func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) {
vars := newIssueEditVars()
vars["gqlNodeId"] = nid
if cursor == "" {
@@ -310,7 +402,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
vars["issueEditBefore"] = cursor
}
query := issueEditQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -321,7 +413,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
return connection, true
}
-func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) {
vars := newIssueVars(mm.owner, mm.project, mm.since)
if cursor == "" {
vars["issueAfter"] = (*githubv4.String)(nil)
@@ -329,7 +421,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String
vars["issueAfter"] = githubv4.String(cursor)
}
query := issueQuery{}
- if err := mm.mQuery(ctx, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars, msgs); err != nil {
mm.setError(err)
return nil, false
}
@@ -360,20 +452,20 @@ type rateLimiter interface {
// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
// is expired. If there is another error, then the method will retry before giving up.
-func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- if err := mm.queryOnce(ctx, query, vars); err == nil {
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
+ if err := mm.queryOnce(ctx, query, vars, msgs); err == nil {
// success: done
return nil
}
// failure: we will retry
- // This is important for importing projects with a big number of issues.
+ // To retry is important for importing projects with a big number of issues.
retries := 3
var err error
for i := 0; i < retries; i++ {
// wait a few seconds before retry
sleepTime := 8 * (i + 1)
time.Sleep(time.Duration(sleepTime) * time.Second)
- err = mm.queryOnce(ctx, query, vars)
+ err = mm.queryOnce(ctx, query, vars, msgs)
if err == nil {
// success: done
return nil
@@ -382,7 +474,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
return err
}
-func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error {
// first: just send the query to the graphql api
vars["dryRun"] = githubv4.Boolean(false)
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
@@ -411,7 +503,12 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars
resetTime := rateLimit.ResetAt.Time
// Add a few seconds (8) for good measure
resetTime = resetTime.Add(8 * time.Second)
- fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
+ msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. Sleeping until %s", resetTime.String())
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case msgs <- messageEvent{msg}:
+ }
timer := time.NewTimer(time.Until(resetTime))
select {
case <-ctx.Done():