aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bridge/github/import.go35
-rw-r--r--bridge/github/import_mediator.go120
2 files changed, 62 insertions, 93 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go
index 09a39586..d492488b 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -56,9 +56,12 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
defer close(gi.out)
// Loop over all matching issues
- for issue := range gi.mediator.Issues() {
+ for bundle := range gi.mediator.Issues() {
+ issue := bundle.issue
+ issueEdits := bundle.issueEdits
+ timelineBundles := bundle.timelineBundles
// create issue
- b, err := gi.ensureIssue(ctx, repo, &issue)
+ b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits)
if err != nil {
err := fmt.Errorf("issue creation: %v", err)
out <- core.NewImportError(err, "")
@@ -66,8 +69,10 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
}
// loop over timeline items
- for item := range gi.mediator.TimelineItems(&issue) {
- err := gi.ensureTimelineItem(ctx, repo, b, item)
+ for bundle := range timelineBundles {
+ item := bundle.timelineItem
+ edits := bundle.userContentEdits
+ err := gi.ensureTimelineItem(ctx, repo, b, &item, edits)
if err != nil {
err = fmt.Errorf("timeline item creation: %v", err)
out <- core.NewImportError(err, "")
@@ -93,7 +98,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
return out, nil
}
-func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) {
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) {
author, err := gi.ensurePerson(ctx, repo, issue.Author)
if err != nil {
return nil, err
@@ -110,7 +115,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
// get first issue edit
// if it exists, then it holds the bug creation
- firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue)
+ firstEdit, hasEdit := <-issueEdits
// At Github there exist issues with seemingly empty titles. An example is
// https://github.com/NixOS/nixpkgs/issues/72730 .
@@ -157,7 +162,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return nil, fmt.Errorf("finding or creating issue")
}
// process remaining issue edits, if they exist
- for edit := range gi.mediator.IssueEdits(issue) {
+ for edit := range issueEdits {
// other edits will be added as CommentEdit operations
target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id))
if err == cache.ErrNoMatchingOp {
@@ -169,7 +174,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return nil, err
}
- err = gi.ensureCommentEdit(ctx, repo, b, target, edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, target, &edit)
if err != nil {
return nil, err
}
@@ -177,11 +182,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return b, nil
}
-func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error {
switch item.Typename {
case "IssueComment":
- err := gi.ensureComment(ctx, repo, b, &item.IssueComment)
+ err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
}
@@ -340,7 +345,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re
return nil
}
-func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error {
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error {
author, err := gi.ensurePerson(ctx, repo, comment.Author)
if err != nil {
return err
@@ -351,7 +356,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
// real error
return err
}
- firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment)
+ firstEdit, hasEdit := <-commentEdits
if err == cache.ErrNoMatchingOp {
var textInput string
if hasEdit {
@@ -388,14 +393,14 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
return fmt.Errorf("finding or creating issue comment")
}
// process remaining comment edits, if they exist
- for edit := range gi.mediator.CommentEdits(comment) {
+ for edit := range commentEdits {
// ensure editor identity
_, err := gi.ensurePerson(ctx, repo, edit.Editor)
if err != nil {
return err
}
- err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit)
if err != nil {
return err
}
@@ -403,7 +408,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
return nil
}
-func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
+func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
if err == nil {
return nil
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 8bd33adb..02067286 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -19,9 +19,7 @@ const ( // These values influence how fast the github graphql rate limit is exha
CHAN_CAPACITY = 128
)
-type varmap map[string]interface{}
-
-// importMediator provides an interface to retrieve Github issues.
+// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
type importMediator struct {
// Github graphql client
gc *githubv4.Client
@@ -32,28 +30,30 @@ type importMediator struct {
// name of the Github repository
project string
- // The importMediator will only query issues updated or created after the date given in
- // the variable since.
+ // since specifies which issues to import. Issues that have been updated at or after the
+ // given date should be imported.
since time.Time
- // channel for the issues
- issues chan issue
+ // issues is a channel holding bundles of issues to be imported. Each bundle holds the data
+ // associated with one issue.
+ issues chan issueBundle
- // channel for issue edits
- issueEdits map[githubv4.ID]chan userContentEdit
- issueEditsMut sync.Mutex
+ // Sticky error
+ err error
- // channel for timeline items
- timelineItems map[githubv4.ID]chan timelineItem
- timelineItemsMut sync.Mutex
+ // errMut is a mutex to synchronize access to the sticky error variable err.
+ errMut sync.Mutex
+}
- // channel for comment edits
- commentEdits map[githubv4.ID]chan userContentEdit
- commentEditsMut sync.Mutex
+type issueBundle struct {
+ issue issue
+ issueEdits <-chan userContentEdit
+ timelineBundles <-chan timelineBundle
+}
- // Sticky error
- err error
- errMut sync.Mutex
+type timelineBundle struct {
+ timelineItem timelineItem
+ userContentEdits <-chan userContentEdit
}
func (mm *importMediator) setError(err error) {
@@ -64,18 +64,12 @@ func (mm *importMediator) setError(err error) {
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
mm := importMediator{
- gc: client,
- owner: owner,
- project: project,
- since: since,
- issues: make(chan issue, CHAN_CAPACITY),
- issueEdits: make(map[githubv4.ID]chan userContentEdit),
- issueEditsMut: sync.Mutex{},
- timelineItems: make(map[githubv4.ID]chan timelineItem),
- timelineItemsMut: sync.Mutex{},
- commentEdits: make(map[githubv4.ID]chan userContentEdit),
- commentEditsMut: sync.Mutex{},
- err: nil,
+ gc: client,
+ owner: owner,
+ project: project,
+ since: since,
+ issues: make(chan issueBundle, CHAN_CAPACITY),
+ err: nil,
}
go func() {
mm.fillIssues(ctx)
@@ -84,6 +78,8 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj
return &mm
}
+type varmap map[string]interface{}
+
func newIssueVars(owner, project string, since time.Time) varmap {
return varmap{
"owner": githubv4.String(owner),
@@ -119,31 +115,10 @@ func newCommentEditVars() varmap {
}
}
-func (mm *importMediator) Issues() <-chan issue {
+func (mm *importMediator) Issues() <-chan issueBundle {
return mm.issues
}
-func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
- mm.issueEditsMut.Lock()
- channel := mm.issueEdits[issue.Id]
- mm.issueEditsMut.Unlock()
- return channel
-}
-
-func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
- mm.timelineItemsMut.Lock()
- channel := mm.timelineItems[issue.Id]
- mm.timelineItemsMut.Unlock()
- return channel
-}
-
-func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
- mm.commentEditsMut.Lock()
- channel := mm.commentEdits[comment.Id]
- mm.commentEditsMut.Unlock()
- return channel
-}
-
func (mm *importMediator) Error() error {
mm.errMut.Lock()
err := mm.err
@@ -165,26 +140,14 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
issues, hasIssues := mm.queryIssue(ctx, initialCursor)
for hasIssues {
for _, node := range issues.Nodes {
- // The order of statements in this loop is crucial for the correct concurrent
- // execution.
- //
- // The issue edit channel and the timeline channel need to be added to the
- // corresponding maps before the issue is sent in the issue channel.
- // Otherwise, the client could try to retrieve issue edits and timeline itmes
- // before these channels are even created. In this case the client would
- // receive a nil channel.
+ // We need to send an issue-bundle over the issue channel before we start
+ // filling the issue edits and the timeline items to avoid deadlocks.
issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
- timelineChan := make(chan timelineItem, CHAN_CAPACITY)
- mm.issueEditsMut.Lock()
- mm.issueEdits[node.issue.Id] = issueEditChan
- mm.issueEditsMut.Unlock()
- mm.timelineItemsMut.Lock()
- mm.timelineItems[node.issue.Id] = timelineChan
- mm.timelineItemsMut.Unlock()
+ timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case mm.issues <- node.issue:
+ case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
}
// We do not know whether the client reads from the issue edit channel
@@ -196,8 +159,8 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
close(issueEditChan)
}(node)
go func(node issueNode) {
- mm.fillTimeline(ctx, &node, timelineChan)
- close(timelineChan)
+ mm.fillTimeline(ctx, &node, timelineBundleChan)
+ close(timelineBundleChan)
}(node)
}
if !issues.PageInfo.HasNextPage {
@@ -231,21 +194,22 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
}
}
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) {
+func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
for _, item := range items.Nodes {
if item.Typename == "IssueComment" {
- // Here the order of statements is crucial for correct concurrency.
+ // Issue comments are different than other timeline items in that
+ // they may have associated user content edits.
+ //
+ // Send over the timeline-channel before starting to fill the comment
+ // edits.
commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
- mm.commentEditsMut.Lock()
- mm.commentEdits[item.IssueComment.Id] = commentEditChan
- mm.commentEditsMut.Unlock()
select {
case <-ctx.Done():
return
- case channel <- item:
+ case channel <- timelineBundle{item, commentEditChan}:
}
// We need to create a new goroutine for filling the comment edit
// channel.
@@ -257,7 +221,7 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
select {
case <-ctx.Done():
return
- case channel <- item:
+ case channel <- timelineBundle{item, nil}:
}
}
}