diff options
author | Alexander Scharinger <rng.dynamics@gmail.com> | 2021-03-08 07:53:09 +0100 |
---|---|---|
committer | Alexander Scharinger <rng.dynamics@gmail.com> | 2021-03-15 07:15:00 +0100 |
commit | 93b14c509b8260d8238ec1b32394b4a03bcd1349 (patch) | |
tree | d44ed8da71d72ef2f2d085dc0937e84a74495198 /bridge | |
parent | 9a8e487613d99fb102e4619cb30464342b73fee7 (diff) | |
download | git-bug-93b14c509b8260d8238ec1b32394b4a03bcd1349.tar.gz |
Remove maps containing channels.
The old implementation of the github bridge used maps to store several
channels holding data obtained from the Github API. Removing the maps and
simply packing data and channels together in a struct and passing it
through one single channel makes the program simpler in terms of
concurrency and, additionally, enables the garbage collector to free the
memory gradually without any additional provisions.
Diffstat (limited to 'bridge')
-rw-r--r-- | bridge/github/import.go | 35 | ||||
-rw-r--r-- | bridge/github/import_mediator.go | 120 |
2 files changed, 62 insertions, 93 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go index 09a39586..d492488b 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -56,9 +56,12 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for issue := range gi.mediator.Issues() { + for bundle := range gi.mediator.Issues() { + issue := bundle.issue + issueEdits := bundle.issueEdits + timelineBundles := bundle.timelineBundles // create issue - b, err := gi.ensureIssue(ctx, repo, &issue) + b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") @@ -66,8 +69,10 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } // loop over timeline items - for item := range gi.mediator.TimelineItems(&issue) { - err := gi.ensureTimelineItem(ctx, repo, b, item) + for bundle := range timelineBundles { + item := bundle.timelineItem + edits := bundle.userContentEdits + err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") @@ -93,7 +98,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -110,7 +115,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache // get first issue edit // if it exists, then it holds the bug creation - firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) + firstEdit, hasEdit := <-issueEdits // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . @@ -157,7 +162,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, fmt.Errorf("finding or creating issue") } // process remaining issue edits, if they exist - for edit := range gi.mediator.IssueEdits(issue) { + for edit := range issueEdits { // other edits will be added as CommentEdit operations target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) if err == cache.ErrNoMatchingOp { @@ -169,7 +174,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, err } - err = gi.ensureCommentEdit(ctx, repo, b, target, edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, &edit) if err != nil { return nil, err } @@ -177,11 +182,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(ctx, repo, b, &item.IssueComment) + err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -340,7 +345,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error { author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err @@ -351,7 +356,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac // real error return err } - firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment) + firstEdit, hasEdit := <-commentEdits if err == cache.ErrNoMatchingOp { var textInput string if hasEdit { @@ -388,14 +393,14 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return fmt.Errorf("finding or creating issue comment") } // process remaining comment edits, if they exist - for edit := range gi.mediator.CommentEdits(comment) { + for edit := range commentEdits { // ensure editor identity _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit) if err != nil { return err } @@ -403,7 +408,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return nil } -func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error { _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) if err == nil { return nil diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 8bd33adb..02067286 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -19,9 +19,7 @@ const ( // These values influence how fast the github graphql rate limit is exha CHAN_CAPACITY = 128 ) -type varmap map[string]interface{} - -// importMediator provides an interface to retrieve Github issues. +// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API. type importMediator struct { // Github graphql client gc *githubv4.Client @@ -32,28 +30,30 @@ type importMediator struct { // name of the Github repository project string - // The importMediator will only query issues updated or created after the date given in - // the variable since. + // since specifies which issues to import. Issues that have been updated at or after the + // given date should be imported. since time.Time - // channel for the issues - issues chan issue + // issues is a channel holding bundles of issues to be imported. Each bundle holds the data + // associated with one issue. + issues chan issueBundle - // channel for issue edits - issueEdits map[githubv4.ID]chan userContentEdit - issueEditsMut sync.Mutex + // Sticky error + err error - // channel for timeline items - timelineItems map[githubv4.ID]chan timelineItem - timelineItemsMut sync.Mutex + // errMut is a mutex to synchronize access to the sticky error variable err. + errMut sync.Mutex +} - // channel for comment edits - commentEdits map[githubv4.ID]chan userContentEdit - commentEditsMut sync.Mutex +type issueBundle struct { + issue issue + issueEdits <-chan userContentEdit + timelineBundles <-chan timelineBundle +} - // Sticky error - err error - errMut sync.Mutex +type timelineBundle struct { + timelineItem timelineItem + userContentEdits <-chan userContentEdit } func (mm *importMediator) setError(err error) { @@ -64,18 +64,12 @@ func (mm *importMediator) setError(err error) { func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, - owner: owner, - project: project, - since: since, - issues: make(chan issue, CHAN_CAPACITY), - issueEdits: make(map[githubv4.ID]chan userContentEdit), - issueEditsMut: sync.Mutex{}, - timelineItems: make(map[githubv4.ID]chan timelineItem), - timelineItemsMut: sync.Mutex{}, - commentEdits: make(map[githubv4.ID]chan userContentEdit), - commentEditsMut: sync.Mutex{}, - err: nil, + gc: client, + owner: owner, + project: project, + since: since, + issues: make(chan issueBundle, CHAN_CAPACITY), + err: nil, } go func() { mm.fillIssues(ctx) @@ -84,6 +78,8 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj return &mm } +type varmap map[string]interface{} + func newIssueVars(owner, project string, since time.Time) varmap { return varmap{ "owner": githubv4.String(owner), @@ -119,31 +115,10 @@ func newCommentEditVars() varmap { } } -func (mm *importMediator) Issues() <-chan issue { +func (mm *importMediator) Issues() <-chan issueBundle { return mm.issues } -func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit { - mm.issueEditsMut.Lock() - channel := mm.issueEdits[issue.Id] - mm.issueEditsMut.Unlock() - return channel -} - -func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem { - mm.timelineItemsMut.Lock() - channel := mm.timelineItems[issue.Id] - mm.timelineItemsMut.Unlock() - return channel -} - -func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit { - mm.commentEditsMut.Lock() - channel := mm.commentEdits[comment.Id] - mm.commentEditsMut.Unlock() - return channel -} - func (mm *importMediator) Error() error { mm.errMut.Lock() err := mm.err @@ -165,26 +140,14 @@ func (mm *importMediator) fillIssues(ctx context.Context) { issues, hasIssues := mm.queryIssue(ctx, initialCursor) for hasIssues { for _, node := range issues.Nodes { - // The order of statements in this loop is crucial for the correct concurrent - // execution. - // - // The issue edit channel and the timeline channel need to be added to the - // corresponding maps before the issue is sent in the issue channel. - // Otherwise, the client could try to retrieve issue edits and timeline itmes - // before these channels are even created. In this case the client would - // receive a nil channel. + // We need to send an issue-bundle over the issue channel before we start + // filling the issue edits and the timeline items to avoid deadlocks. issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) - timelineChan := make(chan timelineItem, CHAN_CAPACITY) - mm.issueEditsMut.Lock() - mm.issueEdits[node.issue.Id] = issueEditChan - mm.issueEditsMut.Unlock() - mm.timelineItemsMut.Lock() - mm.timelineItems[node.issue.Id] = timelineChan - mm.timelineItemsMut.Unlock() + timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.issues <- node.issue: + case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}: } // We do not know whether the client reads from the issue edit channel @@ -196,8 +159,8 @@ func (mm *importMediator) fillIssues(ctx context.Context) { close(issueEditChan) }(node) go func(node issueNode) { - mm.fillTimeline(ctx, &node, timelineChan) - close(timelineChan) + mm.fillTimeline(ctx, &node, timelineBundleChan) + close(timelineBundleChan) }(node) } if !issues.PageInfo.HasNextPage { @@ -231,21 +194,22 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) { +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { if item.Typename == "IssueComment" { - // Here the order of statements is crucial for correct concurrency. + // Issue comments are different than other timeline items in that + // they may have associated user content edits. + // + // Send over the timeline-channel before starting to fill the comment + // edits. commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) - mm.commentEditsMut.Lock() - mm.commentEdits[item.IssueComment.Id] = commentEditChan - mm.commentEditsMut.Unlock() select { case <-ctx.Done(): return - case channel <- item: + case channel <- timelineBundle{item, commentEditChan}: } // We need to create a new goroutine for filling the comment edit // channel. @@ -257,7 +221,7 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode select { case <-ctx.Done(): return - case channel <- item: + case channel <- timelineBundle{item, nil}: } } } |