aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Scharinger <rng.dynamics@gmail.com>2021-03-05 20:06:21 +0100
committerAlexander Scharinger <rng.dynamics@gmail.com>2021-03-15 07:14:40 +0100
commit9a8e487613d99fb102e4619cb30464342b73fee7 (patch)
treeae00ae280cb55d537981faca8c6b846a81bb2c30
parent689b640bbbb801772d9c5c4bd428d4ec750f00ce (diff)
downloadgit-bug-9a8e487613d99fb102e4619cb30464342b73fee7.tar.gz
Fix errors: deadlock and empty titles
-rw-r--r--bridge/github/import.go86
-rw-r--r--bridge/github/import_mediator.go370
-rw-r--r--bridge/github/import_query.go10
3 files changed, 263 insertions, 203 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go
index 2e36f5fe..09a39586 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -3,7 +3,6 @@ package github
import (
"context"
"fmt"
- "strconv"
"time"
"github.com/shurcooL/githubv4"
@@ -16,6 +15,8 @@ import (
"github.com/MichaelMure/git-bug/util/text"
)
+const EMPTY_TITLE_PLACEHOLDER = "<empty string>"
+
// githubImporter implement the Importer interface
type githubImporter struct {
conf core.Configuration
@@ -25,9 +26,6 @@ type githubImporter struct {
// send only channel
out chan<- core.ImportResult
-
- // closure to get the username from github without any additional parameters
- ghUser func(string) (*user, error)
}
func (gi *githubImporter) Init(_ context.Context, _ *cache.RepoCache, conf core.Configuration) error {
@@ -51,9 +49,6 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
}
client := buildClient(creds[0].(*auth.Token))
gi.mediator = NewImportMediator(ctx, client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since)
- gi.ghUser = func(login string) (*user, error) {
- return gi.mediator.User(ctx, login)
- }
out := make(chan core.ImportResult)
gi.out = out
@@ -62,19 +57,17 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
// Loop over all matching issues
for issue := range gi.mediator.Issues() {
- // fmt.Println("issue loop")
// create issue
- b, err := gi.ensureIssue(repo, &issue)
+ b, err := gi.ensureIssue(ctx, repo, &issue)
if err != nil {
err := fmt.Errorf("issue creation: %v", err)
out <- core.NewImportError(err, "")
return
}
- // fmt.Println("Just before timeline items loop")
// loop over timeline items
for item := range gi.mediator.TimelineItems(&issue) {
- err := gi.ensureTimelineItem(repo, b, item)
+ err := gi.ensureTimelineItem(ctx, repo, b, item)
if err != nil {
err = fmt.Errorf("timeline item creation: %v", err)
out <- core.NewImportError(err, "")
@@ -100,9 +93,8 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache,
return out, nil
}
-func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) {
- // fmt.Printf("ensureIssue()\n")
- author, err := gi.ensurePerson(repo, issue.Author)
+func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) {
+ author, err := gi.ensurePerson(ctx, repo, issue.Author)
if err != nil {
return nil, err
}
@@ -119,15 +111,15 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac
// get first issue edit
// if it exists, then it holds the bug creation
firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue)
- // fmt.Printf("hasEdit == %v\n", hasEdit)
- //fmt.Printf("%v\n", firstEdit)
+ // At Github there exist issues with seemingly empty titles. An example is
+ // https://github.com/NixOS/nixpkgs/issues/72730 .
+ // The title provided by the GraphQL API actually consists of a space followed by a
+ // zero width space (U+200B). This title would cause the NewBugRaw() function to
+ // return an error: empty title.
title := string(issue.Title)
- if title == "" {
- fmt.Printf("%v\n", issue)
- fmt.Println("title == \"\" holds")
- title = "#" + strconv.Itoa(int(issue.Number))
- fmt.Println("setting title := ", title)
+ if title == " \u200b" { // U+200B == zero width space
+ title = EMPTY_TITLE_PLACEHOLDER
}
if err == bug.ErrBugNotExist {
@@ -156,7 +148,6 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac
metaKeyGithubUrl: issue.Url.String(),
})
if err != nil {
- fmt.Printf("%v\n", issue)
return nil, err
}
// importing a new bug
@@ -178,7 +169,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac
return nil, err
}
- err = gi.ensureCommentEdit(repo, b, target, edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, target, edit)
if err != nil {
return nil, err
}
@@ -186,11 +177,11 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac
return b, nil
}
-func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
+func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
switch item.Typename {
case "IssueComment":
- err := gi.ensureComment(repo, b, &item.IssueComment)
+ err := gi.ensureComment(ctx, repo, b, &item.IssueComment)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
}
@@ -206,7 +197,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
- author, err := gi.ensurePerson(repo, item.LabeledEvent.Actor)
+ author, err := gi.ensurePerson(ctx, repo, item.LabeledEvent.Actor)
if err != nil {
return err
}
@@ -235,7 +226,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
- author, err := gi.ensurePerson(repo, item.UnlabeledEvent.Actor)
+ author, err := gi.ensurePerson(ctx, repo, item.UnlabeledEvent.Actor)
if err != nil {
return err
}
@@ -265,7 +256,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err == nil {
return nil
}
- author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor)
+ author, err := gi.ensurePerson(ctx, repo, item.ClosedEvent.Actor)
if err != nil {
return err
}
@@ -291,7 +282,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err == nil {
return nil
}
- author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor)
+ author, err := gi.ensurePerson(ctx, repo, item.ReopenedEvent.Actor)
if err != nil {
return err
}
@@ -317,14 +308,25 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err == nil {
return nil
}
- author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor)
+ author, err := gi.ensurePerson(ctx, repo, item.RenamedTitleEvent.Actor)
if err != nil {
return err
}
+
+ // At Github there exist issues with seemingly empty titles. An example is
+ // https://github.com/NixOS/nixpkgs/issues/72730 .
+ // The title provided by the GraphQL API actually consists of a space followed
+ // by a zero width space (U+200B). This title would cause the NewBugRaw()
+ // function to return an error: empty title.
+ title := string(item.RenamedTitleEvent.CurrentTitle)
+ if title == " \u200b" { // U+200B == zero width space
+ title = EMPTY_TITLE_PLACEHOLDER
+ }
+
op, err := b.SetTitleRaw(
author,
item.RenamedTitleEvent.CreatedAt.Unix(),
- string(item.RenamedTitleEvent.CurrentTitle),
+ title,
map[string]string{metaKeyGithubId: id},
)
if err != nil {
@@ -338,8 +340,8 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return nil
}
-func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error {
- author, err := gi.ensurePerson(repo, comment.Author)
+func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error {
+ author, err := gi.ensurePerson(ctx, repo, comment.Author)
if err != nil {
return err
}
@@ -388,12 +390,12 @@ func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache
// process remaining comment edits, if they exist
for edit := range gi.mediator.CommentEdits(comment) {
// ensure editor identity
- _, err := gi.ensurePerson(repo, edit.Editor)
+ _, err := gi.ensurePerson(ctx, repo, edit.Editor)
if err != nil {
return err
}
- err = gi.ensureCommentEdit(repo, b, targetOpID, edit)
+ err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit)
if err != nil {
return err
}
@@ -401,7 +403,7 @@ func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache
return nil
}
-func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
+func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id))
if err == nil {
return nil
@@ -411,7 +413,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
return err
}
- editor, err := gi.ensurePerson(repo, edit.Editor)
+ editor, err := gi.ensurePerson(ctx, repo, edit.Editor)
if err != nil {
return err
}
@@ -450,11 +452,11 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
}
// ensurePerson create a bug.Person from the Github data
-func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*cache.IdentityCache, error) {
+func (gi *githubImporter) ensurePerson(ctx context.Context, repo *cache.RepoCache, actor *actor) (*cache.IdentityCache, error) {
// When a user has been deleted, Github return a null actor, while displaying a profile named "ghost"
// in it's UI. So we need a special case to get it.
if actor == nil {
- return gi.getGhost(repo)
+ return gi.getGhost(ctx, repo)
}
// Look first in the cache
@@ -509,7 +511,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
return i, nil
}
-func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) {
+func (gi *githubImporter) getGhost(ctx context.Context, repo *cache.RepoCache) (*cache.IdentityCache, error) {
loginName := "ghost"
// Look first in the cache
i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, loginName)
@@ -519,7 +521,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache,
if entity.IsErrMultipleMatch(err) {
return nil, err
}
- user, err := gi.ghUser(loginName)
+ user, err := gi.mediator.User(ctx, loginName)
userName := ""
if user.Name != nil {
userName = string(*user.Name)
@@ -535,7 +537,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache,
)
}
-// parseId convert the unusable githubv4.ID (an interface{}) into a string
+// parseId converts the unusable githubv4.ID (an interface{}) into a string
func parseId(id githubv4.ID) string {
return fmt.Sprintf("%v", id)
}
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
index 428c5d36..8bd33adb 100644
--- a/bridge/github/import_mediator.go
+++ b/bridge/github/import_mediator.go
@@ -3,53 +3,63 @@ package github
import (
"context"
"fmt"
- "runtime"
+ "strings"
"sync"
"time"
"github.com/shurcooL/githubv4"
)
-type varmap map[string]interface{}
-
-func trace() {
- pc := make([]uintptr, 15)
- n := runtime.Callers(2, pc)
- frames := runtime.CallersFrames(pc[:n])
- frame, _ := frames.Next()
- fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function)
-}
-
-const (
- NUM_ISSUES = 50
- NUM_ISSUE_EDITS = 99
- NUM_TIMELINE_ITEMS = 99
- NUM_COMMENT_EDITS = 99
+const ( // These values influence how fast the github graphql rate limit is exhausted.
+ NUM_ISSUES = 40
+ NUM_ISSUE_EDITS = 100
+ NUM_TIMELINE_ITEMS = 100
+ NUM_COMMENT_EDITS = 100
CHAN_CAPACITY = 128
)
-// TODO: remove all debug output and trace() in all files. Use ag
+type varmap map[string]interface{}
+// importMediator provides an interface to retrieve Github issues.
type importMediator struct {
// Github graphql client
- gc *githubv4.Client
- owner string
+ gc *githubv4.Client
+
+ // name of the repository owner on Github
+ owner string
+
+ // name of the Github repository
project string
- // The iterator will only query issues updated or created after the date given in
+
+ // The importMediator will only query issues updated or created after the date given in
// the variable since.
since time.Time
- issues chan issue
- issueEditsMut sync.Mutex
- timelineItemsMut sync.Mutex
- commentEditsMut sync.Mutex
- issueEdits map[githubv4.ID]chan userContentEdit
+ // channel for the issues
+ issues chan issue
+
+ // channel for issue edits
+ issueEdits map[githubv4.ID]chan userContentEdit
+ issueEditsMut sync.Mutex
+
+ // channel for timeline items
timelineItems map[githubv4.ID]chan timelineItem
- commentEdits map[githubv4.ID]chan userContentEdit
+ timelineItemsMut sync.Mutex
+
+ // channel for comment edits
+ commentEdits map[githubv4.ID]chan userContentEdit
+ commentEditsMut sync.Mutex
// Sticky error
- err error
+ err error
+ errMut sync.Mutex
+}
+
+func (mm *importMediator) setError(err error) {
+ mm.errMut.Lock()
+ mm.err = err
+ mm.errMut.Unlock()
}
func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
@@ -59,21 +69,56 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj
project: project,
since: since,
issues: make(chan issue, CHAN_CAPACITY),
- issueEditsMut: sync.Mutex{},
- timelineItemsMut: sync.Mutex{},
- commentEditsMut: sync.Mutex{},
issueEdits: make(map[githubv4.ID]chan userContentEdit),
+ issueEditsMut: sync.Mutex{},
timelineItems: make(map[githubv4.ID]chan timelineItem),
+ timelineItemsMut: sync.Mutex{},
commentEdits: make(map[githubv4.ID]chan userContentEdit),
+ commentEditsMut: sync.Mutex{},
err: nil,
}
go func() {
- defer close(mm.issues)
- mm.fillChannels(ctx)
+ mm.fillIssues(ctx)
+ close(mm.issues)
}()
return &mm
}
+func newIssueVars(owner, project string, since time.Time) varmap {
+ return varmap{
+ "owner": githubv4.String(owner),
+ "name": githubv4.String(project),
+ "issueSince": githubv4.DateTime{Time: since},
+ "issueFirst": githubv4.Int(NUM_ISSUES),
+ "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
+ "issueEditBefore": (*githubv4.String)(nil),
+ "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
+ "timelineAfter": (*githubv4.String)(nil),
+ "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+}
+
+func newIssueEditVars() varmap {
+ return varmap{
+ "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
+ }
+}
+
+func newTimelineVars() varmap {
+ return varmap{
+ "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
+ "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+}
+
+func newCommentEditVars() varmap {
+ return varmap{
+ "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
+ }
+}
+
func (mm *importMediator) Issues() <-chan issue {
return mm.issues
}
@@ -100,64 +145,85 @@ func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContent
}
func (mm *importMediator) Error() error {
- return mm.err
+ mm.errMut.Lock()
+ err := mm.err
+ mm.errMut.Unlock()
+ return err
}
func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
query := userQuery{}
vars := varmap{"login": githubv4.String(loginName)}
- c, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
- if err := mm.mQuery(c, &query, vars); err != nil {
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
return nil, err
}
return &query.User, nil
}
-func (mm *importMediator) fillChannels(ctx context.Context) {
- issueCursor := githubv4.String("")
- for {
- issues, hasIssues := mm.queryIssue(ctx, issueCursor)
- if !hasIssues {
- break
+func (mm *importMediator) fillIssues(ctx context.Context) {
+ initialCursor := githubv4.String("")
+ issues, hasIssues := mm.queryIssue(ctx, initialCursor)
+ for hasIssues {
+ for _, node := range issues.Nodes {
+ // The order of statements in this loop is crucial for the correct concurrent
+ // execution.
+ //
+ // The issue edit channel and the timeline channel need to be added to the
+ // corresponding maps before the issue is sent in the issue channel.
+ // Otherwise, the client could try to retrieve issue edits and timeline itmes
+ // before these channels are even created. In this case the client would
+ // receive a nil channel.
+ issueEditChan := make(chan userContentEdit, CHAN_CAPACITY)
+ timelineChan := make(chan timelineItem, CHAN_CAPACITY)
+ mm.issueEditsMut.Lock()
+ mm.issueEdits[node.issue.Id] = issueEditChan
+ mm.issueEditsMut.Unlock()
+ mm.timelineItemsMut.Lock()
+ mm.timelineItems[node.issue.Id] = timelineChan
+ mm.timelineItemsMut.Unlock()
+ select {
+ case <-ctx.Done():
+ return
+ case mm.issues <- node.issue:
+ }
+
+ // We do not know whether the client reads from the issue edit channel
+ // or the timeline channel first. Since the capacity of any channel is limited
+ // any send operation may block. Hence, in order to avoid deadlocks we need
+ // to send over both these channels concurrently.
+ go func(node issueNode) {
+ mm.fillIssueEdits(ctx, &node, issueEditChan)
+ close(issueEditChan)
+ }(node)
+ go func(node issueNode) {
+ mm.fillTimeline(ctx, &node, timelineChan)
+ close(timelineChan)
+ }(node)
}
- issueCursor = issues.PageInfo.EndCursor
- for _, issueNode := range issues.Nodes {
- // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title)
- mm.fillChannelIssueEdits(ctx, &issueNode)
- mm.fillChannelTimeline(ctx, &issueNode)
- // To avoid race conditions add the issue only after all its edits,
- // timeline times, etc. are added to their respective channels.
- mm.issues <- issueNode.issue
+ if !issues.PageInfo.HasNextPage {
+ break
}
+ issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor)
}
}
-func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) {
- // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id)
- // fmt.Printf("%v\n", issueNode)
- channel := make(chan userContentEdit, CHAN_CAPACITY)
- defer close(channel)
- mm.issueEditsMut.Lock()
- mm.issueEdits[issueNode.issue.Id] = channel
- mm.issueEditsMut.Unlock()
+func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) {
edits := &issueNode.UserContentEdits
hasEdits := true
for hasEdits {
- // fmt.Println("before the reversed loop")
for edit := range reverse(edits.Nodes) {
- // fmt.Println("in the reversed loop")
if edit.Diff == nil || string(*edit.Diff) == "" {
- // issueEdit.Diff == nil happen if the event is older than
- // early 2018, Github doesn't have the data before that.
- // Best we can do is to ignore the event.
+ // issueEdit.Diff == nil happen if the event is older than early
+ // 2018, Github doesn't have the data before that. Best we can do is
+ // to ignore the event.
continue
}
- // fmt.Printf("about to push issue edit\n")
- channel <- edit
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- edit:
+ }
}
- // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage)
- // fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage)
if !edits.PageInfo.HasPreviousPage {
break
}
@@ -165,51 +231,64 @@ func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *
}
}
-func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) {
- // fmt.Printf("fullChannelTimeline()\n")
- channel := make(chan timelineItem, CHAN_CAPACITY)
- defer close(channel)
- mm.timelineItemsMut.Lock()
- mm.timelineItems[issueNode.issue.Id] = channel
- mm.timelineItemsMut.Unlock()
+func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) {
items := &issueNode.TimelineItems
hasItems := true
for hasItems {
for _, item := range items.Nodes {
- channel <- item
- mm.fillChannelCommentEdits(ctx, &item)
+ if item.Typename == "IssueComment" {
+ // Here the order of statements is crucial for correct concurrency.
+ commentEditChan := make(chan userContentEdit, CHAN_CAPACITY)
+ mm.commentEditsMut.Lock()
+ mm.commentEdits[item.IssueComment.Id] = commentEditChan
+ mm.commentEditsMut.Unlock()
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- item:
+ }
+ // We need to create a new goroutine for filling the comment edit
+ // channel.
+ go func(item timelineItem) {
+ mm.fillCommentEdits(ctx, &item, commentEditChan)
+ close(commentEditChan)
+ }(item)
+ } else {
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- item:
+ }
+ }
}
- // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage)
- // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage)
if !items.PageInfo.HasNextPage {
break
}
- items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+ items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
}
}
-func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) {
- // This concerns only timeline items of type comment
+func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) {
+ // Here we are only concerned with timeline items of type issueComment.
if item.Typename != "IssueComment" {
return
}
comment := &item.IssueComment
- channel := make(chan userContentEdit, CHAN_CAPACITY)
- defer close(channel)
- mm.commentEditsMut.Lock()
- mm.commentEdits[comment.Id] = channel
- mm.commentEditsMut.Unlock()
edits := &comment.UserContentEdits
hasEdits := true
for hasEdits {
for edit := range reverse(edits.Nodes) {
if edit.Diff == nil || string(*edit.Diff) == "" {
- // issueEdit.Diff == nil happen if the event is older than
- // early 2018, Github doesn't have the data before that.
- // Best we can do is to ignore the event.
+ // issueEdit.Diff == nil happen if the event is older than early
+ // 2018, Github doesn't have the data before that. Best we can do is
+ // to ignore the event.
continue
}
- channel <- edit
+ select {
+ case <-ctx.Done():
+ return
+ case channel <- edit:
+ }
}
if !edits.PageInfo.HasPreviousPage {
break
@@ -219,21 +298,16 @@ func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *tim
}
func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
- // trace()
- vars := varmap{
- "gqlNodeId": nid,
- "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
- }
+ vars := newCommentEditVars()
+ vars["gqlNodeId"] = nid
if cursor == "" {
vars["commentEditBefore"] = (*githubv4.String)(nil)
} else {
vars["commentEditBefore"] = cursor
}
- c, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
query := commentEditQuery{}
- if err := mm.mQuery(c, &query, vars); err != nil {
- mm.err = err
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.setError(err)
return nil, false
}
connection := &query.Node.IssueComment.UserContentEdits
@@ -243,24 +317,17 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID
return connection, true
}
-func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
- // trace()
- vars := varmap{
- "gqlNodeId": nid,
- "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
- "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
- "commentEditBefore": (*githubv4.String)(nil),
- }
+func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+ vars := newTimelineVars()
+ vars["gqlNodeId"] = nid
if cursor == "" {
vars["timelineAfter"] = (*githubv4.String)(nil)
} else {
vars["timelineAfter"] = cursor
}
- c, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
query := timelineQuery{}
- if err := mm.mQuery(c, &query, vars); err != nil {
- mm.err = err
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.setError(err)
return nil, false
}
connection := &query.Node.Issue.TimelineItems
@@ -271,21 +338,16 @@ func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.I
}
func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
- // trace()
- vars := varmap{
- "gqlNodeId": nid,
- "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
- }
+ vars := newIssueEditVars()
+ vars["gqlNodeId"] = nid
if cursor == "" {
vars["issueEditBefore"] = (*githubv4.String)(nil)
} else {
vars["issueEditBefore"] = cursor
}
- c, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
query := issueEditQuery{}
- if err := mm.mQuery(c, &query, vars); err != nil {
- mm.err = err
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.setError(err)
return nil, false
}
connection := &query.Node.Issue.UserContentEdits
@@ -296,29 +358,15 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID,
}
func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
- // trace()
- vars := varmap{
- "owner": githubv4.String(mm.owner),
- "name": githubv4.String(mm.project),
- "issueSince": githubv4.DateTime{Time: mm.since},
- "issueFirst": githubv4.Int(NUM_ISSUES),
- "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
- "issueEditBefore": (*githubv4.String)(nil),
- "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
- "timelineAfter": (*githubv4.String)(nil),
- "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
- "commentEditBefore": (*githubv4.String)(nil),
- }
+ vars := newIssueVars(mm.owner, mm.project, mm.since)
if cursor == "" {
vars["issueAfter"] = (*githubv4.String)(nil)
} else {
vars["issueAfter"] = githubv4.String(cursor)
}
- c, cancel := context.WithTimeout(ctx, defaultTimeout)
- defer cancel()
query := issueQuery{}
- if err := mm.mQuery(c, &query, vars); err != nil {
- mm.err = err
+ if err := mm.mQuery(ctx, &query, vars); err != nil {
+ mm.setError(err)
return nil, false
}
connection := &query.Repository.Issues
@@ -343,30 +391,42 @@ type rateLimiter interface {
rateLimit() rateLimit
}
-// TODO: move that into its own file
-//
-// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL
-// query and it is used to populate the response into it. It should be a pointer to a struct
-// that corresponds to the Github graphql schema and it should implement the rateLimiter
-// interface. This function queries Github for the remaining rate limit points before
-// executing the actual query. The function waits, if there are not enough rate limiting
-// points left.
+// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query
+// and it is used to populate the response into it. It should be a pointer to a struct that
+// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If
+// there is a Github rate limiting error, then the function sleeps and retries after the rate limit
+// is expired.
func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
- // First: check the cost of the query and wait if necessary
- vars["dryRun"] = githubv4.Boolean(true)
+ // first: just send the query to the graphql api
+ vars["dryRun"] = githubv4.Boolean(false)
qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
+ err := mm.gc.Query(qctx, query, vars)
+ if err == nil {
+ // no error: done
+ return nil
+ }
+ // matching the error string
+ if !strings.Contains(err.Error(), "API rate limit exceeded") {
+ // an error, but not the API rate limit error: done
+ return err
+ }
+ // a rate limit error
+ // ask the graphql api for rate limiting information
+ vars["dryRun"] = githubv4.Boolean(true)
+ qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
if err := mm.gc.Query(qctx, query, vars); err != nil {
return err
}
- fmt.Printf("%v\n", query)
rateLimit := query.rateLimit()
if rateLimit.Cost > rateLimit.Remaining {
+ // sleep
resetTime := rateLimit.ResetAt.Time
- fmt.Println("Github rate limit exhausted")
- fmt.Printf("Sleeping until %s\n", resetTime.String())
// Add a few seconds (8) for good measure
- timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second)))
+ resetTime = resetTime.Add(8 * time.Second)
+ fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String())
+ timer := time.NewTimer(time.Until(resetTime))
select {
case <-ctx.Done():
stop(timer)
@@ -374,14 +434,12 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma
case <-timer.C:
}
}
- // Second: Do the actual query
+ // run the original query again
vars["dryRun"] = githubv4.Boolean(false)
qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
defer cancel()
- if err := mm.gc.Query(qctx, query, vars); err != nil {
- return err
- }
- return nil
+ err = mm.gc.Query(qctx, query, vars)
+ return err // might be nil
}
func stop(t *time.Timer) {
diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go
index 77c95e1d..c4ab2aa9 100644
--- a/bridge/github/import_query.go
+++ b/bridge/github/import_query.go
@@ -7,7 +7,7 @@ type userQuery struct {
User user `graphql:"user(login: $login)"`
}
-func (q userQuery) rateLimit() rateLimit {
+func (q *userQuery) rateLimit() rateLimit {
return q.RateLimit
}
@@ -40,7 +40,7 @@ type issueQuery struct {
} `graphql:"repository(owner: $owner, name: $name)"`
}
-func (q issueQuery) rateLimit() rateLimit {
+func (q *issueQuery) rateLimit() rateLimit {
return q.RateLimit
}
@@ -54,7 +54,7 @@ type issueEditQuery struct {
} `graphql:"node(id: $gqlNodeId)"`
}
-func (q issueEditQuery) rateLimit() rateLimit {
+func (q *issueEditQuery) rateLimit() rateLimit {
return q.RateLimit
}
@@ -68,7 +68,7 @@ type timelineQuery struct {
} `graphql:"node(id: $gqlNodeId)"`
}
-func (q timelineQuery) rateLimit() rateLimit {
+func (q *timelineQuery) rateLimit() rateLimit {
return q.RateLimit
}
@@ -82,7 +82,7 @@ type commentEditQuery struct {
} `graphql:"node(id: $gqlNodeId)"`
}
-func (q commentEditQuery) rateLimit() rateLimit {
+func (q *commentEditQuery) rateLimit() rateLimit {
return q.RateLimit
}