From 9a8e487613d99fb102e4619cb30464342b73fee7 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Fri, 5 Mar 2021 20:06:21 +0100 Subject: Fix errors: deadlock and empty titles --- bridge/github/import.go | 86 ++++----- bridge/github/import_mediator.go | 370 ++++++++++++++++++++++----------------- bridge/github/import_query.go | 10 +- 3 files changed, 263 insertions(+), 203 deletions(-) diff --git a/bridge/github/import.go b/bridge/github/import.go index 2e36f5fe..09a39586 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -3,7 +3,6 @@ package github import ( "context" "fmt" - "strconv" "time" "github.com/shurcooL/githubv4" @@ -16,6 +15,8 @@ import ( "github.com/MichaelMure/git-bug/util/text" ) +const EMPTY_TITLE_PLACEHOLDER = "" + // githubImporter implement the Importer interface type githubImporter struct { conf core.Configuration @@ -25,9 +26,6 @@ type githubImporter struct { // send only channel out chan<- core.ImportResult - - // closure to get the username from github without any additional parameters - ghUser func(string) (*user, error) } func (gi *githubImporter) Init(_ context.Context, _ *cache.RepoCache, conf core.Configuration) error { @@ -51,9 +49,6 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } client := buildClient(creds[0].(*auth.Token)) gi.mediator = NewImportMediator(ctx, client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) - gi.ghUser = func(login string) (*user, error) { - return gi.mediator.User(ctx, login) - } out := make(chan core.ImportResult) gi.out = out @@ -62,19 +57,17 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, // Loop over all matching issues for issue := range gi.mediator.Issues() { - // fmt.Println("issue loop") // create issue - b, err := gi.ensureIssue(repo, &issue) + b, err := gi.ensureIssue(ctx, repo, &issue) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") return } - // fmt.Println("Just before timeline items loop") // loop over timeline items for item := range gi.mediator.TimelineItems(&issue) { - err := gi.ensureTimelineItem(repo, b, item) + err := gi.ensureTimelineItem(ctx, repo, b, item) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") @@ -100,9 +93,8 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { - // fmt.Printf("ensureIssue()\n") - author, err := gi.ensurePerson(repo, issue.Author) +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { + author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err } @@ -119,15 +111,15 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac // get first issue edit // if it exists, then it holds the bug creation firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) - // fmt.Printf("hasEdit == %v\n", hasEdit) - //fmt.Printf("%v\n", firstEdit) + // At Github there exist issues with seemingly empty titles. An example is + // https://github.com/NixOS/nixpkgs/issues/72730 . + // The title provided by the GraphQL API actually consists of a space followed by a + // zero width space (U+200B). This title would cause the NewBugRaw() function to + // return an error: empty title. title := string(issue.Title) - if title == "" { - fmt.Printf("%v\n", issue) - fmt.Println("title == \"\" holds") - title = "#" + strconv.Itoa(int(issue.Number)) - fmt.Println("setting title := ", title) + if title == " \u200b" { // U+200B == zero width space + title = EMPTY_TITLE_PLACEHOLDER } if err == bug.ErrBugNotExist { @@ -156,7 +148,6 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac metaKeyGithubUrl: issue.Url.String(), }) if err != nil { - fmt.Printf("%v\n", issue) return nil, err } // importing a new bug @@ -178,7 +169,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac return nil, err } - err = gi.ensureCommentEdit(repo, b, target, edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, edit) if err != nil { return nil, err } @@ -186,11 +177,11 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac return b, nil } -func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(repo, b, &item.IssueComment) + err := gi.ensureComment(ctx, repo, b, &item.IssueComment) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -206,7 +197,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } - author, err := gi.ensurePerson(repo, item.LabeledEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.LabeledEvent.Actor) if err != nil { return err } @@ -235,7 +226,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } - author, err := gi.ensurePerson(repo, item.UnlabeledEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.UnlabeledEvent.Actor) if err != nil { return err } @@ -265,7 +256,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err == nil { return nil } - author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.ClosedEvent.Actor) if err != nil { return err } @@ -291,7 +282,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err == nil { return nil } - author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.ReopenedEvent.Actor) if err != nil { return err } @@ -317,14 +308,25 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err == nil { return nil } - author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.RenamedTitleEvent.Actor) if err != nil { return err } + + // At Github there exist issues with seemingly empty titles. An example is + // https://github.com/NixOS/nixpkgs/issues/72730 . + // The title provided by the GraphQL API actually consists of a space followed + // by a zero width space (U+200B). This title would cause the NewBugRaw() + // function to return an error: empty title. + title := string(item.RenamedTitleEvent.CurrentTitle) + if title == " \u200b" { // U+200B == zero width space + title = EMPTY_TITLE_PLACEHOLDER + } + op, err := b.SetTitleRaw( author, item.RenamedTitleEvent.CreatedAt.Unix(), - string(item.RenamedTitleEvent.CurrentTitle), + title, map[string]string{metaKeyGithubId: id}, ) if err != nil { @@ -338,8 +340,8 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return nil } -func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { - author, err := gi.ensurePerson(repo, comment.Author) +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { + author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err } @@ -388,12 +390,12 @@ func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache // process remaining comment edits, if they exist for edit := range gi.mediator.CommentEdits(comment) { // ensure editor identity - _, err := gi.ensurePerson(repo, edit.Editor) + _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(repo, b, targetOpID, edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit) if err != nil { return err } @@ -401,7 +403,7 @@ func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache return nil } -func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) if err == nil { return nil @@ -411,7 +413,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC return err } - editor, err := gi.ensurePerson(repo, edit.Editor) + editor, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } @@ -450,11 +452,11 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC } // ensurePerson create a bug.Person from the Github data -func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*cache.IdentityCache, error) { +func (gi *githubImporter) ensurePerson(ctx context.Context, repo *cache.RepoCache, actor *actor) (*cache.IdentityCache, error) { // When a user has been deleted, Github return a null actor, while displaying a profile named "ghost" // in it's UI. So we need a special case to get it. if actor == nil { - return gi.getGhost(repo) + return gi.getGhost(ctx, repo) } // Look first in the cache @@ -509,7 +511,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca return i, nil } -func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { +func (gi *githubImporter) getGhost(ctx context.Context, repo *cache.RepoCache) (*cache.IdentityCache, error) { loginName := "ghost" // Look first in the cache i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, loginName) @@ -519,7 +521,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, if entity.IsErrMultipleMatch(err) { return nil, err } - user, err := gi.ghUser(loginName) + user, err := gi.mediator.User(ctx, loginName) userName := "" if user.Name != nil { userName = string(*user.Name) @@ -535,7 +537,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, ) } -// parseId convert the unusable githubv4.ID (an interface{}) into a string +// parseId converts the unusable githubv4.ID (an interface{}) into a string func parseId(id githubv4.ID) string { return fmt.Sprintf("%v", id) } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 428c5d36..8bd33adb 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -3,53 +3,63 @@ package github import ( "context" "fmt" - "runtime" + "strings" "sync" "time" "github.com/shurcooL/githubv4" ) -type varmap map[string]interface{} - -func trace() { - pc := make([]uintptr, 15) - n := runtime.Callers(2, pc) - frames := runtime.CallersFrames(pc[:n]) - frame, _ := frames.Next() - fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function) -} - -const ( - NUM_ISSUES = 50 - NUM_ISSUE_EDITS = 99 - NUM_TIMELINE_ITEMS = 99 - NUM_COMMENT_EDITS = 99 +const ( // These values influence how fast the github graphql rate limit is exhausted. + NUM_ISSUES = 40 + NUM_ISSUE_EDITS = 100 + NUM_TIMELINE_ITEMS = 100 + NUM_COMMENT_EDITS = 100 CHAN_CAPACITY = 128 ) -// TODO: remove all debug output and trace() in all files. Use ag +type varmap map[string]interface{} +// importMediator provides an interface to retrieve Github issues. type importMediator struct { // Github graphql client - gc *githubv4.Client - owner string + gc *githubv4.Client + + // name of the repository owner on Github + owner string + + // name of the Github repository project string - // The iterator will only query issues updated or created after the date given in + + // The importMediator will only query issues updated or created after the date given in // the variable since. since time.Time - issues chan issue - issueEditsMut sync.Mutex - timelineItemsMut sync.Mutex - commentEditsMut sync.Mutex - issueEdits map[githubv4.ID]chan userContentEdit + // channel for the issues + issues chan issue + + // channel for issue edits + issueEdits map[githubv4.ID]chan userContentEdit + issueEditsMut sync.Mutex + + // channel for timeline items timelineItems map[githubv4.ID]chan timelineItem - commentEdits map[githubv4.ID]chan userContentEdit + timelineItemsMut sync.Mutex + + // channel for comment edits + commentEdits map[githubv4.ID]chan userContentEdit + commentEditsMut sync.Mutex // Sticky error - err error + err error + errMut sync.Mutex +} + +func (mm *importMediator) setError(err error) { + mm.errMut.Lock() + mm.err = err + mm.errMut.Unlock() } func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { @@ -59,21 +69,56 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj project: project, since: since, issues: make(chan issue, CHAN_CAPACITY), - issueEditsMut: sync.Mutex{}, - timelineItemsMut: sync.Mutex{}, - commentEditsMut: sync.Mutex{}, issueEdits: make(map[githubv4.ID]chan userContentEdit), + issueEditsMut: sync.Mutex{}, timelineItems: make(map[githubv4.ID]chan timelineItem), + timelineItemsMut: sync.Mutex{}, commentEdits: make(map[githubv4.ID]chan userContentEdit), + commentEditsMut: sync.Mutex{}, err: nil, } go func() { - defer close(mm.issues) - mm.fillChannels(ctx) + mm.fillIssues(ctx) + close(mm.issues) }() return &mm } +func newIssueVars(owner, project string, since time.Time) varmap { + return varmap{ + "owner": githubv4.String(owner), + "name": githubv4.String(project), + "issueSince": githubv4.DateTime{Time: since}, + "issueFirst": githubv4.Int(NUM_ISSUES), + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "timelineAfter": (*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newIssueEditVars() varmap { + return varmap{ + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + } +} + +func newTimelineVars() varmap { + return varmap{ + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newCommentEditVars() varmap { + return varmap{ + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + } +} + func (mm *importMediator) Issues() <-chan issue { return mm.issues } @@ -100,64 +145,85 @@ func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContent } func (mm *importMediator) Error() error { - return mm.err + mm.errMut.Lock() + err := mm.err + mm.errMut.Unlock() + return err } func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - if err := mm.mQuery(c, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars); err != nil { return nil, err } return &query.User, nil } -func (mm *importMediator) fillChannels(ctx context.Context) { - issueCursor := githubv4.String("") - for { - issues, hasIssues := mm.queryIssue(ctx, issueCursor) - if !hasIssues { - break +func (mm *importMediator) fillIssues(ctx context.Context) { + initialCursor := githubv4.String("") + issues, hasIssues := mm.queryIssue(ctx, initialCursor) + for hasIssues { + for _, node := range issues.Nodes { + // The order of statements in this loop is crucial for the correct concurrent + // execution. + // + // The issue edit channel and the timeline channel need to be added to the + // corresponding maps before the issue is sent in the issue channel. + // Otherwise, the client could try to retrieve issue edits and timeline itmes + // before these channels are even created. In this case the client would + // receive a nil channel. + issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) + timelineChan := make(chan timelineItem, CHAN_CAPACITY) + mm.issueEditsMut.Lock() + mm.issueEdits[node.issue.Id] = issueEditChan + mm.issueEditsMut.Unlock() + mm.timelineItemsMut.Lock() + mm.timelineItems[node.issue.Id] = timelineChan + mm.timelineItemsMut.Unlock() + select { + case <-ctx.Done(): + return + case mm.issues <- node.issue: + } + + // We do not know whether the client reads from the issue edit channel + // or the timeline channel first. Since the capacity of any channel is limited + // any send operation may block. Hence, in order to avoid deadlocks we need + // to send over both these channels concurrently. + go func(node issueNode) { + mm.fillIssueEdits(ctx, &node, issueEditChan) + close(issueEditChan) + }(node) + go func(node issueNode) { + mm.fillTimeline(ctx, &node, timelineChan) + close(timelineChan) + }(node) } - issueCursor = issues.PageInfo.EndCursor - for _, issueNode := range issues.Nodes { - // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title) - mm.fillChannelIssueEdits(ctx, &issueNode) - mm.fillChannelTimeline(ctx, &issueNode) - // To avoid race conditions add the issue only after all its edits, - // timeline times, etc. are added to their respective channels. - mm.issues <- issueNode.issue + if !issues.PageInfo.HasNextPage { + break } + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) } } -func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) { - // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id) - // fmt.Printf("%v\n", issueNode) - channel := make(chan userContentEdit, CHAN_CAPACITY) - defer close(channel) - mm.issueEditsMut.Lock() - mm.issueEdits[issueNode.issue.Id] = channel - mm.issueEditsMut.Unlock() +func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) { edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { - // fmt.Println("before the reversed loop") for edit := range reverse(edits.Nodes) { - // fmt.Println("in the reversed loop") if edit.Diff == nil || string(*edit.Diff) == "" { - // issueEdit.Diff == nil happen if the event is older than - // early 2018, Github doesn't have the data before that. - // Best we can do is to ignore the event. + // issueEdit.Diff == nil happen if the event is older than early + // 2018, Github doesn't have the data before that. Best we can do is + // to ignore the event. continue } - // fmt.Printf("about to push issue edit\n") - channel <- edit + select { + case <-ctx.Done(): + return + case channel <- edit: + } } - // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage) - // fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage) if !edits.PageInfo.HasPreviousPage { break } @@ -165,51 +231,64 @@ func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode * } } -func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) { - // fmt.Printf("fullChannelTimeline()\n") - channel := make(chan timelineItem, CHAN_CAPACITY) - defer close(channel) - mm.timelineItemsMut.Lock() - mm.timelineItems[issueNode.issue.Id] = channel - mm.timelineItemsMut.Unlock() +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { - channel <- item - mm.fillChannelCommentEdits(ctx, &item) + if item.Typename == "IssueComment" { + // Here the order of statements is crucial for correct concurrency. + commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) + mm.commentEditsMut.Lock() + mm.commentEdits[item.IssueComment.Id] = commentEditChan + mm.commentEditsMut.Unlock() + select { + case <-ctx.Done(): + return + case channel <- item: + } + // We need to create a new goroutine for filling the comment edit + // channel. + go func(item timelineItem) { + mm.fillCommentEdits(ctx, &item, commentEditChan) + close(commentEditChan) + }(item) + } else { + select { + case <-ctx.Done(): + return + case channel <- item: + } + } } - // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage) - // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage) if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) } } -func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) { - // This concerns only timeline items of type comment +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) { + // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } comment := &item.IssueComment - channel := make(chan userContentEdit, CHAN_CAPACITY) - defer close(channel) - mm.commentEditsMut.Lock() - mm.commentEdits[comment.Id] = channel - mm.commentEditsMut.Unlock() edits := &comment.UserContentEdits hasEdits := true for hasEdits { for edit := range reverse(edits.Nodes) { if edit.Diff == nil || string(*edit.Diff) == "" { - // issueEdit.Diff == nil happen if the event is older than - // early 2018, Github doesn't have the data before that. - // Best we can do is to ignore the event. + // issueEdit.Diff == nil happen if the event is older than early + // 2018, Github doesn't have the data before that. Best we can do is + // to ignore the event. continue } - channel <- edit + select { + case <-ctx.Done(): + return + case channel <- edit: + } } if !edits.PageInfo.HasPreviousPage { break @@ -219,21 +298,16 @@ func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *tim } func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - } + vars := newCommentEditVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["commentEditBefore"] = (*githubv4.String)(nil) } else { vars["commentEditBefore"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := commentEditQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.IssueComment.UserContentEdits @@ -243,24 +317,17 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - "commentEditBefore": (*githubv4.String)(nil), - } +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + vars := newTimelineVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["timelineAfter"] = (*githubv4.String)(nil) } else { vars["timelineAfter"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := timelineQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.Issue.TimelineItems @@ -271,21 +338,16 @@ func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.I } func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), - } + vars := newIssueEditVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["issueEditBefore"] = (*githubv4.String)(nil) } else { vars["issueEditBefore"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := issueEditQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.Issue.UserContentEdits @@ -296,29 +358,15 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, } func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { - // trace() - vars := varmap{ - "owner": githubv4.String(mm.owner), - "name": githubv4.String(mm.project), - "issueSince": githubv4.DateTime{Time: mm.since}, - "issueFirst": githubv4.Int(NUM_ISSUES), - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), - "issueEditBefore": (*githubv4.String)(nil), - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "timelineAfter": (*githubv4.String)(nil), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - "commentEditBefore": (*githubv4.String)(nil), - } + vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) } else { vars["issueAfter"] = githubv4.String(cursor) } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := issueQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Repository.Issues @@ -343,30 +391,42 @@ type rateLimiter interface { rateLimit() rateLimit } -// TODO: move that into its own file -// -// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL -// query and it is used to populate the response into it. It should be a pointer to a struct -// that corresponds to the Github graphql schema and it should implement the rateLimiter -// interface. This function queries Github for the remaining rate limit points before -// executing the actual query. The function waits, if there are not enough rate limiting -// points left. +// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query +// and it is used to populate the response into it. It should be a pointer to a struct that +// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If +// there is a Github rate limiting error, then the function sleeps and retries after the rate limit +// is expired. func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - // First: check the cost of the query and wait if necessary - vars["dryRun"] = githubv4.Boolean(true) + // first: just send the query to the graphql api + vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() + err := mm.gc.Query(qctx, query, vars) + if err == nil { + // no error: done + return nil + } + // matching the error string + if !strings.Contains(err.Error(), "API rate limit exceeded") { + // an error, but not the API rate limit error: done + return err + } + // a rate limit error + // ask the graphql api for rate limiting information + vars["dryRun"] = githubv4.Boolean(true) + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() if err := mm.gc.Query(qctx, query, vars); err != nil { return err } - fmt.Printf("%v\n", query) rateLimit := query.rateLimit() if rateLimit.Cost > rateLimit.Remaining { + // sleep resetTime := rateLimit.ResetAt.Time - fmt.Println("Github rate limit exhausted") - fmt.Printf("Sleeping until %s\n", resetTime.String()) // Add a few seconds (8) for good measure - timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second))) + resetTime = resetTime.Add(8 * time.Second) + fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String()) + timer := time.NewTimer(time.Until(resetTime)) select { case <-ctx.Done(): stop(timer) @@ -374,14 +434,12 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma case <-timer.C: } } - // Second: Do the actual query + // run the original query again vars["dryRun"] = githubv4.Boolean(false) qctx, cancel = context.WithTimeout(ctx, defaultTimeout) defer cancel() - if err := mm.gc.Query(qctx, query, vars); err != nil { - return err - } - return nil + err = mm.gc.Query(qctx, query, vars) + return err // might be nil } func stop(t *time.Timer) { diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index 77c95e1d..c4ab2aa9 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -7,7 +7,7 @@ type userQuery struct { User user `graphql:"user(login: $login)"` } -func (q userQuery) rateLimit() rateLimit { +func (q *userQuery) rateLimit() rateLimit { return q.RateLimit } @@ -40,7 +40,7 @@ type issueQuery struct { } `graphql:"repository(owner: $owner, name: $name)"` } -func (q issueQuery) rateLimit() rateLimit { +func (q *issueQuery) rateLimit() rateLimit { return q.RateLimit } @@ -54,7 +54,7 @@ type issueEditQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q issueEditQuery) rateLimit() rateLimit { +func (q *issueEditQuery) rateLimit() rateLimit { return q.RateLimit } @@ -68,7 +68,7 @@ type timelineQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q timelineQuery) rateLimit() rateLimit { +func (q *timelineQuery) rateLimit() rateLimit { return q.RateLimit } @@ -82,7 +82,7 @@ type commentEditQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q commentEditQuery) rateLimit() rateLimit { +func (q *commentEditQuery) rateLimit() rateLimit { return q.RateLimit } -- cgit