From 689b640bbbb801772d9c5c4bd428d4ec750f00ce Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Sat, 27 Feb 2021 00:42:37 +0100 Subject: Deal with github bridge import rate limit --- bridge/github/import.go | 363 ++++++++++++++------------------- bridge/github/import_mediator.go | 394 ++++++++++++++++++++++++++++++++++++ bridge/github/import_query.go | 244 +++++++++++++--------- bridge/github/iterator.go | 423 --------------------------------------- 4 files changed, 687 insertions(+), 737 deletions(-) create mode 100644 bridge/github/import_mediator.go delete mode 100644 bridge/github/iterator.go (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index e8a4d3cb..2e36f5fe 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -3,6 +3,7 @@ package github import ( "context" "fmt" + "strconv" "time" "github.com/shurcooL/githubv4" @@ -19,41 +20,40 @@ import ( type githubImporter struct { conf core.Configuration - // default client - client *githubv4.Client - - // iterator - iterator *iterator + // mediator to access the Github API + mediator *importMediator // send only channel out chan<- core.ImportResult + + // closure to get the username from github without any additional parameters + ghUser func(string) (*user, error) } -func (gi *githubImporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error { +func (gi *githubImporter) Init(_ context.Context, _ *cache.RepoCache, conf core.Configuration) error { gi.conf = conf + return nil +} +// ImportAll iterate over all the configured repository issues and ensure the creation of the +// missing issues / timeline items / edits / label events ... +func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindToken), - auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyLogin, gi.conf[confKeyDefaultLogin]), ) if err != nil { - return err + return nil, err } - - if len(creds) == 0 { - return ErrMissingIdentityToken + if len(creds) <= 0 { + return nil, ErrMissingIdentityToken + } + client := buildClient(creds[0].(*auth.Token)) + gi.mediator = NewImportMediator(ctx, client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) + gi.ghUser = func(login string) (*user, error) { + return gi.mediator.User(ctx, login) } - - gi.client = buildClient(creds[0].(*auth.Token)) - - return nil -} - -// ImportAll iterate over all the configured repository issues and ensure the creation of the -// missing issues / timeline items / edits / label events ... -func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { - gi.iterator = NewIterator(ctx, gi.client, 10, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) out := make(chan core.ImportResult) gi.out = out @@ -61,19 +61,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for gi.iterator.NextIssue() { - issue := gi.iterator.IssueValue() + for issue := range gi.mediator.Issues() { + // fmt.Println("issue loop") // create issue - b, err := gi.ensureIssue(repo, issue) + b, err := gi.ensureIssue(repo, &issue) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") return } + // fmt.Println("Just before timeline items loop") // loop over timeline items - for gi.iterator.NextTimelineItem() { - item := gi.iterator.TimelineItemValue() + for item := range gi.mediator.TimelineItems(&issue) { err := gi.ensureTimelineItem(repo, b, item) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) @@ -92,7 +92,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } } - if err := gi.iterator.Error(); err != nil { + if err := gi.mediator.Error(); err != nil { gi.out <- core.NewImportError(err, "") } }() @@ -100,8 +100,8 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issue) (*cache.BugCache, error) { - // ensure issue author +func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { + // fmt.Printf("ensureIssue()\n") author, err := gi.ensurePerson(repo, issue.Author) if err != nil { return nil, err @@ -116,94 +116,73 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issue) (*cach return nil, err } - // get issue edits - var issueEdits []userContentEdit - for gi.iterator.NextIssueEdit() { - issueEdits = append(issueEdits, gi.iterator.IssueEditValue()) + // get first issue edit + // if it exists, then it holds the bug creation + firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) + // fmt.Printf("hasEdit == %v\n", hasEdit) + //fmt.Printf("%v\n", firstEdit) + + title := string(issue.Title) + if title == "" { + fmt.Printf("%v\n", issue) + fmt.Println("title == \"\" holds") + title = "#" + strconv.Itoa(int(issue.Number)) + fmt.Println("setting title := ", title) } - // if issueEdits is empty - if len(issueEdits) == 0 { - if err == bug.ErrBugNotExist { - cleanText, err := text.Cleanup(string(issue.Body)) - if err != nil { - return nil, err - } - - // create bug - b, _, err = repo.NewBugRaw( - author, - issue.CreatedAt.Unix(), - issue.Title, - cleanText, - nil, - map[string]string{ - core.MetaKeyOrigin: target, - metaKeyGithubId: parseId(issue.Id), - metaKeyGithubUrl: issue.Url.String(), - }) - if err != nil { - return nil, err - } - - // importing a new bug - gi.out <- core.NewImportBug(b.Id()) + if err == bug.ErrBugNotExist { + var textInput string + if hasEdit { + // use the first issue edit: it represents the bug creation itself + textInput = string(*firstEdit.Diff) + } else { + // if there are no issue edits then the issue struct holds the bug creation + textInput = string(issue.Body) } - } else { - // create bug from given issueEdits - for i, edit := range issueEdits { - if i == 0 && b != nil { - // The first edit in the github result is the issue creation itself, we already have that - continue - } - - cleanText, err := text.Cleanup(string(*edit.Diff)) - if err != nil { - return nil, err - } - - // if the bug doesn't exist - if b == nil { - // we create the bug as soon as we have a legit first edition - b, _, err = repo.NewBugRaw( - author, - issue.CreatedAt.Unix(), - issue.Title, // TODO: this is the *current* title, not the original one - cleanText, - nil, - map[string]string{ - core.MetaKeyOrigin: target, - metaKeyGithubId: parseId(issue.Id), - metaKeyGithubUrl: issue.Url.String(), - }, - ) - - if err != nil { - return nil, err - } - // importing a new bug - gi.out <- core.NewImportBug(b.Id()) - continue - } - - // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) - if err == cache.ErrNoMatchingOp { - // original comment is missing somehow, issuing a warning - gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id()) - continue - } - if err != nil { - return nil, err - } - - err = gi.ensureCommentEdit(repo, b, target, edit) - if err != nil { - return nil, err - } + cleanText, err := text.Cleanup(textInput) + if err != nil { + return nil, err } + // create bug + b, _, err = repo.NewBugRaw( + author, + issue.CreatedAt.Unix(), + title, // TODO: this is the *current* title, not the original one + cleanText, + nil, + map[string]string{ + core.MetaKeyOrigin: target, + metaKeyGithubId: parseId(issue.Id), + metaKeyGithubUrl: issue.Url.String(), + }) + if err != nil { + fmt.Printf("%v\n", issue) + return nil, err + } + // importing a new bug + gi.out <- core.NewImportBug(b.Id()) + } + if b == nil { + return nil, fmt.Errorf("finding or creating issue") } + // process remaining issue edits, if they exist + for edit := range gi.mediator.IssueEdits(issue) { + // other edits will be added as CommentEdit operations + target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) + if err == cache.ErrNoMatchingOp { + // original comment is missing somehow, issuing a warning + gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id()) + continue + } + if err != nil { + return nil, err + } + err = gi.ensureCommentEdit(repo, b, target, edit) + if err != nil { + return nil, err + } + } return b, nil } @@ -211,14 +190,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug switch item.Typename { case "IssueComment": - // collect all comment edits - var commentEdits []userContentEdit - for gi.iterator.NextCommentEdit() { - commentEdits = append(commentEdits, gi.iterator.CommentEditValue()) - } - - // ensureTimelineComment send import events over out chanel - err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits) + err := gi.ensureComment(repo, b, &item.IssueComment) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -366,90 +338,64 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return nil } -func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.BugCache, item issueComment, edits []userContentEdit) error { - // ensure person - author, err := gi.ensurePerson(repo, item.Author) +func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { + author, err := gi.ensurePerson(repo, comment.Author) if err != nil { return err } - targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(item.Id)) + targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id)) if err != nil && err != cache.ErrNoMatchingOp { // real error return err } - - // if no edits are given we create the comment - if len(edits) == 0 { - if err == cache.ErrNoMatchingOp { - cleanText, err := text.Cleanup(string(item.Body)) - if err != nil { - return err - } - - // add comment operation - op, err := b.AddCommentRaw( - author, - item.CreatedAt.Unix(), - cleanText, - nil, - map[string]string{ - metaKeyGithubId: parseId(item.Id), - metaKeyGithubUrl: parseId(item.Url.String()), - }, - ) - if err != nil { - return err - } - - gi.out <- core.NewImportComment(op.Id()) - return nil + firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment) + if err == cache.ErrNoMatchingOp { + var textInput string + if hasEdit { + // use the first comment edit: it represents the comment creation itself + textInput = string(*firstEdit.Diff) + } else { + // if there are not comment edits, then the comment struct holds the comment creation + textInput = string(comment.Body) + } + cleanText, err := text.Cleanup(textInput) + if err != nil { + return err } - } else { - for i, edit := range edits { - if i == 0 && targetOpID != "" { - // The first edit in the github result is the comment creation itself, we already have that - continue - } - - // ensure editor identity - editor, err := gi.ensurePerson(repo, edit.Editor) - if err != nil { - return err - } - - // create comment when target is empty - if targetOpID == "" { - cleanText, err := text.Cleanup(string(*edit.Diff)) - if err != nil { - return err - } - - op, err := b.AddCommentRaw( - editor, - edit.CreatedAt.Unix(), - cleanText, - nil, - map[string]string{ - metaKeyGithubId: parseId(item.Id), - metaKeyGithubUrl: item.Url.String(), - }, - ) - if err != nil { - return err - } - gi.out <- core.NewImportComment(op.Id()) + // add comment operation + op, err := b.AddCommentRaw( + author, + comment.CreatedAt.Unix(), + cleanText, + nil, + map[string]string{ + metaKeyGithubId: parseId(comment.Id), + metaKeyGithubUrl: comment.Url.String(), + }, + ) + if err != nil { + return err + } - // set target for the next edit now that the comment is created - targetOpID = op.Id() - continue - } + gi.out <- core.NewImportComment(op.Id()) + targetOpID = op.Id() + } + if targetOpID == "" { + return fmt.Errorf("finding or creating issue comment") + } + // process remaining comment edits, if they exist + for edit := range gi.mediator.CommentEdits(comment) { + // ensure editor identity + _, err := gi.ensurePerson(repo, edit.Editor) + if err != nil { + return err + } - err = gi.ensureCommentEdit(repo, b, targetOpID, edit) - if err != nil { - return err - } + err = gi.ensureCommentEdit(repo, b, targetOpID, edit) + if err != nil { + return err } } return nil @@ -521,7 +467,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } // importing a new identity - var name string var email string @@ -565,41 +510,27 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { + loginName := "ghost" // Look first in the cache - i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, "ghost") + i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, loginName) if err == nil { return i, nil } if entity.IsErrMultipleMatch(err) { return nil, err } - - var q ghostQuery - - variables := map[string]interface{}{ - "login": githubv4.String("ghost"), + user, err := gi.ghUser(loginName) + userName := "" + if user.Name != nil { + userName = string(*user.Name) } - - ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout) - defer cancel() - - err = gi.client.Query(ctx, &q, variables) - if err != nil { - return nil, err - } - - var name string - if q.User.Name != nil { - name = string(*q.User.Name) - } - return repo.NewIdentityRaw( - name, + userName, "", - string(q.User.Login), - string(q.User.AvatarUrl), + string(user.Login), + string(user.AvatarUrl), map[string]string{ - metaKeyGithubLogin: string(q.User.Login), + metaKeyGithubLogin: string(user.Login), }, ) } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go new file mode 100644 index 00000000..428c5d36 --- /dev/null +++ b/bridge/github/import_mediator.go @@ -0,0 +1,394 @@ +package github + +import ( + "context" + "fmt" + "runtime" + "sync" + "time" + + "github.com/shurcooL/githubv4" +) + +type varmap map[string]interface{} + +func trace() { + pc := make([]uintptr, 15) + n := runtime.Callers(2, pc) + frames := runtime.CallersFrames(pc[:n]) + frame, _ := frames.Next() + fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function) +} + +const ( + NUM_ISSUES = 50 + NUM_ISSUE_EDITS = 99 + NUM_TIMELINE_ITEMS = 99 + NUM_COMMENT_EDITS = 99 + + CHAN_CAPACITY = 128 +) + +// TODO: remove all debug output and trace() in all files. Use ag + +type importMediator struct { + // Github graphql client + gc *githubv4.Client + owner string + project string + // The iterator will only query issues updated or created after the date given in + // the variable since. + since time.Time + + issues chan issue + issueEditsMut sync.Mutex + timelineItemsMut sync.Mutex + commentEditsMut sync.Mutex + issueEdits map[githubv4.ID]chan userContentEdit + timelineItems map[githubv4.ID]chan timelineItem + commentEdits map[githubv4.ID]chan userContentEdit + + // Sticky error + err error +} + +func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { + mm := importMediator{ + gc: client, + owner: owner, + project: project, + since: since, + issues: make(chan issue, CHAN_CAPACITY), + issueEditsMut: sync.Mutex{}, + timelineItemsMut: sync.Mutex{}, + commentEditsMut: sync.Mutex{}, + issueEdits: make(map[githubv4.ID]chan userContentEdit), + timelineItems: make(map[githubv4.ID]chan timelineItem), + commentEdits: make(map[githubv4.ID]chan userContentEdit), + err: nil, + } + go func() { + defer close(mm.issues) + mm.fillChannels(ctx) + }() + return &mm +} + +func (mm *importMediator) Issues() <-chan issue { + return mm.issues +} + +func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit { + mm.issueEditsMut.Lock() + channel := mm.issueEdits[issue.Id] + mm.issueEditsMut.Unlock() + return channel +} + +func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem { + mm.timelineItemsMut.Lock() + channel := mm.timelineItems[issue.Id] + mm.timelineItemsMut.Unlock() + return channel +} + +func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit { + mm.commentEditsMut.Lock() + channel := mm.commentEdits[comment.Id] + mm.commentEditsMut.Unlock() + return channel +} + +func (mm *importMediator) Error() error { + return mm.err +} + +func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { + query := userQuery{} + vars := varmap{"login": githubv4.String(loginName)} + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + if err := mm.mQuery(c, &query, vars); err != nil { + return nil, err + } + return &query.User, nil +} + +func (mm *importMediator) fillChannels(ctx context.Context) { + issueCursor := githubv4.String("") + for { + issues, hasIssues := mm.queryIssue(ctx, issueCursor) + if !hasIssues { + break + } + issueCursor = issues.PageInfo.EndCursor + for _, issueNode := range issues.Nodes { + // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title) + mm.fillChannelIssueEdits(ctx, &issueNode) + mm.fillChannelTimeline(ctx, &issueNode) + // To avoid race conditions add the issue only after all its edits, + // timeline times, etc. are added to their respective channels. + mm.issues <- issueNode.issue + } + } +} + +func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) { + // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id) + // fmt.Printf("%v\n", issueNode) + channel := make(chan userContentEdit, CHAN_CAPACITY) + defer close(channel) + mm.issueEditsMut.Lock() + mm.issueEdits[issueNode.issue.Id] = channel + mm.issueEditsMut.Unlock() + edits := &issueNode.UserContentEdits + hasEdits := true + for hasEdits { + // fmt.Println("before the reversed loop") + for edit := range reverse(edits.Nodes) { + // fmt.Println("in the reversed loop") + if edit.Diff == nil || string(*edit.Diff) == "" { + // issueEdit.Diff == nil happen if the event is older than + // early 2018, Github doesn't have the data before that. + // Best we can do is to ignore the event. + continue + } + // fmt.Printf("about to push issue edit\n") + channel <- edit + } + // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage) + // fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage) + if !edits.PageInfo.HasPreviousPage { + break + } + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) + } +} + +func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) { + // fmt.Printf("fullChannelTimeline()\n") + channel := make(chan timelineItem, CHAN_CAPACITY) + defer close(channel) + mm.timelineItemsMut.Lock() + mm.timelineItems[issueNode.issue.Id] = channel + mm.timelineItemsMut.Unlock() + items := &issueNode.TimelineItems + hasItems := true + for hasItems { + for _, item := range items.Nodes { + channel <- item + mm.fillChannelCommentEdits(ctx, &item) + } + // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage) + // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage) + if !items.PageInfo.HasNextPage { + break + } + items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + } +} + +func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) { + // This concerns only timeline items of type comment + if item.Typename != "IssueComment" { + return + } + comment := &item.IssueComment + channel := make(chan userContentEdit, CHAN_CAPACITY) + defer close(channel) + mm.commentEditsMut.Lock() + mm.commentEdits[comment.Id] = channel + mm.commentEditsMut.Unlock() + edits := &comment.UserContentEdits + hasEdits := true + for hasEdits { + for edit := range reverse(edits.Nodes) { + if edit.Diff == nil || string(*edit.Diff) == "" { + // issueEdit.Diff == nil happen if the event is older than + // early 2018, Github doesn't have the data before that. + // Best we can do is to ignore the event. + continue + } + channel <- edit + } + if !edits.PageInfo.HasPreviousPage { + break + } + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) + } +} + +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + // trace() + vars := varmap{ + "gqlNodeId": nid, + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + } + if cursor == "" { + vars["commentEditBefore"] = (*githubv4.String)(nil) + } else { + vars["commentEditBefore"] = cursor + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := commentEditQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.IssueComment.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + // trace() + vars := varmap{ + "gqlNodeId": nid, + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } + if cursor == "" { + vars["timelineAfter"] = (*githubv4.String)(nil) + } else { + vars["timelineAfter"] = cursor + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := timelineQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.TimelineItems + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + // trace() + vars := varmap{ + "gqlNodeId": nid, + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + } + if cursor == "" { + vars["issueEditBefore"] = (*githubv4.String)(nil) + } else { + vars["issueEditBefore"] = cursor + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := issueEditQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { + // trace() + vars := varmap{ + "owner": githubv4.String(mm.owner), + "name": githubv4.String(mm.project), + "issueSince": githubv4.DateTime{Time: mm.since}, + "issueFirst": githubv4.Int(NUM_ISSUES), + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "timelineAfter": (*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } + if cursor == "" { + vars["issueAfter"] = (*githubv4.String)(nil) + } else { + vars["issueAfter"] = githubv4.String(cursor) + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := issueQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Repository.Issues + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func reverse(eds []userContentEdit) chan userContentEdit { + ret := make(chan userContentEdit) + go func() { + for i := range eds { + ret <- eds[len(eds)-1-i] + } + close(ret) + }() + return ret +} + +type rateLimiter interface { + rateLimit() rateLimit +} + +// TODO: move that into its own file +// +// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL +// query and it is used to populate the response into it. It should be a pointer to a struct +// that corresponds to the Github graphql schema and it should implement the rateLimiter +// interface. This function queries Github for the remaining rate limit points before +// executing the actual query. The function waits, if there are not enough rate limiting +// points left. +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { + // First: check the cost of the query and wait if necessary + vars["dryRun"] = githubv4.Boolean(true) + qctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + if err := mm.gc.Query(qctx, query, vars); err != nil { + return err + } + fmt.Printf("%v\n", query) + rateLimit := query.rateLimit() + if rateLimit.Cost > rateLimit.Remaining { + resetTime := rateLimit.ResetAt.Time + fmt.Println("Github rate limit exhausted") + fmt.Printf("Sleeping until %s\n", resetTime.String()) + // Add a few seconds (8) for good measure + timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second))) + select { + case <-ctx.Done(): + stop(timer) + return ctx.Err() + case <-timer.C: + } + } + // Second: Do the actual query + vars["dryRun"] = githubv4.Boolean(false) + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() + if err := mm.gc.Query(qctx, query, vars); err != nil { + return err + } + return nil +} + +func stop(t *time.Timer) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index 228d204a..77c95e1d 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -2,37 +2,123 @@ package github import "github.com/shurcooL/githubv4" -type pageInfo struct { - EndCursor githubv4.String - HasNextPage bool - StartCursor githubv4.String - HasPreviousPage bool +type userQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + User user `graphql:"user(login: $login)"` } -type actor struct { - Typename githubv4.String `graphql:"__typename"` +func (q userQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type labelsQuery struct { + //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Repository struct { + Labels struct { + Nodes []struct { + ID string `graphql:"id"` + Name string `graphql:"name"` + Color string `graphql:"color"` + Description string `graphql:"description"` + } + PageInfo pageInfo + } `graphql:"labels(first: $first, after: $after)"` + } `graphql:"repository(owner: $owner, name: $name)"` +} + +type loginQuery struct { + //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Viewer struct { + Login string `graphql:"login"` + } `graphql:"viewer"` +} + +type issueQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Repository struct { + Issues issueConnection `graphql:"issues(first: $issueFirst, after: $issueAfter, orderBy: {field: CREATED_AT, direction: ASC}, filterBy: {since: $issueSince})"` + } `graphql:"repository(owner: $owner, name: $name)"` +} + +func (q issueQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type issueEditQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Node struct { + Typename githubv4.String `graphql:"__typename"` + Issue struct { + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` + } `graphql:"... on Issue"` + } `graphql:"node(id: $gqlNodeId)"` +} + +func (q issueEditQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type timelineQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Node struct { + Typename githubv4.String `graphql:"__typename"` + Issue struct { + TimelineItems timelineItemsConnection `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` + } `graphql:"... on Issue"` + } `graphql:"node(id: $gqlNodeId)"` +} + +func (q timelineQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type commentEditQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Node struct { + Typename githubv4.String `graphql:"__typename"` + IssueComment struct { + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` + } `graphql:"... on IssueComment"` + } `graphql:"node(id: $gqlNodeId)"` +} + +func (q commentEditQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type user struct { Login githubv4.String AvatarUrl githubv4.String - User struct { - Name *githubv4.String - Email githubv4.String - } `graphql:"... on User"` - Organization struct { - Name *githubv4.String - Email *githubv4.String - } `graphql:"... on Organization"` + Name *githubv4.String } -type actorEvent struct { - Id githubv4.ID - CreatedAt githubv4.DateTime - Actor *actor +type issueConnection struct { + Nodes []issueNode + PageInfo pageInfo } -type authorEvent struct { - Id githubv4.ID - CreatedAt githubv4.DateTime - Author *actor +type issueNode struct { + issue + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` + TimelineItems timelineItemsConnection `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` +} + +type issue struct { + authorEvent + Title githubv4.String + Number githubv4.Int + Body githubv4.String + Url githubv4.URI +} + +type timelineItemsConnection struct { + Nodes []timelineItem + PageInfo pageInfo +} + +type userContentEditConnection struct { + Nodes []userContentEdit + PageInfo pageInfo } type userContentEdit struct { @@ -46,12 +132,6 @@ type userContentEdit struct { Diff *githubv4.String } -type issueComment struct { - authorEvent // NOTE: contains Id - Body githubv4.String - Url githubv4.URI -} - type timelineItem struct { Typename githubv4.String `graphql:"__typename"` @@ -91,84 +171,52 @@ type timelineItem struct { } `graphql:"... on RenamedTitleEvent"` } -type ghostQuery struct { - User struct { - Login githubv4.String - AvatarUrl githubv4.String - Name *githubv4.String - } `graphql:"user(login: $login)"` -} - -type labelsQuery struct { - Repository struct { - Labels struct { - Nodes []struct { - ID string `graphql:"id"` - Name string `graphql:"name"` - Color string `graphql:"color"` - Description string `graphql:"description"` - } - PageInfo pageInfo - } `graphql:"labels(first: $first, after: $after)"` - } `graphql:"repository(owner: $owner, name: $name)"` -} +type issueComment struct { + authorEvent // NOTE: contains Id + Body githubv4.String + Url githubv4.URI -type loginQuery struct { - Viewer struct { - Login string `graphql:"login"` - } `graphql:"viewer"` + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` } -type issueQuery struct { - Repository struct { - Issues struct { - Nodes []issue - PageInfo pageInfo - } `graphql:"issues(first: $issueFirst, after: $issueAfter, orderBy: {field: CREATED_AT, direction: ASC}, filterBy: {since: $issueSince})"` - } `graphql:"repository(owner: $owner, name: $name)"` +type actor struct { + Typename githubv4.String `graphql:"__typename"` + Login githubv4.String + AvatarUrl githubv4.String + User struct { + Name *githubv4.String + Email githubv4.String + } `graphql:"... on User"` + Organization struct { + Name *githubv4.String + Email *githubv4.String + } `graphql:"... on Organization"` } -type issue struct { - authorEvent - Title string - Number githubv4.Int - Body githubv4.String - Url githubv4.URI +type actorEvent struct { + Id githubv4.ID + CreatedAt githubv4.DateTime + Actor *actor } -type issueEditQuery struct { - Node struct { - Typename githubv4.String `graphql:"__typename"` - Issue struct { - UserContentEdits struct { - Nodes []userContentEdit - TotalCount githubv4.Int - PageInfo pageInfo - } `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` - } `graphql:"... on Issue"` - } `graphql:"node(id: $gqlNodeId)"` +type authorEvent struct { + Id githubv4.ID + CreatedAt githubv4.DateTime + Author *actor } -type timelineQuery struct { - Node struct { - Typename githubv4.String `graphql:"__typename"` - Issue struct { - TimelineItems struct { - Nodes []timelineItem - PageInfo pageInfo - } `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` - } `graphql:"... on Issue"` - } `graphql:"node(id: $gqlNodeId)"` +type pageInfo struct { + EndCursor githubv4.String + HasNextPage bool + StartCursor githubv4.String + HasPreviousPage bool } -type commentEditQuery struct { - Node struct { - Typename githubv4.String `graphql:"__typename"` - IssueComment struct { - UserContentEdits struct { - Nodes []userContentEdit - PageInfo pageInfo - } `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` - } `graphql:"... on IssueComment"` - } `graphql:"node(id: $gqlNodeId)"` +type rateLimit struct { + Cost githubv4.Int + Limit githubv4.Int + NodeCount githubv4.Int + Remaining githubv4.Int + ResetAt githubv4.DateTime + Used githubv4.Int } diff --git a/bridge/github/iterator.go b/bridge/github/iterator.go deleted file mode 100644 index d21faae8..00000000 --- a/bridge/github/iterator.go +++ /dev/null @@ -1,423 +0,0 @@ -package github - -import ( - "context" - "time" - - "github.com/pkg/errors" - "github.com/shurcooL/githubv4" -) - -type iterator struct { - // Github graphql client - gc *githubv4.Client - - // The iterator will only query issues updated or created after the date given in - // the variable since. - since time.Time - - // Shared context, which is used for all graphql queries. - ctx context.Context - - // Sticky error - err error - - // Issue iterator - issueIter issueIter -} - -type issueIter struct { - iterVars - query issueQuery - issueEditIter []issueEditIter - timelineIter []timelineIter -} - -type issueEditIter struct { - iterVars - query issueEditQuery -} - -type timelineIter struct { - iterVars - query timelineQuery - commentEditIter []commentEditIter -} - -type commentEditIter struct { - iterVars - query commentEditQuery -} - -type iterVars struct { - // Iterator index - index int - - // capacity is the number of elements (issues, issue edits, timeline items, or - // comment edits) to query at a time. More capacity = more used memory = - // less queries to make. - capacity int - - // Variable assignments for graphql query - variables varmap -} - -type varmap map[string]interface{} - -func newIterVars(capacity int) iterVars { - return iterVars{ - index: -1, - capacity: capacity, - variables: varmap{}, - } -} - -// NewIterator creates and initialize a new iterator. -func NewIterator(ctx context.Context, client *githubv4.Client, capacity int, owner, project string, since time.Time) *iterator { - i := &iterator{ - gc: client, - since: since, - ctx: ctx, - issueIter: issueIter{ - iterVars: newIterVars(capacity), - timelineIter: make([]timelineIter, capacity), - issueEditIter: make([]issueEditIter, capacity), - }, - } - i.issueIter.variables.setOwnerProject(owner, project) - for idx := range i.issueIter.issueEditIter { - ie := &i.issueIter.issueEditIter[idx] - ie.iterVars = newIterVars(capacity) - } - for i1 := range i.issueIter.timelineIter { - tli := &i.issueIter.timelineIter[i1] - tli.iterVars = newIterVars(capacity) - tli.commentEditIter = make([]commentEditIter, capacity) - for i2 := range tli.commentEditIter { - cei := &tli.commentEditIter[i2] - cei.iterVars = newIterVars(capacity) - } - } - i.resetIssueVars() - return i -} - -func (v *varmap) setOwnerProject(owner, project string) { - (*v)["owner"] = githubv4.String(owner) - (*v)["name"] = githubv4.String(project) -} - -func (i *iterator) resetIssueVars() { - vars := &i.issueIter.variables - (*vars)["issueFirst"] = githubv4.Int(i.issueIter.capacity) - (*vars)["issueAfter"] = (*githubv4.String)(nil) - (*vars)["issueSince"] = githubv4.DateTime{Time: i.since} - i.issueIter.query.Repository.Issues.PageInfo.HasNextPage = true - i.issueIter.query.Repository.Issues.PageInfo.EndCursor = "" -} - -func (i *iterator) resetIssueEditVars() { - for idx := range i.issueIter.issueEditIter { - ie := &i.issueIter.issueEditIter[idx] - ie.variables["issueEditLast"] = githubv4.Int(ie.capacity) - ie.variables["issueEditBefore"] = (*githubv4.String)(nil) - ie.query.Node.Issue.UserContentEdits.PageInfo.HasNextPage = true - ie.query.Node.Issue.UserContentEdits.PageInfo.EndCursor = "" - } -} - -func (i *iterator) resetTimelineVars() { - for idx := range i.issueIter.timelineIter { - ip := &i.issueIter.timelineIter[idx] - ip.variables["timelineFirst"] = githubv4.Int(ip.capacity) - ip.variables["timelineAfter"] = (*githubv4.String)(nil) - ip.query.Node.Issue.TimelineItems.PageInfo.HasNextPage = true - ip.query.Node.Issue.TimelineItems.PageInfo.EndCursor = "" - } -} - -func (i *iterator) resetCommentEditVars() { - for i1 := range i.issueIter.timelineIter { - for i2 := range i.issueIter.timelineIter[i1].commentEditIter { - ce := &i.issueIter.timelineIter[i1].commentEditIter[i2] - ce.variables["commentEditLast"] = githubv4.Int(ce.capacity) - ce.variables["commentEditBefore"] = (*githubv4.String)(nil) - ce.query.Node.IssueComment.UserContentEdits.PageInfo.HasNextPage = true - ce.query.Node.IssueComment.UserContentEdits.PageInfo.EndCursor = "" - } - } -} - -// Error return last encountered error -func (i *iterator) Error() error { - if i.err != nil { - return i.err - } - return i.ctx.Err() // might return nil -} - -func (i *iterator) HasError() bool { - return i.err != nil || i.ctx.Err() != nil -} - -func (i *iterator) currIssueItem() *issue { - return &i.issueIter.query.Repository.Issues.Nodes[i.issueIter.index] -} - -func (i *iterator) currIssueEditIter() *issueEditIter { - return &i.issueIter.issueEditIter[i.issueIter.index] -} - -func (i *iterator) currTimelineIter() *timelineIter { - return &i.issueIter.timelineIter[i.issueIter.index] -} - -func (i *iterator) currCommentEditIter() *commentEditIter { - timelineIter := i.currTimelineIter() - return &timelineIter.commentEditIter[timelineIter.index] -} - -func (i *iterator) currIssueGqlNodeId() githubv4.ID { - return i.currIssueItem().Id -} - -// NextIssue returns true if there exists a next issue and advances the iterator by one. -// It is used to iterate over all issues. Queries to github are made when necessary. -func (i *iterator) NextIssue() bool { - if i.HasError() { - return false - } - index := &i.issueIter.index - issues := &i.issueIter.query.Repository.Issues - issueItems := &issues.Nodes - if 0 <= *index && *index < len(*issueItems)-1 { - *index += 1 - return true - } - - if !issues.PageInfo.HasNextPage { - return false - } - nextIssue := i.queryIssue() - return nextIssue -} - -// IssueValue returns the actual issue value. -func (i *iterator) IssueValue() issue { - return *i.currIssueItem() -} - -func (i *iterator) queryIssue() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - if endCursor := i.issueIter.query.Repository.Issues.PageInfo.EndCursor; endCursor != "" { - i.issueIter.variables["issueAfter"] = endCursor - } - if err := i.gc.Query(ctx, &i.issueIter.query, i.issueIter.variables); err != nil { - i.err = err - return false - } - i.resetIssueEditVars() - i.resetTimelineVars() - issueItems := &i.issueIter.query.Repository.Issues.Nodes - if len(*issueItems) <= 0 { - i.issueIter.index = -1 - return false - } - i.issueIter.index = 0 - return true -} - -// NextIssueEdit returns true if there exists a next issue edit and advances the iterator -// by one. It is used to iterate over all the issue edits. Queries to github are made when -// necessary. -func (i *iterator) NextIssueEdit() bool { - if i.HasError() { - return false - } - ieIter := i.currIssueEditIter() - ieIdx := &ieIter.index - ieItems := ieIter.query.Node.Issue.UserContentEdits - if 0 <= *ieIdx && *ieIdx < len(ieItems.Nodes)-1 { - *ieIdx += 1 - return i.nextValidIssueEdit() - } - if !ieItems.PageInfo.HasNextPage { - return false - } - querySucc := i.queryIssueEdit() - if !querySucc { - return false - } - return i.nextValidIssueEdit() -} - -func (i *iterator) nextValidIssueEdit() bool { - // issueEdit.Diff == nil happen if the event is older than early 2018, Github doesn't have - // the data before that. Best we can do is to ignore the event. - if issueEdit := i.IssueEditValue(); issueEdit.Diff == nil || string(*issueEdit.Diff) == "" { - return i.NextIssueEdit() - } - return true -} - -// IssueEditValue returns the actual issue edit value. -func (i *iterator) IssueEditValue() userContentEdit { - iei := i.currIssueEditIter() - return iei.query.Node.Issue.UserContentEdits.Nodes[iei.index] -} - -func (i *iterator) queryIssueEdit() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - iei := i.currIssueEditIter() - if endCursor := iei.query.Node.Issue.UserContentEdits.PageInfo.EndCursor; endCursor != "" { - iei.variables["issueEditBefore"] = endCursor - } - iei.variables["gqlNodeId"] = i.currIssueGqlNodeId() - if err := i.gc.Query(ctx, &iei.query, iei.variables); err != nil { - i.err = err - return false - } - issueEditItems := iei.query.Node.Issue.UserContentEdits.Nodes - if len(issueEditItems) <= 0 { - iei.index = -1 - return false - } - // The UserContentEditConnection in the Github API serves its elements in reverse chronological - // order. For our purpose we have to reverse the edits. - reverseEdits(issueEditItems) - iei.index = 0 - return true -} - -// NextTimelineItem returns true if there exists a next timeline item and advances the iterator -// by one. It is used to iterate over all the timeline items. Queries to github are made when -// necessary. -func (i *iterator) NextTimelineItem() bool { - if i.HasError() { - return false - } - tlIter := &i.issueIter.timelineIter[i.issueIter.index] - tlIdx := &tlIter.index - tlItems := tlIter.query.Node.Issue.TimelineItems - if 0 <= *tlIdx && *tlIdx < len(tlItems.Nodes)-1 { - *tlIdx += 1 - return true - } - if !tlItems.PageInfo.HasNextPage { - return false - } - nextTlItem := i.queryTimeline() - return nextTlItem -} - -// TimelineItemValue returns the actual timeline item value. -func (i *iterator) TimelineItemValue() timelineItem { - tli := i.currTimelineIter() - return tli.query.Node.Issue.TimelineItems.Nodes[tli.index] -} - -func (i *iterator) queryTimeline() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - tli := i.currTimelineIter() - if endCursor := tli.query.Node.Issue.TimelineItems.PageInfo.EndCursor; endCursor != "" { - tli.variables["timelineAfter"] = endCursor - } - tli.variables["gqlNodeId"] = i.currIssueGqlNodeId() - if err := i.gc.Query(ctx, &tli.query, tli.variables); err != nil { - i.err = err - return false - } - i.resetCommentEditVars() - timelineItems := &tli.query.Node.Issue.TimelineItems - if len(timelineItems.Nodes) <= 0 { - tli.index = -1 - return false - } - tli.index = 0 - return true -} - -// NextCommentEdit returns true if there exists a next comment edit and advances the iterator -// by one. It is used to iterate over all issue edits. Queries to github are made when -// necessary. -func (i *iterator) NextCommentEdit() bool { - if i.HasError() { - return false - } - - tmlnVal := i.TimelineItemValue() - if tmlnVal.Typename != "IssueComment" { - // The timeline iterator does not point to a comment. - i.err = errors.New("Call to NextCommentEdit() while timeline item is not a comment") - return false - } - - cei := i.currCommentEditIter() - ceIdx := &cei.index - ceItems := &cei.query.Node.IssueComment.UserContentEdits - if 0 <= *ceIdx && *ceIdx < len(ceItems.Nodes)-1 { - *ceIdx += 1 - return i.nextValidCommentEdit() - } - if !ceItems.PageInfo.HasNextPage { - return false - } - querySucc := i.queryCommentEdit() - if !querySucc { - return false - } - return i.nextValidCommentEdit() -} - -func (i *iterator) nextValidCommentEdit() bool { - // if comment edit diff is a nil pointer or points to an empty string look for next value - if commentEdit := i.CommentEditValue(); commentEdit.Diff == nil || string(*commentEdit.Diff) == "" { - return i.NextCommentEdit() - } - return true -} - -// CommentEditValue returns the actual comment edit value. -func (i *iterator) CommentEditValue() userContentEdit { - cei := i.currCommentEditIter() - return cei.query.Node.IssueComment.UserContentEdits.Nodes[cei.index] -} - -func (i *iterator) queryCommentEdit() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - cei := i.currCommentEditIter() - - if endCursor := cei.query.Node.IssueComment.UserContentEdits.PageInfo.EndCursor; endCursor != "" { - cei.variables["commentEditBefore"] = endCursor - } - tmlnVal := i.TimelineItemValue() - if tmlnVal.Typename != "IssueComment" { - i.err = errors.New("Call to queryCommentEdit() while timeline item is not a comment") - return false - } - cei.variables["gqlNodeId"] = tmlnVal.IssueComment.Id - if err := i.gc.Query(ctx, &cei.query, cei.variables); err != nil { - i.err = err - return false - } - ceItems := cei.query.Node.IssueComment.UserContentEdits.Nodes - if len(ceItems) <= 0 { - cei.index = -1 - return false - } - // The UserContentEditConnection in the Github API serves its elements in reverse chronological - // order. For our purpose we have to reverse the edits. - reverseEdits(ceItems) - cei.index = 0 - return true -} - -func reverseEdits(edits []userContentEdit) { - for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 { - edits[i], edits[j] = edits[j], edits[i] - } -} -- cgit