aboutsummaryrefslogtreecommitdiffstats
path: root/bridge
diff options
context:
space:
mode:
authorAlexander Scharinger <rng.dynamics@gmail.com>2021-03-08 07:53:09 +0100
committerAlexander Scharinger <rng.dynamics@gmail.com>2021-03-15 07:15:00 +0100
commit93b14c509b8260d8238ec1b32394b4a03bcd1349 (patch)
treed44ed8da71d72ef2f2d085dc0937e84a74495198 /bridge
parent9a8e487613d99fb102e4619cb30464342b73fee7 (diff)
downloadgit-bug-93b14c509b8260d8238ec1b32394b4a03bcd1349.tar.gz
Remove maps containing channels.
The old implementation of the github bridge used maps to store several channels holding data obtained from the Github API. Removing the maps and simply packing data and channels together in a struct and passing it through one single channel makes the program simpler in terms of concurrency and, additionally, enables the garbage collector to free the memory gradually without any additional provisions.
Diffstat (limited to 'bridge')
-rw-r--r--bridge/github/import.go35
-rw-r--r--bridge/github/import_mediator.go120
2 files changed, 62 insertions, 93 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go
index 09a39586..d492488b 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -56,9 +56,12 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
defer close(gi.out)
// Loop over all matching issues
- for issue := range gi.mediator.Issues() {
+ for bundle := range gi.mediator.Issues() {
+ issue := bundle.issue
+ issueEdits := bundle.issueEdits
+ timelineBundles := bundle.timelineBundles
// create issue
- b, err := gi.ensureIssue(ctx, repo, &issue)
+ b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits)
if err != nil {
err := fmt.Errorf("issue creation: %v", err)
out <- core.NewImportError(err, "")
@@ -66,8 +69,10 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
}
// loop over timeline items
- for item := range gi.mediator.TimelineItems(&issue) {
- err := gi.ensureTimelineItem(ctx, repo, b, item)
+ for bundle := range timelineBundles {
+ item := bundle.timelineItem
+ edits := bundle.userContentEdits
+ err := gi.ensureTimelineItem(ctx, repo, b, &item, edits)
if err != nil {
err = fmt.Errorf("timeline item creation: %v", err)
out <- core.NewImportError(err, "")
@@ -93,7 +98,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
return out, nil
}
-func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) {
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) {
author, err := gi.ensurePerson(ctx, repo, issue.Author)
if err != nil {
return nil, err
@@ -110,7 +115,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
// get first issue edit
// if it exists, then it holds the bug creation
- firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue)
+ firstEdit, hasEdit := <-issueEdits
// At Github there exist issues with seemingly empty titles. An example is
// https://github.com/NixOS/nixpkgs/issues/72730 .
@@ -157,7 +162,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return nil, fmt.Errorf("finding or creating issue")
}
// process remaining issue edits, if they exist
- for edit := range gi.mediator.IssueEdits(issue) {
+ for edit := range issueEdits {
// other edits will be added as CommentEdit operations
target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id))
if err == cache.ErrNoMatchingOp {
@@ -169,7 +174,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return nil, err
}
- err = gi.ensureCommentEdit(ctx, repo, b, target, edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, target, &edit)
if err != nil {
return nil, err
}
@@ -177,11 +182,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache
return b, nil
}
-func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error {
switch item.Typename {
case "IssueComment":
- err := gi.ensureComment(ctx, repo, b, &item.IssueComment)
+ err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
}
@@ -340,7 +345,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re
return nil
}
-func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error {
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error {
author, err := gi.ensurePerson(ctx, repo, comment.Author)
if err != nil {
return err
@@ -351,7 +356,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
// real error
return err
}
- firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment)
+ firstEdit, hasEdit := <-commentEdits
if err == cache.ErrNoMatchingOp {
var textInput string
if hasEdit {
@@ -388,14 +393,14 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
return fmt.Errorf("finding or creating issue comment")
}
// process remaining comment edits, if they exist
- for edit := range gi.mediator.CommentEdits(comment) {
+ for edit := range commentEdits {
// ensure editor identity
_, err := gi.ensurePerson(ctx, repo, edit.Editor)
if err != nil {
return err
}
- err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit)
if err != nil {
return err
}
@@ -403,7 +408,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac
return nil
}
-func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
+func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
if err == nil {
return nil
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 8bd33adb..02067286 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -19,9 +19,7 @@ const ( // These values influence how fast the github graphql rate limit is exha
CHAN_CAPACITY = 128
)
-type varmap map[string]interface{}
-
-// importMediator provides an interface to retrieve Github issues.
+// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API.
type importMediator struct {
// Github graphql client
gc *githubv4.Client
@@ -32,28 +30,30 @@ type importMediator struct {
// name of the Github repository
project string
- // The importMediator will only query issues updated or created after the date given in
- // the variable since.
+ // since specifies which issues to import. Issues that have been updated at or after the
+ // given date should be imported.
since time.Time
- // channel for the issues
- issues chan issue
+ // issues is a channel holding bundles of issues to be imported. Each bundle holds the data
+ // associated with one issue.
+ issues chan issueBundle
- // channel for issue edits
- issueEdits map[githubv4.ID]chan userContentEdit
- issueEditsMut sync.Mutex
+ // Sticky error
+ err error
- // channel for timeline items
- timelineItems map[githubv4.ID]chan timelineItem
- timelineItemsMut sync.Mutex
+ // errMut is a mutex to synchronize access to the sticky error variable err.
+ errMut sync.Mutex
+}
- // channel for comment edits
- commentEdits map[githubv4.ID]chan userContentEdit
- commentEditsMut sync.Mutex
+type issueBundle struct {
+ issue issue
+ issueEdits <-chan userContentEdit
+ timelineBundles <-chan timelineBundle
+}
- // Sticky error
- err error
- errMut sync.Mutex
+type timelineBundle struct {
+ timelineItem timelineItem
+ userContentEdits <-chan userContentEdit
}
func (mm *importMediator) setError(err error) {
@@ -64,18 +64,12 @@ func (mm *importMediator) setError(err error) {
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
mm := importMediator{
- gc: client,
- owner: owner,
- project: project,
- since: since,
- issues: make(chan issue, CHAN_CAPACITY),
- issueEdits: make(map[githubv4.ID]chan userContentEdit),
- issueEditsMut: sync.Mutex{},
- timelineItems: make(map[githubv4.ID]chan timelineItem),
- timelineItemsMut: sync.Mutex{},
- commentEdits: make(map[githubv4.ID]chan userContentEdit),
- commentEditsMut: sync.Mutex{},
- err: nil,
+ gc: client,
+ owner: owner,
+ project: project,
+ since: since,
+ issues: make(chan issueBundle, CHAN_CAPACITY),
+ err: nil,
}
go func() {
mm.fillIssues(ctx)
@@ -84,6 +78,8 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj
return &mm
}
+type varmap map[string]interface{}
+
func newIssueVars(owner, project string, since time.Time) varmap {
return varmap{
"owner": githubv4.String(owner),
@@ -119,31 +115,10 @@ func newCommentEditVars() varmap {
}
}
-func (mm *importMediator) Issues() <-chan issue {
+func (mm *importMediator) Issues() <-chan issueBundle {
return mm.issues
}
-func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
- mm.issueEditsMut.Lock()
- channel := mm.issueEdits[issue.Id]
- mm.issueEditsMut.Unlock()
- return channel
-}
-
-func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
- mm.timelineItemsMut.Lock()
- channel := mm.timelineItems[issue.Id]
- mm.timelineItemsMut.Unlock()
- return channel
-}
-
-func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
- mm.commentEditsMut.Lock()
- channel := mm.commentEdits[comment.Id]
- mm.commentEditsMut.Unlock()
- return channel
-}
-
func (mm *importMediator) Error() error {
mm.errMut.Lock()
err := mm.err
@@ -165,26 +140,14 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
issues, hasIssues := mm.queryIssue(ctx, initialCursor)
for hasIssues {
for _, node := range issues.Nodes {
- // The order of statements in this loop is crucial for the correct concurrent
- // execution.
- //
- // The issue edit channel and the timeline channel need to be added to the
- // corresponding maps before the issue is sent in the issue channel.
- // Otherwise, the client could try to retrieve issue edits and timeline itmes
- // before these channels are even created. In this case the client would
- // receive a nil channel.
+ // We need to send an issue-bundle over the issue channel before we start
+ // filling the issue edits and the timeline items to avoid deadlocks.
issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
- timelineChan := make(chan timelineItem, CHAN_CAPACITY)
- mm.issueEditsMut.Lock()
- mm.issueEdits[node.issue.Id] = issueEditChan
- mm.issueEditsMut.Unlock()
- mm.timelineItemsMut.Lock()
- mm.timelineItems[node.issue.Id] = timelineChan
- mm.timelineItemsMut.Unlock()
+ timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY)
select {
case <-ctx.Done():
return
- case mm.issues <- node.issue:
+ case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}:
}
// We do not know whether the client reads from the issue edit channel
@@ -196,8 +159,8 @@ func (mm *importMediator) fillIssues(ctx context.Context) {
close(issueEditChan)
}(node)
go func(node issueNode) {
- mm.fillTimeline(ctx, &node, timelineChan)
- close(timelineChan)
+ mm.fillTimeline(ctx, &node, timelineBundleChan)
+ close(timelineBundleChan)
}(node)
}
if !issues.PageInfo.HasNextPage {
@@ -231,21 +194,22 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo
}
}
-func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) {
+func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) {
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
for _, item := range items.Nodes {
if item.Typename == "IssueComment" {
- // Here the order of statements is crucial for correct concurrency.
+ // Issue comments are different than other timeline items in that
+ // they may have associated user content edits.
+ //
+ // Send over the timeline-channel before starting to fill the comment
+ // edits.
commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
- mm.commentEditsMut.Lock()
- mm.commentEdits[item.IssueComment.Id] = commentEditChan
- mm.commentEditsMut.Unlock()
select {
case <-ctx.Done():
return
- case channel <- item:
+ case channel <- timelineBundle{item, commentEditChan}:
}
// We need to create a new goroutine for filling the comment edit
// channel.
@@ -257,7 +221,7 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode
select {
case <-ctx.Done():
return
- case channel <- item:
+ case channel <- timelineBundle{item, nil}:
}
}
}