diff options
Diffstat (limited to 'bridge')
-rw-r--r-- | bridge/github/import.go | 35 | ||||
-rw-r--r-- | bridge/github/import_mediator.go | 120 |
2 files changed, 62 insertions, 93 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go index 09a39586..d492488b 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -56,9 +56,12 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for issue := range gi.mediator.Issues() { + for bundle := range gi.mediator.Issues() { + issue := bundle.issue + issueEdits := bundle.issueEdits + timelineBundles := bundle.timelineBundles // create issue - b, err := gi.ensureIssue(ctx, repo, &issue) + b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") @@ -66,8 +69,10 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } // loop over timeline items - for item := range gi.mediator.TimelineItems(&issue) { - err := gi.ensureTimelineItem(ctx, repo, b, item) + for bundle := range timelineBundles { + item := bundle.timelineItem + edits := bundle.userContentEdits + err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") @@ -93,7 +98,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -110,7 +115,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache // get first issue edit // if it exists, then it holds the bug creation - firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) + firstEdit, hasEdit := <-issueEdits // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . @@ -157,7 +162,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, fmt.Errorf("finding or creating issue") } // process remaining issue edits, if they exist - for edit := range gi.mediator.IssueEdits(issue) { + for edit := range issueEdits { // other edits will be added as CommentEdit operations target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) if err == cache.ErrNoMatchingOp { @@ -169,7 +174,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, err } - err = gi.ensureCommentEdit(ctx, repo, b, target, edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, &edit) if err != nil { return nil, err } @@ -177,11 +182,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(ctx, repo, b, &item.IssueComment) + err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -340,7 +345,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error { author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err @@ -351,7 +356,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac // real error return err } - firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment) + firstEdit, hasEdit := <-commentEdits if err == cache.ErrNoMatchingOp { var textInput string if hasEdit { @@ -388,14 +393,14 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return fmt.Errorf("finding or creating issue comment") } // process remaining comment edits, if they exist - for edit := range gi.mediator.CommentEdits(comment) { + for edit := range commentEdits { // ensure editor identity _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit) if err != nil { return err } @@ -403,7 +408,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return nil } -func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error { _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) if err == nil { return nil diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 8bd33adb..02067286 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -19,9 +19,7 @@ const ( // These values influence how fast the github graphql rate limit is exha CHAN_CAPACITY = 128 ) -type varmap map[string]interface{} - -// importMediator provides an interface to retrieve Github issues. +// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API. type importMediator struct { // Github graphql client gc *githubv4.Client @@ -32,28 +30,30 @@ type importMediator struct { // name of the Github repository project string - // The importMediator will only query issues updated or created after the date given in - // the variable since. + // since specifies which issues to import. Issues that have been updated at or after the + // given date should be imported. since time.Time - // channel for the issues - issues chan issue + // issues is a channel holding bundles of issues to be imported. Each bundle holds the data + // associated with one issue. + issues chan issueBundle - // channel for issue edits - issueEdits map[githubv4.ID]chan userContentEdit - issueEditsMut sync.Mutex + // Sticky error + err error - // channel for timeline items - timelineItems map[githubv4.ID]chan timelineItem - timelineItemsMut sync.Mutex + // errMut is a mutex to synchronize access to the sticky error variable err. + errMut sync.Mutex +} - // channel for comment edits - commentEdits map[githubv4.ID]chan userContentEdit - commentEditsMut sync.Mutex +type issueBundle struct { + issue issue + issueEdits <-chan userContentEdit + timelineBundles <-chan timelineBundle +} - // Sticky error - err error - errMut sync.Mutex +type timelineBundle struct { + timelineItem timelineItem + userContentEdits <-chan userContentEdit } func (mm *importMediator) setError(err error) { @@ -64,18 +64,12 @@ func (mm *importMediator) setError(err error) { func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, - owner: owner, - project: project, - since: since, - issues: make(chan issue, CHAN_CAPACITY), - issueEdits: make(map[githubv4.ID]chan userContentEdit), - issueEditsMut: sync.Mutex{}, - timelineItems: make(map[githubv4.ID]chan timelineItem), - timelineItemsMut: sync.Mutex{}, - commentEdits: make(map[githubv4.ID]chan userContentEdit), - commentEditsMut: sync.Mutex{}, - err: nil, + gc: client, + owner: owner, + project: project, + since: since, + issues: make(chan issueBundle, CHAN_CAPACITY), + err: nil, } go func() { mm.fillIssues(ctx) @@ -84,6 +78,8 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj return &mm } +type varmap map[string]interface{} + func newIssueVars(owner, project string, since time.Time) varmap { return varmap{ "owner": githubv4.String(owner), @@ -119,31 +115,10 @@ func newCommentEditVars() varmap { } } -func (mm *importMediator) Issues() <-chan issue { +func (mm *importMediator) Issues() <-chan issueBundle { return mm.issues } -func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit { - mm.issueEditsMut.Lock() - channel := mm.issueEdits[issue.Id] - mm.issueEditsMut.Unlock() - return channel -} - -func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem { - mm.timelineItemsMut.Lock() - channel := mm.timelineItems[issue.Id] - mm.timelineItemsMut.Unlock() - return channel -} - -func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit { - mm.commentEditsMut.Lock() - channel := mm.commentEdits[comment.Id] - mm.commentEditsMut.Unlock() - return channel -} - func (mm *importMediator) Error() error { mm.errMut.Lock() err := mm.err @@ -165,26 +140,14 @@ func (mm *importMediator) fillIssues(ctx context.Context) { issues, hasIssues := mm.queryIssue(ctx, initialCursor) for hasIssues { for _, node := range issues.Nodes { - // The order of statements in this loop is crucial for the correct concurrent - // execution. - // - // The issue edit channel and the timeline channel need to be added to the - // corresponding maps before the issue is sent in the issue channel. - // Otherwise, the client could try to retrieve issue edits and timeline itmes - // before these channels are even created. In this case the client would - // receive a nil channel. + // We need to send an issue-bundle over the issue channel before we start + // filling the issue edits and the timeline items to avoid deadlocks. issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) - timelineChan := make(chan timelineItem, CHAN_CAPACITY) - mm.issueEditsMut.Lock() - mm.issueEdits[node.issue.Id] = issueEditChan - mm.issueEditsMut.Unlock() - mm.timelineItemsMut.Lock() - mm.timelineItems[node.issue.Id] = timelineChan - mm.timelineItemsMut.Unlock() + timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.issues <- node.issue: + case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}: } // We do not know whether the client reads from the issue edit channel @@ -196,8 +159,8 @@ func (mm *importMediator) fillIssues(ctx context.Context) { close(issueEditChan) }(node) go func(node issueNode) { - mm.fillTimeline(ctx, &node, timelineChan) - close(timelineChan) + mm.fillTimeline(ctx, &node, timelineBundleChan) + close(timelineBundleChan) }(node) } if !issues.PageInfo.HasNextPage { @@ -231,21 +194,22 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) { +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { if item.Typename == "IssueComment" { - // Here the order of statements is crucial for correct concurrency. + // Issue comments are different than other timeline items in that + // they may have associated user content edits. + // + // Send over the timeline-channel before starting to fill the comment + // edits. commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) - mm.commentEditsMut.Lock() - mm.commentEdits[item.IssueComment.Id] = commentEditChan - mm.commentEditsMut.Unlock() select { case <-ctx.Done(): return - case channel <- item: + case channel <- timelineBundle{item, commentEditChan}: } // We need to create a new goroutine for filling the comment edit // channel. @@ -257,7 +221,7 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode select { case <-ctx.Done(): return - case channel <- item: + case channel <- timelineBundle{item, nil}: } } } |