From 689b640bbbb801772d9c5c4bd428d4ec750f00ce Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Sat, 27 Feb 2021 00:42:37 +0100 Subject: Deal with github bridge import rate limit --- bridge/github/import.go | 363 ++++++++++++++------------------- bridge/github/import_mediator.go | 394 ++++++++++++++++++++++++++++++++++++ bridge/github/import_query.go | 244 +++++++++++++--------- bridge/github/iterator.go | 423 --------------------------------------- 4 files changed, 687 insertions(+), 737 deletions(-) create mode 100644 bridge/github/import_mediator.go delete mode 100644 bridge/github/iterator.go (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index e8a4d3cb..2e36f5fe 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -3,6 +3,7 @@ package github import ( "context" "fmt" + "strconv" "time" "github.com/shurcooL/githubv4" @@ -19,41 +20,40 @@ import ( type githubImporter struct { conf core.Configuration - // default client - client *githubv4.Client - - // iterator - iterator *iterator + // mediator to access the Github API + mediator *importMediator // send only channel out chan<- core.ImportResult + + // closure to get the username from github without any additional parameters + ghUser func(string) (*user, error) } -func (gi *githubImporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error { +func (gi *githubImporter) Init(_ context.Context, _ *cache.RepoCache, conf core.Configuration) error { gi.conf = conf + return nil +} +// ImportAll iterate over all the configured repository issues and ensure the creation of the +// missing issues / timeline items / edits / label events ... 
+func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindToken), - auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyLogin, gi.conf[confKeyDefaultLogin]), ) if err != nil { - return err + return nil, err } - - if len(creds) == 0 { - return ErrMissingIdentityToken + if len(creds) <= 0 { + return nil, ErrMissingIdentityToken + } + client := buildClient(creds[0].(*auth.Token)) + gi.mediator = NewImportMediator(ctx, client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) + gi.ghUser = func(login string) (*user, error) { + return gi.mediator.User(ctx, login) } - - gi.client = buildClient(creds[0].(*auth.Token)) - - return nil -} - -// ImportAll iterate over all the configured repository issues and ensure the creation of the -// missing issues / timeline items / edits / label events ... -func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { - gi.iterator = NewIterator(ctx, gi.client, 10, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) out := make(chan core.ImportResult) gi.out = out @@ -61,19 +61,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for gi.iterator.NextIssue() { - issue := gi.iterator.IssueValue() + for issue := range gi.mediator.Issues() { + // fmt.Println("issue loop") // create issue - b, err := gi.ensureIssue(repo, issue) + b, err := gi.ensureIssue(repo, &issue) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") return } + // fmt.Println("Just before timeline items loop") // loop over timeline items - for gi.iterator.NextTimelineItem() { - item := gi.iterator.TimelineItemValue() + for item := range gi.mediator.TimelineItems(&issue) { err 
:= gi.ensureTimelineItem(repo, b, item) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) @@ -92,7 +92,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } } - if err := gi.iterator.Error(); err != nil { + if err := gi.mediator.Error(); err != nil { gi.out <- core.NewImportError(err, "") } }() @@ -100,8 +100,8 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issue) (*cache.BugCache, error) { - // ensure issue author +func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { + // fmt.Printf("ensureIssue()\n") author, err := gi.ensurePerson(repo, issue.Author) if err != nil { return nil, err @@ -116,94 +116,73 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issue) (*cach return nil, err } - // get issue edits - var issueEdits []userContentEdit - for gi.iterator.NextIssueEdit() { - issueEdits = append(issueEdits, gi.iterator.IssueEditValue()) + // get first issue edit + // if it exists, then it holds the bug creation + firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) + // fmt.Printf("hasEdit == %v\n", hasEdit) + //fmt.Printf("%v\n", firstEdit) + + title := string(issue.Title) + if title == "" { + fmt.Printf("%v\n", issue) + fmt.Println("title == \"\" holds") + title = "#" + strconv.Itoa(int(issue.Number)) + fmt.Println("setting title := ", title) } - // if issueEdits is empty - if len(issueEdits) == 0 { - if err == bug.ErrBugNotExist { - cleanText, err := text.Cleanup(string(issue.Body)) - if err != nil { - return nil, err - } - - // create bug - b, _, err = repo.NewBugRaw( - author, - issue.CreatedAt.Unix(), - issue.Title, - cleanText, - nil, - map[string]string{ - core.MetaKeyOrigin: target, - metaKeyGithubId: parseId(issue.Id), - metaKeyGithubUrl: issue.Url.String(), - }) - if err != nil { - return nil, err - } - - // 
importing a new bug - gi.out <- core.NewImportBug(b.Id()) + if err == bug.ErrBugNotExist { + var textInput string + if hasEdit { + // use the first issue edit: it represents the bug creation itself + textInput = string(*firstEdit.Diff) + } else { + // if there are no issue edits then the issue struct holds the bug creation + textInput = string(issue.Body) } - } else { - // create bug from given issueEdits - for i, edit := range issueEdits { - if i == 0 && b != nil { - // The first edit in the github result is the issue creation itself, we already have that - continue - } - - cleanText, err := text.Cleanup(string(*edit.Diff)) - if err != nil { - return nil, err - } - - // if the bug doesn't exist - if b == nil { - // we create the bug as soon as we have a legit first edition - b, _, err = repo.NewBugRaw( - author, - issue.CreatedAt.Unix(), - issue.Title, // TODO: this is the *current* title, not the original one - cleanText, - nil, - map[string]string{ - core.MetaKeyOrigin: target, - metaKeyGithubId: parseId(issue.Id), - metaKeyGithubUrl: issue.Url.String(), - }, - ) - - if err != nil { - return nil, err - } - // importing a new bug - gi.out <- core.NewImportBug(b.Id()) - continue - } - - // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) - if err == cache.ErrNoMatchingOp { - // original comment is missing somehow, issuing a warning - gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id()) - continue - } - if err != nil { - return nil, err - } - - err = gi.ensureCommentEdit(repo, b, target, edit) - if err != nil { - return nil, err - } + cleanText, err := text.Cleanup(textInput) + if err != nil { + return nil, err } + // create bug + b, _, err = repo.NewBugRaw( + author, + issue.CreatedAt.Unix(), + title, // TODO: this is the *current* title, not the original one + cleanText, + nil, + map[string]string{ + core.MetaKeyOrigin: 
target, + metaKeyGithubId: parseId(issue.Id), + metaKeyGithubUrl: issue.Url.String(), + }) + if err != nil { + fmt.Printf("%v\n", issue) + return nil, err + } + // importing a new bug + gi.out <- core.NewImportBug(b.Id()) + } + if b == nil { + return nil, fmt.Errorf("finding or creating issue") } + // process remaining issue edits, if they exist + for edit := range gi.mediator.IssueEdits(issue) { + // other edits will be added as CommentEdit operations + target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) + if err == cache.ErrNoMatchingOp { + // original comment is missing somehow, issuing a warning + gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id()) + continue + } + if err != nil { + return nil, err + } + err = gi.ensureCommentEdit(repo, b, target, edit) + if err != nil { + return nil, err + } + } return b, nil } @@ -211,14 +190,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug switch item.Typename { case "IssueComment": - // collect all comment edits - var commentEdits []userContentEdit - for gi.iterator.NextCommentEdit() { - commentEdits = append(commentEdits, gi.iterator.CommentEditValue()) - } - - // ensureTimelineComment send import events over out chanel - err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits) + err := gi.ensureComment(repo, b, &item.IssueComment) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -366,90 +338,64 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return nil } -func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.BugCache, item issueComment, edits []userContentEdit) error { - // ensure person - author, err := gi.ensurePerson(repo, item.Author) +func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { + author, err := gi.ensurePerson(repo, 
comment.Author) if err != nil { return err } - targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(item.Id)) + targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id)) if err != nil && err != cache.ErrNoMatchingOp { // real error return err } - - // if no edits are given we create the comment - if len(edits) == 0 { - if err == cache.ErrNoMatchingOp { - cleanText, err := text.Cleanup(string(item.Body)) - if err != nil { - return err - } - - // add comment operation - op, err := b.AddCommentRaw( - author, - item.CreatedAt.Unix(), - cleanText, - nil, - map[string]string{ - metaKeyGithubId: parseId(item.Id), - metaKeyGithubUrl: parseId(item.Url.String()), - }, - ) - if err != nil { - return err - } - - gi.out <- core.NewImportComment(op.Id()) - return nil + firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment) + if err == cache.ErrNoMatchingOp { + var textInput string + if hasEdit { + // use the first comment edit: it represents the comment creation itself + textInput = string(*firstEdit.Diff) + } else { + // if there are not comment edits, then the comment struct holds the comment creation + textInput = string(comment.Body) + } + cleanText, err := text.Cleanup(textInput) + if err != nil { + return err } - } else { - for i, edit := range edits { - if i == 0 && targetOpID != "" { - // The first edit in the github result is the comment creation itself, we already have that - continue - } - - // ensure editor identity - editor, err := gi.ensurePerson(repo, edit.Editor) - if err != nil { - return err - } - - // create comment when target is empty - if targetOpID == "" { - cleanText, err := text.Cleanup(string(*edit.Diff)) - if err != nil { - return err - } - - op, err := b.AddCommentRaw( - editor, - edit.CreatedAt.Unix(), - cleanText, - nil, - map[string]string{ - metaKeyGithubId: parseId(item.Id), - metaKeyGithubUrl: item.Url.String(), - }, - ) - if err != nil { - return err - } - gi.out <- 
core.NewImportComment(op.Id()) + // add comment operation + op, err := b.AddCommentRaw( + author, + comment.CreatedAt.Unix(), + cleanText, + nil, + map[string]string{ + metaKeyGithubId: parseId(comment.Id), + metaKeyGithubUrl: comment.Url.String(), + }, + ) + if err != nil { + return err + } - // set target for the next edit now that the comment is created - targetOpID = op.Id() - continue - } + gi.out <- core.NewImportComment(op.Id()) + targetOpID = op.Id() + } + if targetOpID == "" { + return fmt.Errorf("finding or creating issue comment") + } + // process remaining comment edits, if they exist + for edit := range gi.mediator.CommentEdits(comment) { + // ensure editor identity + _, err := gi.ensurePerson(repo, edit.Editor) + if err != nil { + return err + } - err = gi.ensureCommentEdit(repo, b, targetOpID, edit) - if err != nil { - return err - } + err = gi.ensureCommentEdit(repo, b, targetOpID, edit) + if err != nil { + return err } } return nil @@ -521,7 +467,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } // importing a new identity - var name string var email string @@ -565,41 +510,27 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { + loginName := "ghost" // Look first in the cache - i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, "ghost") + i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, loginName) if err == nil { return i, nil } if entity.IsErrMultipleMatch(err) { return nil, err } - - var q ghostQuery - - variables := map[string]interface{}{ - "login": githubv4.String("ghost"), + user, err := gi.ghUser(loginName) + userName := "" + if user.Name != nil { + userName = string(*user.Name) } - - ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout) - defer cancel() - - err = gi.client.Query(ctx, &q, variables) - if err != nil { - return nil, 
err - } - - var name string - if q.User.Name != nil { - name = string(*q.User.Name) - } - return repo.NewIdentityRaw( - name, + userName, "", - string(q.User.Login), - string(q.User.AvatarUrl), + string(user.Login), + string(user.AvatarUrl), map[string]string{ - metaKeyGithubLogin: string(q.User.Login), + metaKeyGithubLogin: string(user.Login), }, ) } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go new file mode 100644 index 00000000..428c5d36 --- /dev/null +++ b/bridge/github/import_mediator.go @@ -0,0 +1,394 @@ +package github + +import ( + "context" + "fmt" + "runtime" + "sync" + "time" + + "github.com/shurcooL/githubv4" +) + +type varmap map[string]interface{} + +func trace() { + pc := make([]uintptr, 15) + n := runtime.Callers(2, pc) + frames := runtime.CallersFrames(pc[:n]) + frame, _ := frames.Next() + fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function) +} + +const ( + NUM_ISSUES = 50 + NUM_ISSUE_EDITS = 99 + NUM_TIMELINE_ITEMS = 99 + NUM_COMMENT_EDITS = 99 + + CHAN_CAPACITY = 128 +) + +// TODO: remove all debug output and trace() in all files. Use ag + +type importMediator struct { + // Github graphql client + gc *githubv4.Client + owner string + project string + // The iterator will only query issues updated or created after the date given in + // the variable since. 
+ since time.Time + + issues chan issue + issueEditsMut sync.Mutex + timelineItemsMut sync.Mutex + commentEditsMut sync.Mutex + issueEdits map[githubv4.ID]chan userContentEdit + timelineItems map[githubv4.ID]chan timelineItem + commentEdits map[githubv4.ID]chan userContentEdit + + // Sticky error + err error +} + +func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { + mm := importMediator{ + gc: client, + owner: owner, + project: project, + since: since, + issues: make(chan issue, CHAN_CAPACITY), + issueEditsMut: sync.Mutex{}, + timelineItemsMut: sync.Mutex{}, + commentEditsMut: sync.Mutex{}, + issueEdits: make(map[githubv4.ID]chan userContentEdit), + timelineItems: make(map[githubv4.ID]chan timelineItem), + commentEdits: make(map[githubv4.ID]chan userContentEdit), + err: nil, + } + go func() { + defer close(mm.issues) + mm.fillChannels(ctx) + }() + return &mm +} + +func (mm *importMediator) Issues() <-chan issue { + return mm.issues +} + +func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit { + mm.issueEditsMut.Lock() + channel := mm.issueEdits[issue.Id] + mm.issueEditsMut.Unlock() + return channel +} + +func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem { + mm.timelineItemsMut.Lock() + channel := mm.timelineItems[issue.Id] + mm.timelineItemsMut.Unlock() + return channel +} + +func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit { + mm.commentEditsMut.Lock() + channel := mm.commentEdits[comment.Id] + mm.commentEditsMut.Unlock() + return channel +} + +func (mm *importMediator) Error() error { + return mm.err +} + +func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { + query := userQuery{} + vars := varmap{"login": githubv4.String(loginName)} + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + if err := mm.mQuery(c, &query, vars); err != nil { + return nil, err + 
} + return &query.User, nil +} + +func (mm *importMediator) fillChannels(ctx context.Context) { + issueCursor := githubv4.String("") + for { + issues, hasIssues := mm.queryIssue(ctx, issueCursor) + if !hasIssues { + break + } + issueCursor = issues.PageInfo.EndCursor + for _, issueNode := range issues.Nodes { + // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title) + mm.fillChannelIssueEdits(ctx, &issueNode) + mm.fillChannelTimeline(ctx, &issueNode) + // To avoid race conditions add the issue only after all its edits, + // timeline times, etc. are added to their respective channels. + mm.issues <- issueNode.issue + } + } +} + +func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) { + // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id) + // fmt.Printf("%v\n", issueNode) + channel := make(chan userContentEdit, CHAN_CAPACITY) + defer close(channel) + mm.issueEditsMut.Lock() + mm.issueEdits[issueNode.issue.Id] = channel + mm.issueEditsMut.Unlock() + edits := &issueNode.UserContentEdits + hasEdits := true + for hasEdits { + // fmt.Println("before the reversed loop") + for edit := range reverse(edits.Nodes) { + // fmt.Println("in the reversed loop") + if edit.Diff == nil || string(*edit.Diff) == "" { + // issueEdit.Diff == nil happen if the event is older than + // early 2018, Github doesn't have the data before that. + // Best we can do is to ignore the event. + continue + } + // fmt.Printf("about to push issue edit\n") + channel <- edit + } + // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage) + // fmt.Printf("has previous ? 
%v\n", edits.PageInfo.HasPreviousPage) + if !edits.PageInfo.HasPreviousPage { + break + } + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) + } +} + +func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) { + // fmt.Printf("fullChannelTimeline()\n") + channel := make(chan timelineItem, CHAN_CAPACITY) + defer close(channel) + mm.timelineItemsMut.Lock() + mm.timelineItems[issueNode.issue.Id] = channel + mm.timelineItemsMut.Unlock() + items := &issueNode.TimelineItems + hasItems := true + for hasItems { + for _, item := range items.Nodes { + channel <- item + mm.fillChannelCommentEdits(ctx, &item) + } + // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage) + // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage) + if !items.PageInfo.HasNextPage { + break + } + items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + } +} + +func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) { + // This concerns only timeline items of type comment + if item.Typename != "IssueComment" { + return + } + comment := &item.IssueComment + channel := make(chan userContentEdit, CHAN_CAPACITY) + defer close(channel) + mm.commentEditsMut.Lock() + mm.commentEdits[comment.Id] = channel + mm.commentEditsMut.Unlock() + edits := &comment.UserContentEdits + hasEdits := true + for hasEdits { + for edit := range reverse(edits.Nodes) { + if edit.Diff == nil || string(*edit.Diff) == "" { + // issueEdit.Diff == nil happen if the event is older than + // early 2018, Github doesn't have the data before that. + // Best we can do is to ignore the event. 
+ continue + } + channel <- edit + } + if !edits.PageInfo.HasPreviousPage { + break + } + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) + } +} + +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + // trace() + vars := varmap{ + "gqlNodeId": nid, + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + } + if cursor == "" { + vars["commentEditBefore"] = (*githubv4.String)(nil) + } else { + vars["commentEditBefore"] = cursor + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := commentEditQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.IssueComment.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + // trace() + vars := varmap{ + "gqlNodeId": nid, + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } + if cursor == "" { + vars["timelineAfter"] = (*githubv4.String)(nil) + } else { + vars["timelineAfter"] = cursor + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := timelineQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.TimelineItems + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + // trace() + vars := varmap{ + "gqlNodeId": nid, + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + } + if cursor == "" { + vars["issueEditBefore"] = 
(*githubv4.String)(nil) + } else { + vars["issueEditBefore"] = cursor + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := issueEditQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { + // trace() + vars := varmap{ + "owner": githubv4.String(mm.owner), + "name": githubv4.String(mm.project), + "issueSince": githubv4.DateTime{Time: mm.since}, + "issueFirst": githubv4.Int(NUM_ISSUES), + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "timelineAfter": (*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } + if cursor == "" { + vars["issueAfter"] = (*githubv4.String)(nil) + } else { + vars["issueAfter"] = githubv4.String(cursor) + } + c, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + query := issueQuery{} + if err := mm.mQuery(c, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Repository.Issues + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func reverse(eds []userContentEdit) chan userContentEdit { + ret := make(chan userContentEdit) + go func() { + for i := range eds { + ret <- eds[len(eds)-1-i] + } + close(ret) + }() + return ret +} + +type rateLimiter interface { + rateLimit() rateLimit +} + +// TODO: move that into its own file +// +// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL +// query and it is used to populate the response into it. 
It should be a pointer to a struct +// that corresponds to the Github graphql schema and it should implement the rateLimiter +// interface. This function queries Github for the remaining rate limit points before +// executing the actual query. The function waits, if there are not enough rate limiting +// points left. +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { + // First: check the cost of the query and wait if necessary + vars["dryRun"] = githubv4.Boolean(true) + qctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + if err := mm.gc.Query(qctx, query, vars); err != nil { + return err + } + fmt.Printf("%v\n", query) + rateLimit := query.rateLimit() + if rateLimit.Cost > rateLimit.Remaining { + resetTime := rateLimit.ResetAt.Time + fmt.Println("Github rate limit exhausted") + fmt.Printf("Sleeping until %s\n", resetTime.String()) + // Add a few seconds (8) for good measure + timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second))) + select { + case <-ctx.Done(): + stop(timer) + return ctx.Err() + case <-timer.C: + } + } + // Second: Do the actual query + vars["dryRun"] = githubv4.Boolean(false) + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() + if err := mm.gc.Query(qctx, query, vars); err != nil { + return err + } + return nil +} + +func stop(t *time.Timer) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index 228d204a..77c95e1d 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -2,37 +2,123 @@ package github import "github.com/shurcooL/githubv4" -type pageInfo struct { - EndCursor githubv4.String - HasNextPage bool - StartCursor githubv4.String - HasPreviousPage bool +type userQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + User user `graphql:"user(login: $login)"` } -type actor struct { - 
Typename githubv4.String `graphql:"__typename"` +func (q userQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type labelsQuery struct { + //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Repository struct { + Labels struct { + Nodes []struct { + ID string `graphql:"id"` + Name string `graphql:"name"` + Color string `graphql:"color"` + Description string `graphql:"description"` + } + PageInfo pageInfo + } `graphql:"labels(first: $first, after: $after)"` + } `graphql:"repository(owner: $owner, name: $name)"` +} + +type loginQuery struct { + //RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Viewer struct { + Login string `graphql:"login"` + } `graphql:"viewer"` +} + +type issueQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Repository struct { + Issues issueConnection `graphql:"issues(first: $issueFirst, after: $issueAfter, orderBy: {field: CREATED_AT, direction: ASC}, filterBy: {since: $issueSince})"` + } `graphql:"repository(owner: $owner, name: $name)"` +} + +func (q issueQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type issueEditQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Node struct { + Typename githubv4.String `graphql:"__typename"` + Issue struct { + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` + } `graphql:"... on Issue"` + } `graphql:"node(id: $gqlNodeId)"` +} + +func (q issueEditQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type timelineQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Node struct { + Typename githubv4.String `graphql:"__typename"` + Issue struct { + TimelineItems timelineItemsConnection `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` + } `graphql:"... 
on Issue"` + } `graphql:"node(id: $gqlNodeId)"` +} + +func (q timelineQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type commentEditQuery struct { + RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` + Node struct { + Typename githubv4.String `graphql:"__typename"` + IssueComment struct { + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` + } `graphql:"... on IssueComment"` + } `graphql:"node(id: $gqlNodeId)"` +} + +func (q commentEditQuery) rateLimit() rateLimit { + return q.RateLimit +} + +type user struct { Login githubv4.String AvatarUrl githubv4.String - User struct { - Name *githubv4.String - Email githubv4.String - } `graphql:"... on User"` - Organization struct { - Name *githubv4.String - Email *githubv4.String - } `graphql:"... on Organization"` + Name *githubv4.String } -type actorEvent struct { - Id githubv4.ID - CreatedAt githubv4.DateTime - Actor *actor +type issueConnection struct { + Nodes []issueNode + PageInfo pageInfo } -type authorEvent struct { - Id githubv4.ID - CreatedAt githubv4.DateTime - Author *actor +type issueNode struct { + issue + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` + TimelineItems timelineItemsConnection `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` +} + +type issue struct { + authorEvent + Title githubv4.String + Number githubv4.Int + Body githubv4.String + Url githubv4.URI +} + +type timelineItemsConnection struct { + Nodes []timelineItem + PageInfo pageInfo +} + +type userContentEditConnection struct { + Nodes []userContentEdit + PageInfo pageInfo } type userContentEdit struct { @@ -46,12 +132,6 @@ type userContentEdit struct { Diff *githubv4.String } -type issueComment struct { - authorEvent // NOTE: contains Id - Body githubv4.String - Url githubv4.URI -} - type timelineItem struct { Typename githubv4.String 
`graphql:"__typename"` @@ -91,84 +171,52 @@ type timelineItem struct { } `graphql:"... on RenamedTitleEvent"` } -type ghostQuery struct { - User struct { - Login githubv4.String - AvatarUrl githubv4.String - Name *githubv4.String - } `graphql:"user(login: $login)"` -} - -type labelsQuery struct { - Repository struct { - Labels struct { - Nodes []struct { - ID string `graphql:"id"` - Name string `graphql:"name"` - Color string `graphql:"color"` - Description string `graphql:"description"` - } - PageInfo pageInfo - } `graphql:"labels(first: $first, after: $after)"` - } `graphql:"repository(owner: $owner, name: $name)"` -} +type issueComment struct { + authorEvent // NOTE: contains Id + Body githubv4.String + Url githubv4.URI -type loginQuery struct { - Viewer struct { - Login string `graphql:"login"` - } `graphql:"viewer"` + UserContentEdits userContentEditConnection `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` } -type issueQuery struct { - Repository struct { - Issues struct { - Nodes []issue - PageInfo pageInfo - } `graphql:"issues(first: $issueFirst, after: $issueAfter, orderBy: {field: CREATED_AT, direction: ASC}, filterBy: {since: $issueSince})"` - } `graphql:"repository(owner: $owner, name: $name)"` +type actor struct { + Typename githubv4.String `graphql:"__typename"` + Login githubv4.String + AvatarUrl githubv4.String + User struct { + Name *githubv4.String + Email githubv4.String + } `graphql:"... on User"` + Organization struct { + Name *githubv4.String + Email *githubv4.String + } `graphql:"... 
on Organization"` } -type issue struct { - authorEvent - Title string - Number githubv4.Int - Body githubv4.String - Url githubv4.URI +type actorEvent struct { + Id githubv4.ID + CreatedAt githubv4.DateTime + Actor *actor } -type issueEditQuery struct { - Node struct { - Typename githubv4.String `graphql:"__typename"` - Issue struct { - UserContentEdits struct { - Nodes []userContentEdit - TotalCount githubv4.Int - PageInfo pageInfo - } `graphql:"userContentEdits(last: $issueEditLast, before: $issueEditBefore)"` - } `graphql:"... on Issue"` - } `graphql:"node(id: $gqlNodeId)"` +type authorEvent struct { + Id githubv4.ID + CreatedAt githubv4.DateTime + Author *actor } -type timelineQuery struct { - Node struct { - Typename githubv4.String `graphql:"__typename"` - Issue struct { - TimelineItems struct { - Nodes []timelineItem - PageInfo pageInfo - } `graphql:"timelineItems(first: $timelineFirst, after: $timelineAfter)"` - } `graphql:"... on Issue"` - } `graphql:"node(id: $gqlNodeId)"` +type pageInfo struct { + EndCursor githubv4.String + HasNextPage bool + StartCursor githubv4.String + HasPreviousPage bool } -type commentEditQuery struct { - Node struct { - Typename githubv4.String `graphql:"__typename"` - IssueComment struct { - UserContentEdits struct { - Nodes []userContentEdit - PageInfo pageInfo - } `graphql:"userContentEdits(last: $commentEditLast, before: $commentEditBefore)"` - } `graphql:"... 
on IssueComment"` - } `graphql:"node(id: $gqlNodeId)"` +type rateLimit struct { + Cost githubv4.Int + Limit githubv4.Int + NodeCount githubv4.Int + Remaining githubv4.Int + ResetAt githubv4.DateTime + Used githubv4.Int } diff --git a/bridge/github/iterator.go b/bridge/github/iterator.go deleted file mode 100644 index d21faae8..00000000 --- a/bridge/github/iterator.go +++ /dev/null @@ -1,423 +0,0 @@ -package github - -import ( - "context" - "time" - - "github.com/pkg/errors" - "github.com/shurcooL/githubv4" -) - -type iterator struct { - // Github graphql client - gc *githubv4.Client - - // The iterator will only query issues updated or created after the date given in - // the variable since. - since time.Time - - // Shared context, which is used for all graphql queries. - ctx context.Context - - // Sticky error - err error - - // Issue iterator - issueIter issueIter -} - -type issueIter struct { - iterVars - query issueQuery - issueEditIter []issueEditIter - timelineIter []timelineIter -} - -type issueEditIter struct { - iterVars - query issueEditQuery -} - -type timelineIter struct { - iterVars - query timelineQuery - commentEditIter []commentEditIter -} - -type commentEditIter struct { - iterVars - query commentEditQuery -} - -type iterVars struct { - // Iterator index - index int - - // capacity is the number of elements (issues, issue edits, timeline items, or - // comment edits) to query at a time. More capacity = more used memory = - // less queries to make. - capacity int - - // Variable assignments for graphql query - variables varmap -} - -type varmap map[string]interface{} - -func newIterVars(capacity int) iterVars { - return iterVars{ - index: -1, - capacity: capacity, - variables: varmap{}, - } -} - -// NewIterator creates and initialize a new iterator. 
-func NewIterator(ctx context.Context, client *githubv4.Client, capacity int, owner, project string, since time.Time) *iterator { - i := &iterator{ - gc: client, - since: since, - ctx: ctx, - issueIter: issueIter{ - iterVars: newIterVars(capacity), - timelineIter: make([]timelineIter, capacity), - issueEditIter: make([]issueEditIter, capacity), - }, - } - i.issueIter.variables.setOwnerProject(owner, project) - for idx := range i.issueIter.issueEditIter { - ie := &i.issueIter.issueEditIter[idx] - ie.iterVars = newIterVars(capacity) - } - for i1 := range i.issueIter.timelineIter { - tli := &i.issueIter.timelineIter[i1] - tli.iterVars = newIterVars(capacity) - tli.commentEditIter = make([]commentEditIter, capacity) - for i2 := range tli.commentEditIter { - cei := &tli.commentEditIter[i2] - cei.iterVars = newIterVars(capacity) - } - } - i.resetIssueVars() - return i -} - -func (v *varmap) setOwnerProject(owner, project string) { - (*v)["owner"] = githubv4.String(owner) - (*v)["name"] = githubv4.String(project) -} - -func (i *iterator) resetIssueVars() { - vars := &i.issueIter.variables - (*vars)["issueFirst"] = githubv4.Int(i.issueIter.capacity) - (*vars)["issueAfter"] = (*githubv4.String)(nil) - (*vars)["issueSince"] = githubv4.DateTime{Time: i.since} - i.issueIter.query.Repository.Issues.PageInfo.HasNextPage = true - i.issueIter.query.Repository.Issues.PageInfo.EndCursor = "" -} - -func (i *iterator) resetIssueEditVars() { - for idx := range i.issueIter.issueEditIter { - ie := &i.issueIter.issueEditIter[idx] - ie.variables["issueEditLast"] = githubv4.Int(ie.capacity) - ie.variables["issueEditBefore"] = (*githubv4.String)(nil) - ie.query.Node.Issue.UserContentEdits.PageInfo.HasNextPage = true - ie.query.Node.Issue.UserContentEdits.PageInfo.EndCursor = "" - } -} - -func (i *iterator) resetTimelineVars() { - for idx := range i.issueIter.timelineIter { - ip := &i.issueIter.timelineIter[idx] - ip.variables["timelineFirst"] = githubv4.Int(ip.capacity) - 
ip.variables["timelineAfter"] = (*githubv4.String)(nil) - ip.query.Node.Issue.TimelineItems.PageInfo.HasNextPage = true - ip.query.Node.Issue.TimelineItems.PageInfo.EndCursor = "" - } -} - -func (i *iterator) resetCommentEditVars() { - for i1 := range i.issueIter.timelineIter { - for i2 := range i.issueIter.timelineIter[i1].commentEditIter { - ce := &i.issueIter.timelineIter[i1].commentEditIter[i2] - ce.variables["commentEditLast"] = githubv4.Int(ce.capacity) - ce.variables["commentEditBefore"] = (*githubv4.String)(nil) - ce.query.Node.IssueComment.UserContentEdits.PageInfo.HasNextPage = true - ce.query.Node.IssueComment.UserContentEdits.PageInfo.EndCursor = "" - } - } -} - -// Error return last encountered error -func (i *iterator) Error() error { - if i.err != nil { - return i.err - } - return i.ctx.Err() // might return nil -} - -func (i *iterator) HasError() bool { - return i.err != nil || i.ctx.Err() != nil -} - -func (i *iterator) currIssueItem() *issue { - return &i.issueIter.query.Repository.Issues.Nodes[i.issueIter.index] -} - -func (i *iterator) currIssueEditIter() *issueEditIter { - return &i.issueIter.issueEditIter[i.issueIter.index] -} - -func (i *iterator) currTimelineIter() *timelineIter { - return &i.issueIter.timelineIter[i.issueIter.index] -} - -func (i *iterator) currCommentEditIter() *commentEditIter { - timelineIter := i.currTimelineIter() - return &timelineIter.commentEditIter[timelineIter.index] -} - -func (i *iterator) currIssueGqlNodeId() githubv4.ID { - return i.currIssueItem().Id -} - -// NextIssue returns true if there exists a next issue and advances the iterator by one. -// It is used to iterate over all issues. Queries to github are made when necessary. 
-func (i *iterator) NextIssue() bool { - if i.HasError() { - return false - } - index := &i.issueIter.index - issues := &i.issueIter.query.Repository.Issues - issueItems := &issues.Nodes - if 0 <= *index && *index < len(*issueItems)-1 { - *index += 1 - return true - } - - if !issues.PageInfo.HasNextPage { - return false - } - nextIssue := i.queryIssue() - return nextIssue -} - -// IssueValue returns the actual issue value. -func (i *iterator) IssueValue() issue { - return *i.currIssueItem() -} - -func (i *iterator) queryIssue() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - if endCursor := i.issueIter.query.Repository.Issues.PageInfo.EndCursor; endCursor != "" { - i.issueIter.variables["issueAfter"] = endCursor - } - if err := i.gc.Query(ctx, &i.issueIter.query, i.issueIter.variables); err != nil { - i.err = err - return false - } - i.resetIssueEditVars() - i.resetTimelineVars() - issueItems := &i.issueIter.query.Repository.Issues.Nodes - if len(*issueItems) <= 0 { - i.issueIter.index = -1 - return false - } - i.issueIter.index = 0 - return true -} - -// NextIssueEdit returns true if there exists a next issue edit and advances the iterator -// by one. It is used to iterate over all the issue edits. Queries to github are made when -// necessary. -func (i *iterator) NextIssueEdit() bool { - if i.HasError() { - return false - } - ieIter := i.currIssueEditIter() - ieIdx := &ieIter.index - ieItems := ieIter.query.Node.Issue.UserContentEdits - if 0 <= *ieIdx && *ieIdx < len(ieItems.Nodes)-1 { - *ieIdx += 1 - return i.nextValidIssueEdit() - } - if !ieItems.PageInfo.HasNextPage { - return false - } - querySucc := i.queryIssueEdit() - if !querySucc { - return false - } - return i.nextValidIssueEdit() -} - -func (i *iterator) nextValidIssueEdit() bool { - // issueEdit.Diff == nil happen if the event is older than early 2018, Github doesn't have - // the data before that. Best we can do is to ignore the event. 
- if issueEdit := i.IssueEditValue(); issueEdit.Diff == nil || string(*issueEdit.Diff) == "" { - return i.NextIssueEdit() - } - return true -} - -// IssueEditValue returns the actual issue edit value. -func (i *iterator) IssueEditValue() userContentEdit { - iei := i.currIssueEditIter() - return iei.query.Node.Issue.UserContentEdits.Nodes[iei.index] -} - -func (i *iterator) queryIssueEdit() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - iei := i.currIssueEditIter() - if endCursor := iei.query.Node.Issue.UserContentEdits.PageInfo.EndCursor; endCursor != "" { - iei.variables["issueEditBefore"] = endCursor - } - iei.variables["gqlNodeId"] = i.currIssueGqlNodeId() - if err := i.gc.Query(ctx, &iei.query, iei.variables); err != nil { - i.err = err - return false - } - issueEditItems := iei.query.Node.Issue.UserContentEdits.Nodes - if len(issueEditItems) <= 0 { - iei.index = -1 - return false - } - // The UserContentEditConnection in the Github API serves its elements in reverse chronological - // order. For our purpose we have to reverse the edits. - reverseEdits(issueEditItems) - iei.index = 0 - return true -} - -// NextTimelineItem returns true if there exists a next timeline item and advances the iterator -// by one. It is used to iterate over all the timeline items. Queries to github are made when -// necessary. -func (i *iterator) NextTimelineItem() bool { - if i.HasError() { - return false - } - tlIter := &i.issueIter.timelineIter[i.issueIter.index] - tlIdx := &tlIter.index - tlItems := tlIter.query.Node.Issue.TimelineItems - if 0 <= *tlIdx && *tlIdx < len(tlItems.Nodes)-1 { - *tlIdx += 1 - return true - } - if !tlItems.PageInfo.HasNextPage { - return false - } - nextTlItem := i.queryTimeline() - return nextTlItem -} - -// TimelineItemValue returns the actual timeline item value. 
-func (i *iterator) TimelineItemValue() timelineItem { - tli := i.currTimelineIter() - return tli.query.Node.Issue.TimelineItems.Nodes[tli.index] -} - -func (i *iterator) queryTimeline() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - tli := i.currTimelineIter() - if endCursor := tli.query.Node.Issue.TimelineItems.PageInfo.EndCursor; endCursor != "" { - tli.variables["timelineAfter"] = endCursor - } - tli.variables["gqlNodeId"] = i.currIssueGqlNodeId() - if err := i.gc.Query(ctx, &tli.query, tli.variables); err != nil { - i.err = err - return false - } - i.resetCommentEditVars() - timelineItems := &tli.query.Node.Issue.TimelineItems - if len(timelineItems.Nodes) <= 0 { - tli.index = -1 - return false - } - tli.index = 0 - return true -} - -// NextCommentEdit returns true if there exists a next comment edit and advances the iterator -// by one. It is used to iterate over all issue edits. Queries to github are made when -// necessary. -func (i *iterator) NextCommentEdit() bool { - if i.HasError() { - return false - } - - tmlnVal := i.TimelineItemValue() - if tmlnVal.Typename != "IssueComment" { - // The timeline iterator does not point to a comment. 
- i.err = errors.New("Call to NextCommentEdit() while timeline item is not a comment") - return false - } - - cei := i.currCommentEditIter() - ceIdx := &cei.index - ceItems := &cei.query.Node.IssueComment.UserContentEdits - if 0 <= *ceIdx && *ceIdx < len(ceItems.Nodes)-1 { - *ceIdx += 1 - return i.nextValidCommentEdit() - } - if !ceItems.PageInfo.HasNextPage { - return false - } - querySucc := i.queryCommentEdit() - if !querySucc { - return false - } - return i.nextValidCommentEdit() -} - -func (i *iterator) nextValidCommentEdit() bool { - // if comment edit diff is a nil pointer or points to an empty string look for next value - if commentEdit := i.CommentEditValue(); commentEdit.Diff == nil || string(*commentEdit.Diff) == "" { - return i.NextCommentEdit() - } - return true -} - -// CommentEditValue returns the actual comment edit value. -func (i *iterator) CommentEditValue() userContentEdit { - cei := i.currCommentEditIter() - return cei.query.Node.IssueComment.UserContentEdits.Nodes[cei.index] -} - -func (i *iterator) queryCommentEdit() bool { - ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) - defer cancel() - cei := i.currCommentEditIter() - - if endCursor := cei.query.Node.IssueComment.UserContentEdits.PageInfo.EndCursor; endCursor != "" { - cei.variables["commentEditBefore"] = endCursor - } - tmlnVal := i.TimelineItemValue() - if tmlnVal.Typename != "IssueComment" { - i.err = errors.New("Call to queryCommentEdit() while timeline item is not a comment") - return false - } - cei.variables["gqlNodeId"] = tmlnVal.IssueComment.Id - if err := i.gc.Query(ctx, &cei.query, cei.variables); err != nil { - i.err = err - return false - } - ceItems := cei.query.Node.IssueComment.UserContentEdits.Nodes - if len(ceItems) <= 0 { - cei.index = -1 - return false - } - // The UserContentEditConnection in the Github API serves its elements in reverse chronological - // order. For our purpose we have to reverse the edits. 
- reverseEdits(ceItems) - cei.index = 0 - return true -} - -func reverseEdits(edits []userContentEdit) { - for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 { - edits[i], edits[j] = edits[j], edits[i] - } -} -- cgit From 9a8e487613d99fb102e4619cb30464342b73fee7 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Fri, 5 Mar 2021 20:06:21 +0100 Subject: Fix errors: deadlock and empty titles --- bridge/github/import.go | 86 ++++----- bridge/github/import_mediator.go | 370 ++++++++++++++++++++++----------------- bridge/github/import_query.go | 10 +- 3 files changed, 263 insertions(+), 203 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index 2e36f5fe..09a39586 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -3,7 +3,6 @@ package github import ( "context" "fmt" - "strconv" "time" "github.com/shurcooL/githubv4" @@ -16,6 +15,8 @@ import ( "github.com/MichaelMure/git-bug/util/text" ) +const EMPTY_TITLE_PLACEHOLDER = "" + // githubImporter implement the Importer interface type githubImporter struct { conf core.Configuration @@ -25,9 +26,6 @@ type githubImporter struct { // send only channel out chan<- core.ImportResult - - // closure to get the username from github without any additional parameters - ghUser func(string) (*user, error) } func (gi *githubImporter) Init(_ context.Context, _ *cache.RepoCache, conf core.Configuration) error { @@ -51,9 +49,6 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } client := buildClient(creds[0].(*auth.Token)) gi.mediator = NewImportMediator(ctx, client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) - gi.ghUser = func(login string) (*user, error) { - return gi.mediator.User(ctx, login) - } out := make(chan core.ImportResult) gi.out = out @@ -62,19 +57,17 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, // Loop over all matching issues for issue := range gi.mediator.Issues() { - // 
fmt.Println("issue loop") // create issue - b, err := gi.ensureIssue(repo, &issue) + b, err := gi.ensureIssue(ctx, repo, &issue) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") return } - // fmt.Println("Just before timeline items loop") // loop over timeline items for item := range gi.mediator.TimelineItems(&issue) { - err := gi.ensureTimelineItem(repo, b, item) + err := gi.ensureTimelineItem(ctx, repo, b, item) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") @@ -100,9 +93,8 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { - // fmt.Printf("ensureIssue()\n") - author, err := gi.ensurePerson(repo, issue.Author) +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { + author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err } @@ -119,15 +111,15 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac // get first issue edit // if it exists, then it holds the bug creation firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) - // fmt.Printf("hasEdit == %v\n", hasEdit) - //fmt.Printf("%v\n", firstEdit) + // At Github there exist issues with seemingly empty titles. An example is + // https://github.com/NixOS/nixpkgs/issues/72730 . + // The title provided by the GraphQL API actually consists of a space followed by a + // zero width space (U+200B). This title would cause the NewBugRaw() function to + // return an error: empty title. 
title := string(issue.Title) - if title == "" { - fmt.Printf("%v\n", issue) - fmt.Println("title == \"\" holds") - title = "#" + strconv.Itoa(int(issue.Number)) - fmt.Println("setting title := ", title) + if title == " \u200b" { // U+200B == zero width space + title = EMPTY_TITLE_PLACEHOLDER } if err == bug.ErrBugNotExist { @@ -156,7 +148,6 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac metaKeyGithubUrl: issue.Url.String(), }) if err != nil { - fmt.Printf("%v\n", issue) return nil, err } // importing a new bug @@ -178,7 +169,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac return nil, err } - err = gi.ensureCommentEdit(repo, b, target, edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, edit) if err != nil { return nil, err } @@ -186,11 +177,11 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue *issue) (*cac return b, nil } -func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(repo, b, &item.IssueComment) + err := gi.ensureComment(ctx, repo, b, &item.IssueComment) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -206,7 +197,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } - author, err := gi.ensurePerson(repo, item.LabeledEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.LabeledEvent.Actor) if err != nil { return err } @@ -235,7 +226,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } - author, err := gi.ensurePerson(repo, item.UnlabeledEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, 
item.UnlabeledEvent.Actor) if err != nil { return err } @@ -265,7 +256,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err == nil { return nil } - author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.ClosedEvent.Actor) if err != nil { return err } @@ -291,7 +282,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err == nil { return nil } - author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.ReopenedEvent.Actor) if err != nil { return err } @@ -317,14 +308,25 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err == nil { return nil } - author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor) + author, err := gi.ensurePerson(ctx, repo, item.RenamedTitleEvent.Actor) if err != nil { return err } + + // At Github there exist issues with seemingly empty titles. An example is + // https://github.com/NixOS/nixpkgs/issues/72730 . + // The title provided by the GraphQL API actually consists of a space followed + // by a zero width space (U+200B). This title would cause the NewBugRaw() + // function to return an error: empty title. 
+ title := string(item.RenamedTitleEvent.CurrentTitle) + if title == " \u200b" { // U+200B == zero width space + title = EMPTY_TITLE_PLACEHOLDER + } + op, err := b.SetTitleRaw( author, item.RenamedTitleEvent.CreatedAt.Unix(), - string(item.RenamedTitleEvent.CurrentTitle), + title, map[string]string{metaKeyGithubId: id}, ) if err != nil { @@ -338,8 +340,8 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return nil } -func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { - author, err := gi.ensurePerson(repo, comment.Author) +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { + author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err } @@ -388,12 +390,12 @@ func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache // process remaining comment edits, if they exist for edit := range gi.mediator.CommentEdits(comment) { // ensure editor identity - _, err := gi.ensurePerson(repo, edit.Editor) + _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(repo, b, targetOpID, edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit) if err != nil { return err } @@ -401,7 +403,7 @@ func (gi *githubImporter) ensureComment(repo *cache.RepoCache, b *cache.BugCache return nil } -func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) if err == nil { return nil @@ -411,7 +413,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC return err } - editor, err := 
gi.ensurePerson(repo, edit.Editor) + editor, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } @@ -450,11 +452,11 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC } // ensurePerson create a bug.Person from the Github data -func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*cache.IdentityCache, error) { +func (gi *githubImporter) ensurePerson(ctx context.Context, repo *cache.RepoCache, actor *actor) (*cache.IdentityCache, error) { // When a user has been deleted, Github return a null actor, while displaying a profile named "ghost" // in it's UI. So we need a special case to get it. if actor == nil { - return gi.getGhost(repo) + return gi.getGhost(ctx, repo) } // Look first in the cache @@ -509,7 +511,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca return i, nil } -func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { +func (gi *githubImporter) getGhost(ctx context.Context, repo *cache.RepoCache) (*cache.IdentityCache, error) { loginName := "ghost" // Look first in the cache i, err := repo.ResolveIdentityImmutableMetadata(metaKeyGithubLogin, loginName) @@ -519,7 +521,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, if entity.IsErrMultipleMatch(err) { return nil, err } - user, err := gi.ghUser(loginName) + user, err := gi.mediator.User(ctx, loginName) userName := "" if user.Name != nil { userName = string(*user.Name) @@ -535,7 +537,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, ) } -// parseId convert the unusable githubv4.ID (an interface{}) into a string +// parseId converts the unusable githubv4.ID (an interface{}) into a string func parseId(id githubv4.ID) string { return fmt.Sprintf("%v", id) } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 428c5d36..8bd33adb 100644 --- 
a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -3,53 +3,63 @@ package github import ( "context" "fmt" - "runtime" + "strings" "sync" "time" "github.com/shurcooL/githubv4" ) -type varmap map[string]interface{} - -func trace() { - pc := make([]uintptr, 15) - n := runtime.Callers(2, pc) - frames := runtime.CallersFrames(pc[:n]) - frame, _ := frames.Next() - fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function) -} - -const ( - NUM_ISSUES = 50 - NUM_ISSUE_EDITS = 99 - NUM_TIMELINE_ITEMS = 99 - NUM_COMMENT_EDITS = 99 +const ( // These values influence how fast the github graphql rate limit is exhausted. + NUM_ISSUES = 40 + NUM_ISSUE_EDITS = 100 + NUM_TIMELINE_ITEMS = 100 + NUM_COMMENT_EDITS = 100 CHAN_CAPACITY = 128 ) -// TODO: remove all debug output and trace() in all files. Use ag +type varmap map[string]interface{} +// importMediator provides an interface to retrieve Github issues. type importMediator struct { // Github graphql client - gc *githubv4.Client - owner string + gc *githubv4.Client + + // name of the repository owner on Github + owner string + + // name of the Github repository project string - // The iterator will only query issues updated or created after the date given in + + // The importMediator will only query issues updated or created after the date given in // the variable since. 
since time.Time - issues chan issue - issueEditsMut sync.Mutex - timelineItemsMut sync.Mutex - commentEditsMut sync.Mutex - issueEdits map[githubv4.ID]chan userContentEdit + // channel for the issues + issues chan issue + + // channel for issue edits + issueEdits map[githubv4.ID]chan userContentEdit + issueEditsMut sync.Mutex + + // channel for timeline items timelineItems map[githubv4.ID]chan timelineItem - commentEdits map[githubv4.ID]chan userContentEdit + timelineItemsMut sync.Mutex + + // channel for comment edits + commentEdits map[githubv4.ID]chan userContentEdit + commentEditsMut sync.Mutex // Sticky error - err error + err error + errMut sync.Mutex +} + +func (mm *importMediator) setError(err error) { + mm.errMut.Lock() + mm.err = err + mm.errMut.Unlock() } func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { @@ -59,21 +69,56 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj project: project, since: since, issues: make(chan issue, CHAN_CAPACITY), - issueEditsMut: sync.Mutex{}, - timelineItemsMut: sync.Mutex{}, - commentEditsMut: sync.Mutex{}, issueEdits: make(map[githubv4.ID]chan userContentEdit), + issueEditsMut: sync.Mutex{}, timelineItems: make(map[githubv4.ID]chan timelineItem), + timelineItemsMut: sync.Mutex{}, commentEdits: make(map[githubv4.ID]chan userContentEdit), + commentEditsMut: sync.Mutex{}, err: nil, } go func() { - defer close(mm.issues) - mm.fillChannels(ctx) + mm.fillIssues(ctx) + close(mm.issues) }() return &mm } +func newIssueVars(owner, project string, since time.Time) varmap { + return varmap{ + "owner": githubv4.String(owner), + "name": githubv4.String(project), + "issueSince": githubv4.DateTime{Time: since}, + "issueFirst": githubv4.Int(NUM_ISSUES), + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "timelineAfter": 
(*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newIssueEditVars() varmap { + return varmap{ + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + } +} + +func newTimelineVars() varmap { + return varmap{ + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newCommentEditVars() varmap { + return varmap{ + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + } +} + func (mm *importMediator) Issues() <-chan issue { return mm.issues } @@ -100,64 +145,85 @@ func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContent } func (mm *importMediator) Error() error { - return mm.err + mm.errMut.Lock() + err := mm.err + mm.errMut.Unlock() + return err } func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - if err := mm.mQuery(c, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars); err != nil { return nil, err } return &query.User, nil } -func (mm *importMediator) fillChannels(ctx context.Context) { - issueCursor := githubv4.String("") - for { - issues, hasIssues := mm.queryIssue(ctx, issueCursor) - if !hasIssues { - break +func (mm *importMediator) fillIssues(ctx context.Context) { + initialCursor := githubv4.String("") + issues, hasIssues := mm.queryIssue(ctx, initialCursor) + for hasIssues { + for _, node := range issues.Nodes { + // The order of statements in this loop is crucial for the correct concurrent + // execution. + // + // The issue edit channel and the timeline channel need to be added to the + // corresponding maps before the issue is sent in the issue channel. 
+ // Otherwise, the client could try to retrieve issue edits and timeline itmes + // before these channels are even created. In this case the client would + // receive a nil channel. + issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) + timelineChan := make(chan timelineItem, CHAN_CAPACITY) + mm.issueEditsMut.Lock() + mm.issueEdits[node.issue.Id] = issueEditChan + mm.issueEditsMut.Unlock() + mm.timelineItemsMut.Lock() + mm.timelineItems[node.issue.Id] = timelineChan + mm.timelineItemsMut.Unlock() + select { + case <-ctx.Done(): + return + case mm.issues <- node.issue: + } + + // We do not know whether the client reads from the issue edit channel + // or the timeline channel first. Since the capacity of any channel is limited + // any send operation may block. Hence, in order to avoid deadlocks we need + // to send over both these channels concurrently. + go func(node issueNode) { + mm.fillIssueEdits(ctx, &node, issueEditChan) + close(issueEditChan) + }(node) + go func(node issueNode) { + mm.fillTimeline(ctx, &node, timelineChan) + close(timelineChan) + }(node) } - issueCursor = issues.PageInfo.EndCursor - for _, issueNode := range issues.Nodes { - // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title) - mm.fillChannelIssueEdits(ctx, &issueNode) - mm.fillChannelTimeline(ctx, &issueNode) - // To avoid race conditions add the issue only after all its edits, - // timeline times, etc. are added to their respective channels. 
- mm.issues <- issueNode.issue + if !issues.PageInfo.HasNextPage { + break } + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) } } -func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) { - // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id) - // fmt.Printf("%v\n", issueNode) - channel := make(chan userContentEdit, CHAN_CAPACITY) - defer close(channel) - mm.issueEditsMut.Lock() - mm.issueEdits[issueNode.issue.Id] = channel - mm.issueEditsMut.Unlock() +func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) { edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { - // fmt.Println("before the reversed loop") for edit := range reverse(edits.Nodes) { - // fmt.Println("in the reversed loop") if edit.Diff == nil || string(*edit.Diff) == "" { - // issueEdit.Diff == nil happen if the event is older than - // early 2018, Github doesn't have the data before that. - // Best we can do is to ignore the event. + // issueEdit.Diff == nil happen if the event is older than early + // 2018, Github doesn't have the data before that. Best we can do is + // to ignore the event. continue } - // fmt.Printf("about to push issue edit\n") - channel <- edit + select { + case <-ctx.Done(): + return + case channel <- edit: + } } - // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage) - // fmt.Printf("has previous ? 
%v\n", edits.PageInfo.HasPreviousPage) if !edits.PageInfo.HasPreviousPage { break } @@ -165,51 +231,64 @@ func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode * } } -func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) { - // fmt.Printf("fullChannelTimeline()\n") - channel := make(chan timelineItem, CHAN_CAPACITY) - defer close(channel) - mm.timelineItemsMut.Lock() - mm.timelineItems[issueNode.issue.Id] = channel - mm.timelineItemsMut.Unlock() +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { - channel <- item - mm.fillChannelCommentEdits(ctx, &item) + if item.Typename == "IssueComment" { + // Here the order of statements is crucial for correct concurrency. + commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) + mm.commentEditsMut.Lock() + mm.commentEdits[item.IssueComment.Id] = commentEditChan + mm.commentEditsMut.Unlock() + select { + case <-ctx.Done(): + return + case channel <- item: + } + // We need to create a new goroutine for filling the comment edit + // channel. + go func(item timelineItem) { + mm.fillCommentEdits(ctx, &item, commentEditChan) + close(commentEditChan) + }(item) + } else { + select { + case <-ctx.Done(): + return + case channel <- item: + } + } } - // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage) - // fmt.Printf("has previous ? 
%v\n", items.PageInfo.HasPreviousPage) if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) } } -func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) { - // This concerns only timeline items of type comment +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) { + // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } comment := &item.IssueComment - channel := make(chan userContentEdit, CHAN_CAPACITY) - defer close(channel) - mm.commentEditsMut.Lock() - mm.commentEdits[comment.Id] = channel - mm.commentEditsMut.Unlock() edits := &comment.UserContentEdits hasEdits := true for hasEdits { for edit := range reverse(edits.Nodes) { if edit.Diff == nil || string(*edit.Diff) == "" { - // issueEdit.Diff == nil happen if the event is older than - // early 2018, Github doesn't have the data before that. - // Best we can do is to ignore the event. + // issueEdit.Diff == nil happen if the event is older than early + // 2018, Github doesn't have the data before that. Best we can do is + // to ignore the event. 
continue } - channel <- edit + select { + case <-ctx.Done(): + return + case channel <- edit: + } } if !edits.PageInfo.HasPreviousPage { break @@ -219,21 +298,16 @@ func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *tim } func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - } + vars := newCommentEditVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["commentEditBefore"] = (*githubv4.String)(nil) } else { vars["commentEditBefore"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := commentEditQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.IssueComment.UserContentEdits @@ -243,24 +317,17 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - "commentEditBefore": (*githubv4.String)(nil), - } +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + vars := newTimelineVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["timelineAfter"] = (*githubv4.String)(nil) } else { vars["timelineAfter"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := timelineQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + 
mm.setError(err) return nil, false } connection := &query.Node.Issue.TimelineItems @@ -271,21 +338,16 @@ func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.I } func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), - } + vars := newIssueEditVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["issueEditBefore"] = (*githubv4.String)(nil) } else { vars["issueEditBefore"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := issueEditQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.Issue.UserContentEdits @@ -296,29 +358,15 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, } func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { - // trace() - vars := varmap{ - "owner": githubv4.String(mm.owner), - "name": githubv4.String(mm.project), - "issueSince": githubv4.DateTime{Time: mm.since}, - "issueFirst": githubv4.Int(NUM_ISSUES), - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), - "issueEditBefore": (*githubv4.String)(nil), - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "timelineAfter": (*githubv4.String)(nil), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - "commentEditBefore": (*githubv4.String)(nil), - } + vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) } else { vars["issueAfter"] = githubv4.String(cursor) } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := issueQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, 
vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Repository.Issues @@ -343,30 +391,42 @@ type rateLimiter interface { rateLimit() rateLimit } -// TODO: move that into its own file -// -// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL -// query and it is used to populate the response into it. It should be a pointer to a struct -// that corresponds to the Github graphql schema and it should implement the rateLimiter -// interface. This function queries Github for the remaining rate limit points before -// executing the actual query. The function waits, if there are not enough rate limiting -// points left. +// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query +// and it is used to populate the response into it. It should be a pointer to a struct that +// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If +// there is a Github rate limiting error, then the function sleeps and retries after the rate limit +// is expired. 
func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - // First: check the cost of the query and wait if necessary - vars["dryRun"] = githubv4.Boolean(true) + // first: just send the query to the graphql api + vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() + err := mm.gc.Query(qctx, query, vars) + if err == nil { + // no error: done + return nil + } + // matching the error string + if !strings.Contains(err.Error(), "API rate limit exceeded") { + // an error, but not the API rate limit error: done + return err + } + // a rate limit error + // ask the graphql api for rate limiting information + vars["dryRun"] = githubv4.Boolean(true) + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() if err := mm.gc.Query(qctx, query, vars); err != nil { return err } - fmt.Printf("%v\n", query) rateLimit := query.rateLimit() if rateLimit.Cost > rateLimit.Remaining { + // sleep resetTime := rateLimit.ResetAt.Time - fmt.Println("Github rate limit exhausted") - fmt.Printf("Sleeping until %s\n", resetTime.String()) // Add a few seconds (8) for good measure - timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second))) + resetTime = resetTime.Add(8 * time.Second) + fmt.Printf("Github rate limit exhausted. 
Sleeping until %s\n", resetTime.String()) + timer := time.NewTimer(time.Until(resetTime)) select { case <-ctx.Done(): stop(timer) @@ -374,14 +434,12 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma case <-timer.C: } } - // Second: Do the actual query + // run the original query again vars["dryRun"] = githubv4.Boolean(false) qctx, cancel = context.WithTimeout(ctx, defaultTimeout) defer cancel() - if err := mm.gc.Query(qctx, query, vars); err != nil { - return err - } - return nil + err = mm.gc.Query(qctx, query, vars) + return err // might be nil } func stop(t *time.Timer) { diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index 77c95e1d..c4ab2aa9 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -7,7 +7,7 @@ type userQuery struct { User user `graphql:"user(login: $login)"` } -func (q userQuery) rateLimit() rateLimit { +func (q *userQuery) rateLimit() rateLimit { return q.RateLimit } @@ -40,7 +40,7 @@ type issueQuery struct { } `graphql:"repository(owner: $owner, name: $name)"` } -func (q issueQuery) rateLimit() rateLimit { +func (q *issueQuery) rateLimit() rateLimit { return q.RateLimit } @@ -54,7 +54,7 @@ type issueEditQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q issueEditQuery) rateLimit() rateLimit { +func (q *issueEditQuery) rateLimit() rateLimit { return q.RateLimit } @@ -68,7 +68,7 @@ type timelineQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q timelineQuery) rateLimit() rateLimit { +func (q *timelineQuery) rateLimit() rateLimit { return q.RateLimit } @@ -82,7 +82,7 @@ type commentEditQuery struct { } `graphql:"node(id: $gqlNodeId)"` } -func (q commentEditQuery) rateLimit() rateLimit { +func (q *commentEditQuery) rateLimit() rateLimit { return q.RateLimit } -- cgit From 93b14c509b8260d8238ec1b32394b4a03bcd1349 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Mon, 8 Mar 2021 07:53:09 +0100 Subject: Remove maps containing channels. 
The old implementation of the github bridge used maps to store several channels holding data obtained from the Github API. Removing the maps and simply packing data and channels together in a struct and passing it through one single channel makes the program simpler in terms of concurrency and, additionally, enables the garbage collector to free the memory gradually without any additional provisions. --- bridge/github/import.go | 35 +++++++----- bridge/github/import_mediator.go | 120 ++++++++++++++------------------------- 2 files changed, 62 insertions(+), 93 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index 09a39586..d492488b 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -56,9 +56,12 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for issue := range gi.mediator.Issues() { + for bundle := range gi.mediator.Issues() { + issue := bundle.issue + issueEdits := bundle.issueEdits + timelineBundles := bundle.timelineBundles // create issue - b, err := gi.ensureIssue(ctx, repo, &issue) + b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits) if err != nil { err := fmt.Errorf("issue creation: %v", err) out <- core.NewImportError(err, "") @@ -66,8 +69,10 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } // loop over timeline items - for item := range gi.mediator.TimelineItems(&issue) { - err := gi.ensureTimelineItem(ctx, repo, b, item) + for bundle := range timelineBundles { + item := bundle.timelineItem + edits := bundle.userContentEdits + err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") @@ -93,7 +98,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, 
repo *cache.RepoCache, issue *issue) (*cache.BugCache, error) { +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -110,7 +115,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache // get first issue edit // if it exists, then it holds the bug creation - firstEdit, hasEdit := <-gi.mediator.IssueEdits(issue) + firstEdit, hasEdit := <-issueEdits // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . @@ -157,7 +162,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, fmt.Errorf("finding or creating issue") } // process remaining issue edits, if they exist - for edit := range gi.mediator.IssueEdits(issue) { + for edit := range issueEdits { // other edits will be added as CommentEdit operations target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) if err == cache.ErrNoMatchingOp { @@ -169,7 +174,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, err } - err = gi.ensureCommentEdit(ctx, repo, b, target, edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, &edit) if err != nil { return nil, err } @@ -177,11 +182,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(ctx, repo, b, &item.IssueComment) + err := gi.ensureComment(ctx, repo, 
b, &item.IssueComment, commentEdits) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -340,7 +345,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment) error { +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error { author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err @@ -351,7 +356,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac // real error return err } - firstEdit, hasEdit := <-gi.mediator.CommentEdits(comment) + firstEdit, hasEdit := <-commentEdits if err == cache.ErrNoMatchingOp { var textInput string if hasEdit { @@ -388,14 +393,14 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return fmt.Errorf("finding or creating issue comment") } // process remaining comment edits, if they exist - for edit := range gi.mediator.CommentEdits(comment) { + for edit := range commentEdits { // ensure editor identity _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit) if err != nil { return err } @@ -403,7 +408,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return nil } -func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error { _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) if err == nil { return nil 
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 8bd33adb..02067286 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -19,9 +19,7 @@ const ( // These values influence how fast the github graphql rate limit is exha CHAN_CAPACITY = 128 ) -type varmap map[string]interface{} - -// importMediator provides an interface to retrieve Github issues. +// importMediator provides a convenient interface to retrieve issues from the Github GraphQL API. type importMediator struct { // Github graphql client gc *githubv4.Client @@ -32,28 +30,30 @@ type importMediator struct { // name of the Github repository project string - // The importMediator will only query issues updated or created after the date given in - // the variable since. + // since specifies which issues to import. Issues that have been updated at or after the + // given date should be imported. since time.Time - // channel for the issues - issues chan issue + // issues is a channel holding bundles of issues to be imported. Each bundle holds the data + // associated with one issue. + issues chan issueBundle - // channel for issue edits - issueEdits map[githubv4.ID]chan userContentEdit - issueEditsMut sync.Mutex + // Sticky error + err error - // channel for timeline items - timelineItems map[githubv4.ID]chan timelineItem - timelineItemsMut sync.Mutex + // errMut is a mutex to synchronize access to the sticky error variable err. 
+ errMut sync.Mutex +} - // channel for comment edits - commentEdits map[githubv4.ID]chan userContentEdit - commentEditsMut sync.Mutex +type issueBundle struct { + issue issue + issueEdits <-chan userContentEdit + timelineBundles <-chan timelineBundle +} - // Sticky error - err error - errMut sync.Mutex +type timelineBundle struct { + timelineItem timelineItem + userContentEdits <-chan userContentEdit } func (mm *importMediator) setError(err error) { @@ -64,18 +64,12 @@ func (mm *importMediator) setError(err error) { func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, - owner: owner, - project: project, - since: since, - issues: make(chan issue, CHAN_CAPACITY), - issueEdits: make(map[githubv4.ID]chan userContentEdit), - issueEditsMut: sync.Mutex{}, - timelineItems: make(map[githubv4.ID]chan timelineItem), - timelineItemsMut: sync.Mutex{}, - commentEdits: make(map[githubv4.ID]chan userContentEdit), - commentEditsMut: sync.Mutex{}, - err: nil, + gc: client, + owner: owner, + project: project, + since: since, + issues: make(chan issueBundle, CHAN_CAPACITY), + err: nil, } go func() { mm.fillIssues(ctx) @@ -84,6 +78,8 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj return &mm } +type varmap map[string]interface{} + func newIssueVars(owner, project string, since time.Time) varmap { return varmap{ "owner": githubv4.String(owner), @@ -119,31 +115,10 @@ func newCommentEditVars() varmap { } } -func (mm *importMediator) Issues() <-chan issue { +func (mm *importMediator) Issues() <-chan issueBundle { return mm.issues } -func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit { - mm.issueEditsMut.Lock() - channel := mm.issueEdits[issue.Id] - mm.issueEditsMut.Unlock() - return channel -} - -func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem { - mm.timelineItemsMut.Lock() - channel := 
mm.timelineItems[issue.Id] - mm.timelineItemsMut.Unlock() - return channel -} - -func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit { - mm.commentEditsMut.Lock() - channel := mm.commentEdits[comment.Id] - mm.commentEditsMut.Unlock() - return channel -} - func (mm *importMediator) Error() error { mm.errMut.Lock() err := mm.err @@ -165,26 +140,14 @@ func (mm *importMediator) fillIssues(ctx context.Context) { issues, hasIssues := mm.queryIssue(ctx, initialCursor) for hasIssues { for _, node := range issues.Nodes { - // The order of statements in this loop is crucial for the correct concurrent - // execution. - // - // The issue edit channel and the timeline channel need to be added to the - // corresponding maps before the issue is sent in the issue channel. - // Otherwise, the client could try to retrieve issue edits and timeline itmes - // before these channels are even created. In this case the client would - // receive a nil channel. + // We need to send an issue-bundle over the issue channel before we start + // filling the issue edits and the timeline items to avoid deadlocks. 
issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) - timelineChan := make(chan timelineItem, CHAN_CAPACITY) - mm.issueEditsMut.Lock() - mm.issueEdits[node.issue.Id] = issueEditChan - mm.issueEditsMut.Unlock() - mm.timelineItemsMut.Lock() - mm.timelineItems[node.issue.Id] = timelineChan - mm.timelineItemsMut.Unlock() + timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.issues <- node.issue: + case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}: } // We do not know whether the client reads from the issue edit channel @@ -196,8 +159,8 @@ func (mm *importMediator) fillIssues(ctx context.Context) { close(issueEditChan) }(node) go func(node issueNode) { - mm.fillTimeline(ctx, &node, timelineChan) - close(timelineChan) + mm.fillTimeline(ctx, &node, timelineBundleChan) + close(timelineBundleChan) }(node) } if !issues.PageInfo.HasNextPage { @@ -231,21 +194,22 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) { +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { if item.Typename == "IssueComment" { - // Here the order of statements is crucial for correct concurrency. + // Issue comments are different than other timeline items in that + // they may have associated user content edits. + // + // Send over the timeline-channel before starting to fill the comment + // edits. 
commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) - mm.commentEditsMut.Lock() - mm.commentEdits[item.IssueComment.Id] = commentEditChan - mm.commentEditsMut.Unlock() select { case <-ctx.Done(): return - case channel <- item: + case channel <- timelineBundle{item, commentEditChan}: } // We need to create a new goroutine for filling the comment edit // channel. @@ -257,7 +221,7 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode select { case <-ctx.Done(): return - case channel <- item: + case channel <- timelineBundle{item, nil}: } } } -- cgit From d7f555b4374eee2ecdc144283a73327c931f09f1 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Tue, 9 Mar 2021 15:31:58 +0100 Subject: Github bridge: try again in case of web API error --- bridge/github/import_mediator.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'bridge') diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 02067286..8d1796b0 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -359,8 +359,30 @@ type rateLimiter interface { // and it is used to populate the response into it. It should be a pointer to a struct that // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If // there is a Github rate limiting error, then the function sleeps and retries after the rate limit -// is expired. +// is expired. If there is another error, then the method will retry before giving up. func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { + if err := mm.queryOnce(ctx, query, vars); err == nil { + // success: done + return nil + } + // failure: we will retry + // This is important for importing projects with a big number of issues. 
+ retries := 3 + var err error + for i := 0; i < retries; i++ { + // wait a few seconds before retry + sleepTime := 8 * (i + 1) + time.Sleep(time.Duration(sleepTime) * time.Second) + err = mm.queryOnce(ctx, query, vars) + if err == nil { + // success: done + return nil + } + } + return err +} + +func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { // first: just send the query to the graphql api vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) -- cgit From 52fba350d6d127d5c50aca34aabcca1ef0d26d75 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Wed, 17 Mar 2021 19:29:39 +0100 Subject: Github bridge: send message to user when waiting When the Github GraphQL API rate limit is exhausted print a message at the bottom of the terminal so the user knows why the import has been paused. --- bridge/github/import.go | 81 +++++++++++++---- bridge/github/import_mediator.go | 191 +++++++++++++++++++++++++++++---------- 2 files changed, 209 insertions(+), 63 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index d492488b..5337c474 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -56,10 +56,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, defer close(gi.out) // Loop over all matching issues - for bundle := range gi.mediator.Issues() { - issue := bundle.issue - issueEdits := bundle.issueEdits - timelineBundles := bundle.timelineBundles + for event := range gi.mediator.Issues { + var issue issue + var issueEdits <-chan userContentEditEvent + var timelineItems <-chan timelineEvent + switch e := event.(type) { + case messageEvent: + fmt.Println(e.msg) + continue + case issueData: + issue = e.issue + issueEdits = e.issueEdits + timelineItems = e.timelineItems + default: + panic(fmt.Sprint("Unknown event type")) + } // create issue b, err := gi.ensureIssue(ctx, 
repo, &issue, issueEdits) if err != nil { @@ -69,9 +80,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, } // loop over timeline items - for bundle := range timelineBundles { - item := bundle.timelineItem - edits := bundle.userContentEdits + for event := range timelineItems { + var item timelineItem + var edits <-chan userContentEditEvent + switch e := event.(type) { + case messageEvent: + fmt.Println(e.msg) + continue + case timelineData: + item = e.timelineItem + edits = e.userContentEdits + default: + panic(fmt.Sprint("Unknown event type")) + } err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != nil { err = fmt.Errorf("timeline item creation: %v", err) @@ -98,7 +119,27 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdits <-chan userContentEdit) (*cache.BugCache, error) { +// getNextUserContentEdit reads the input channel, handles messages, and returns the next +// userContentEditData. 
+func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) { + for { + event, hasEvent := <-in + if !hasEvent { + return nil, false + } + switch e := event.(type) { + case messageEvent: + fmt.Println(e.msg) + continue + case userContentEditData: + return &e, true + default: + panic(fmt.Sprint("Unknown event type")) + } + } +} + +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -115,7 +156,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache // get first issue edit // if it exists, then it holds the bug creation - firstEdit, hasEdit := <-issueEdits + firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents) // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . 
@@ -162,7 +203,11 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, fmt.Errorf("finding or creating issue") } // process remaining issue edits, if they exist - for edit := range issueEdits { + for { + edit, hasEdit := getNextUserContentEdit(issueEditEvents) + if !hasEdit { + break + } // other edits will be added as CommentEdit operations target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) if err == cache.ErrNoMatchingOp { @@ -174,7 +219,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return nil, err } - err = gi.ensureCommentEdit(ctx, repo, b, target, &edit) + err = gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit) if err != nil { return nil, err } @@ -182,7 +227,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEdit) error { +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error { switch item.Typename { case "IssueComment": @@ -345,7 +390,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEdits <-chan userContentEdit) error { +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error { author, err := gi.ensurePerson(ctx, repo, comment.Author) if err != nil { return err @@ -356,7 +401,7 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac // real error return err } - firstEdit, 
hasEdit := <-commentEdits + firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents) if err == cache.ErrNoMatchingOp { var textInput string if hasEdit { @@ -393,14 +438,18 @@ func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCac return fmt.Errorf("finding or creating issue comment") } // process remaining comment edits, if they exist - for edit := range commentEdits { + for { + edit, hasEdit := getNextUserContentEdit(commentEditEvents) + if !hasEdit { + break + } // ensure editor identity _, err := gi.ensurePerson(ctx, repo, edit.Editor) if err != nil { return err } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit) + err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit) if err != nil { return err } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 8d1796b0..7da62968 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -34,9 +34,10 @@ type importMediator struct { // given date should be imported. since time.Time - // issues is a channel holding bundles of issues to be imported. Each bundle holds the data - // associated with one issue. - issues chan issueBundle + // Issues is a channel holding bundles of Issues to be imported. Each issueEvent + // is either a message (type messageEvent) or a struct holding all the data associated with + // one issue (type issueData). 
+ Issues chan issueEvent // Sticky error err error @@ -45,17 +46,46 @@ type importMediator struct { errMut sync.Mutex } -type issueBundle struct { - issue issue - issueEdits <-chan userContentEdit - timelineBundles <-chan timelineBundle +type issueEvent interface { + isIssueEvent() +} +type timelineEvent interface { + isTimelineEvent() +} +type userContentEditEvent interface { + isUserContentEditEvent() } -type timelineBundle struct { - timelineItem timelineItem - userContentEdits <-chan userContentEdit +type messageEvent struct { + msg string } +func (messageEvent) isIssueEvent() {} +func (messageEvent) isUserContentEditEvent() {} +func (messageEvent) isTimelineEvent() {} + +type issueData struct { + issue + issueEdits <-chan userContentEditEvent + timelineItems <-chan timelineEvent +} + +func (issueData) isIssueEvent() {} + +type timelineData struct { + timelineItem + userContentEdits <-chan userContentEditEvent +} + +func (timelineData) isTimelineEvent() {} + +type userContentEditData struct { + userContentEdit +} + +// func (userContentEditData) isEvent() +func (userContentEditData) isUserContentEditEvent() {} + func (mm *importMediator) setError(err error) { mm.errMut.Lock() mm.err = err @@ -68,12 +98,12 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj owner: owner, project: project, since: since, - issues: make(chan issueBundle, CHAN_CAPACITY), + Issues: make(chan issueEvent, CHAN_CAPACITY), err: nil, } go func() { mm.fillIssues(ctx) - close(mm.issues) + close(mm.Issues) }() return &mm } @@ -115,10 +145,6 @@ func newCommentEditVars() varmap { } } -func (mm *importMediator) Issues() <-chan issueBundle { - return mm.issues -} - func (mm *importMediator) Error() error { mm.errMut.Lock() err := mm.err @@ -129,25 +155,49 @@ func (mm *importMediator) Error() error { func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - if 
err := mm.mQuery(ctx, &query, vars); err != nil { + // handle message events localy + channel := make(chan messageEvent) + defer close(channel) + // print all messages immediately + go func() { + for event := range channel { + fmt.Println(event.msg) + } + }() + if err := mm.mQuery(ctx, &query, vars, channel); err != nil { return nil, err } return &query.User, nil } func (mm *importMediator) fillIssues(ctx context.Context) { + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the issue channel. The message will be queued after + // all the issues which has been completed. + go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case mm.Issues <- msg: + } + } + }() + // start processing the actual issues initialCursor := githubv4.String("") - issues, hasIssues := mm.queryIssue(ctx, initialCursor) + issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs) for hasIssues { for _, node := range issues.Nodes { // We need to send an issue-bundle over the issue channel before we start // filling the issue edits and the timeline items to avoid deadlocks. 
- issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) - timelineBundleChan := make(chan timelineBundle, CHAN_CAPACITY) + issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) + timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.issues <- issueBundle{node.issue, issueEditChan, timelineBundleChan}: + case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}: } // We do not know whether the client reads from the issue edit channel @@ -166,11 +216,25 @@ func (mm *importMediator) fillIssues(ctx context.Context) { if !issues.PageInfo.HasNextPage { break } - issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) { +func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) { + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the issue-edit channel. The message will be queued after + // all the issue edits which have been completed. 
+ go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case channel <- msg: + } + } + }() edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { @@ -184,17 +248,31 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo select { case <-ctx.Done(): return - case channel <- edit: + case channel <- userContentEditData{edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineBundle) { +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) { + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the timeline channel. The message will be queued after + // all the timeline items which have been completed. + go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case channel <- msg: + } + } + }() items := &issueNode.TimelineItems hasItems := true for hasItems { @@ -205,11 +283,11 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode // // Send over the timeline-channel before starting to fill the comment // edits. - commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) + commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case channel <- timelineBundle{item, commentEditChan}: + case channel <- timelineData{item, commentEditChan}: } // We need to create a new goroutine for filling the comment edit // channel. 
@@ -221,22 +299,36 @@ func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode select { case <-ctx.Done(): return - case channel <- timelineBundle{item, nil}: + case channel <- timelineData{item, nil}: } } } if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) { +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) { // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } + // First: setup message handling while submitting GraphQL queries. + msgs := make(chan messageEvent) + defer close(msgs) + // forward all the messages to the user content edit channel. The message will be queued after + // all the user content edits which have been completed already. 
+ go func() { + for msg := range msgs { + select { + case <-ctx.Done(): + return + case channel <- msg: + } + } + }() comment := &item.IssueComment edits := &comment.UserContentEdits hasEdits := true @@ -251,17 +343,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt select { case <-ctx.Done(): return - case channel <- edit: + case channel <- userContentEditData{edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs) } } -func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { vars := newCommentEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -270,7 +362,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -281,7 +373,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) { vars := newTimelineVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -290,7 +382,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu vars["timelineAfter"] = cursor } query := timelineQuery{} - if 
err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -301,7 +393,7 @@ func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cu return connection, true } -func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { vars := newIssueEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -310,7 +402,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, vars["issueEditBefore"] = cursor } query := issueEditQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -321,7 +413,7 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, return connection, true } -func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) { vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) @@ -329,7 +421,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = githubv4.String(cursor) } query := issueQuery{} - if err := mm.mQuery(ctx, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { mm.setError(err) return nil, false } @@ -360,20 +452,20 @@ type rateLimiter interface { // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. 
If // there is a Github rate limiting error, then the function sleeps and retries after the rate limit // is expired. If there is another error, then the method will retry before giving up. -func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - if err := mm.queryOnce(ctx, query, vars); err == nil { +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { + if err := mm.queryOnce(ctx, query, vars, msgs); err == nil { // success: done return nil } // failure: we will retry - // This is important for importing projects with a big number of issues. + // To retry is important for importing projects with a big number of issues. retries := 3 var err error for i := 0; i < retries; i++ { // wait a few seconds before retry sleepTime := 8 * (i + 1) time.Sleep(time.Duration(sleepTime) * time.Second) - err = mm.queryOnce(ctx, query, vars) + err = mm.queryOnce(ctx, query, vars, msgs) if err == nil { // success: done return nil @@ -382,7 +474,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma return err } -func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { +func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { // first: just send the query to the graphql api vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) @@ -411,7 +503,12 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars resetTime := rateLimit.ResetAt.Time // Add a few seconds (8) for good measure resetTime = resetTime.Add(8 * time.Second) - fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String()) + msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. 
Sleeping until %s", resetTime.String()) + select { + case <-ctx.Done(): + return ctx.Err() + case msgs <- messageEvent{msg}: + } timer := time.NewTimer(time.Until(resetTime)) select { case <-ctx.Done(): -- cgit From 2646c63213cb4d1fa04e1b61051f4ac97c1978f0 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Mon, 22 Mar 2021 19:26:59 +0100 Subject: Github bridge: Refactor --- bridge/github/import.go | 385 ++++++++++++++++++--------------------- bridge/github/import_mediator.go | 331 ++++++++++++--------------------- bridge/github/import_query.go | 22 ++- 3 files changed, 303 insertions(+), 435 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index 5337c474..ceb35ef0 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -54,64 +54,80 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, go func() { defer close(gi.out) - - // Loop over all matching issues - for event := range gi.mediator.Issues { - var issue issue - var issueEdits <-chan userContentEditEvent - var timelineItems <-chan timelineEvent - switch e := event.(type) { - case messageEvent: - fmt.Println(e.msg) - continue - case issueData: - issue = e.issue - issueEdits = e.issueEdits - timelineItems = e.timelineItems - default: - panic(fmt.Sprint("Unknown event type")) + var currBug *cache.BugCache + var currEvent ImportEvent + var nextEvent ImportEvent + var err error + for { + // We need the current event and one look ahead event. 
+ currEvent = nextEvent + if currEvent == nil { + currEvent = gi.mediator.NextImportEvent() } - // create issue - b, err := gi.ensureIssue(ctx, repo, &issue, issueEdits) - if err != nil { - err := fmt.Errorf("issue creation: %v", err) - out <- core.NewImportError(err, "") - return + if currEvent == nil { + break } - - // loop over timeline items - for event := range timelineItems { - var item timelineItem - var edits <-chan userContentEditEvent - switch e := event.(type) { - case messageEvent: - fmt.Println(e.msg) - continue - case timelineData: - item = e.timelineItem - edits = e.userContentEdits + nextEvent = gi.mediator.NextImportEvent() + + switch event := currEvent.(type) { + case MessageEvent: + fmt.Println(event.msg) + case IssueEvent: + // first: commit what is being held in currBug + if err = gi.commit(currBug, out); err != nil { + out <- core.NewImportError(err, "") + return + } + // second: create new issue + switch next := nextEvent.(type) { + case IssueEditEvent: + // consuming and using next event + nextEvent = nil + currBug, err = gi.ensureIssue(ctx, repo, &event.issue, &next.userContentEdit) default: - panic(fmt.Sprint("Unknown event type")) + currBug, err = gi.ensureIssue(ctx, repo, &event.issue, nil) + } + if err != nil { + err := fmt.Errorf("issue creation: %v", err) + out <- core.NewImportError(err, "") + return + } + case IssueEditEvent: + err = gi.ensureIssueEdit(ctx, repo, currBug, event.issueId, &event.userContentEdit) + if err != nil { + err = fmt.Errorf("issue edit: %v", err) + out <- core.NewImportError(err, "") + return + } + case TimelineEvent: + if next, ok := nextEvent.(CommentEditEvent); ok && event.Typename == "IssueComment" { + // consuming and using next event + nextEvent = nil + err = gi.ensureComment(ctx, repo, currBug, &event.timelineItem.IssueComment, &next.userContentEdit) + } else { + err = gi.ensureTimelineItem(ctx, repo, currBug, &event.timelineItem) } - err := gi.ensureTimelineItem(ctx, repo, b, &item, edits) if err != 
nil { err = fmt.Errorf("timeline item creation: %v", err) out <- core.NewImportError(err, "") return } - } - - if !b.NeedCommit() { - out <- core.NewImportNothing(b.Id(), "no imported operation") - } else if err := b.Commit(); err != nil { - // commit bug state - err = fmt.Errorf("bug commit: %v", err) - out <- core.NewImportError(err, "") - return + case CommentEditEvent: + err = gi.ensureCommentEdit(ctx, repo, currBug, event.commentId, &event.userContentEdit) + if err != nil { + err = fmt.Errorf("comment edit: %v", err) + out <- core.NewImportError(err, "") + return + } + default: + panic("Unknown event type") } } - - if err := gi.mediator.Error(); err != nil { + // commit what is being held in currBug before returning + if err = gi.commit(currBug, out); err != nil { + out <- core.NewImportError(err, "") + } + if err = gi.mediator.Error(); err != nil { gi.out <- core.NewImportError(err, "") } }() @@ -119,27 +135,21 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } -// getNextUserContentEdit reads the input channel, handles messages, and returns the next -// userContentEditData. 
-func getNextUserContentEdit(in <-chan userContentEditEvent) (*userContentEditData, bool) { - for { - event, hasEvent := <-in - if !hasEvent { - return nil, false - } - switch e := event.(type) { - case messageEvent: - fmt.Println(e.msg) - continue - case userContentEditData: - return &e, true - default: - panic(fmt.Sprint("Unknown event type")) - } +func (gi *githubImporter) commit(b *cache.BugCache, out chan<- core.ImportResult) error { + if b == nil { + return nil } + if !b.NeedCommit() { + out <- core.NewImportNothing(b.Id(), "no imported operation") + return nil + } else if err := b.Commit(); err != nil { + // commit bug state + return fmt.Errorf("bug commit: %v", err) + } + return nil } -func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEditEvents <-chan userContentEditEvent) (*cache.BugCache, error) { +func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache, issue *issue, issueEdit *userContentEdit) (*cache.BugCache, error) { author, err := gi.ensurePerson(ctx, repo, issue.Author) if err != nil { return nil, err @@ -150,14 +160,13 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache return excerpt.CreateMetadata[core.MetaKeyOrigin] == target && excerpt.CreateMetadata[metaKeyGithubId] == parseId(issue.Id) }) - if err != nil && err != bug.ErrBugNotExist { + if err == nil { + return b, nil + } + if err != bug.ErrBugNotExist { return nil, err } - // get first issue edit - // if it exists, then it holds the bug creation - firstEdit, hasEdit := getNextUserContentEdit(issueEditEvents) - // At Github there exist issues with seemingly empty titles. An example is // https://github.com/NixOS/nixpkgs/issues/72730 . 
// The title provided by the GraphQL API actually consists of a space followed by a @@ -168,70 +177,49 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache title = EMPTY_TITLE_PLACEHOLDER } - if err == bug.ErrBugNotExist { - var textInput string - if hasEdit { - // use the first issue edit: it represents the bug creation itself - textInput = string(*firstEdit.Diff) - } else { - // if there are no issue edits then the issue struct holds the bug creation - textInput = string(issue.Body) - } - cleanText, err := text.Cleanup(textInput) - if err != nil { - return nil, err - } - // create bug - b, _, err = repo.NewBugRaw( - author, - issue.CreatedAt.Unix(), - title, // TODO: this is the *current* title, not the original one - cleanText, - nil, - map[string]string{ - core.MetaKeyOrigin: target, - metaKeyGithubId: parseId(issue.Id), - metaKeyGithubUrl: issue.Url.String(), - }) - if err != nil { - return nil, err - } - // importing a new bug - gi.out <- core.NewImportBug(b.Id()) + var textInput string + if issueEdit != nil { + // use the first issue edit: it represents the bug creation itself + textInput = string(*issueEdit.Diff) + } else { + // if there are no issue edits then the issue struct holds the bug creation + textInput = string(issue.Body) } - if b == nil { - return nil, fmt.Errorf("finding or creating issue") + cleanText, err := text.Cleanup(textInput) + if err != nil { + return nil, err } - // process remaining issue edits, if they exist - for { - edit, hasEdit := getNextUserContentEdit(issueEditEvents) - if !hasEdit { - break - } - // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(issue.Id)) - if err == cache.ErrNoMatchingOp { - // original comment is missing somehow, issuing a warning - gi.out <- core.NewImportWarning(fmt.Errorf("comment ID %s to edit is missing", parseId(issue.Id)), b.Id()) - continue - } - if err != nil { - return nil, err - } - err = 
gi.ensureCommentEdit(ctx, repo, b, target, &edit.userContentEdit) - if err != nil { - return nil, err - } + // create bug + b, _, err = repo.NewBugRaw( + author, + issue.CreatedAt.Unix(), + title, // TODO: this is the *current* title, not the original one + cleanText, + nil, + map[string]string{ + core.MetaKeyOrigin: target, + metaKeyGithubId: parseId(issue.Id), + metaKeyGithubUrl: issue.Url.String(), + }) + if err != nil { + return nil, err } + // importing a new bug + gi.out <- core.NewImportBug(b.Id()) + return b, nil } -func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem, commentEdits <-chan userContentEditEvent) error { +func (gi *githubImporter) ensureIssueEdit(ctx context.Context, repo *cache.RepoCache, bug *cache.BugCache, ghIssueId githubv4.ID, edit *userContentEdit) error { + return gi.ensureCommentEdit(ctx, repo, bug, ghIssueId, edit) +} + +func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, item *timelineItem) error { switch item.Typename { case "IssueComment": - err := gi.ensureComment(ctx, repo, b, &item.IssueComment, commentEdits) + err := gi.ensureComment(ctx, repo, b, &item.IssueComment, nil) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) } @@ -390,75 +378,62 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re return nil } -func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, commentEditEvents <-chan userContentEditEvent) error { - author, err := gi.ensurePerson(ctx, repo, comment.Author) +func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, ghTargetId githubv4.ID, edit *userContentEdit) error { + // find comment + target, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(ghTargetId)) if err != nil { return err } - - 
targetOpID, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id)) - if err != nil && err != cache.ErrNoMatchingOp { + _, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) + if err == nil { + return nil + } + if err != cache.ErrNoMatchingOp { // real error return err } - firstEdit, hasEdit := getNextUserContentEdit(commentEditEvents) - if err == cache.ErrNoMatchingOp { - var textInput string - if hasEdit { - // use the first comment edit: it represents the comment creation itself - textInput = string(*firstEdit.Diff) - } else { - // if there are not comment edits, then the comment struct holds the comment creation - textInput = string(comment.Body) - } - cleanText, err := text.Cleanup(textInput) - if err != nil { - return err - } - // add comment operation - op, err := b.AddCommentRaw( - author, - comment.CreatedAt.Unix(), - cleanText, - nil, - map[string]string{ - metaKeyGithubId: parseId(comment.Id), - metaKeyGithubUrl: comment.Url.String(), - }, - ) - if err != nil { - return err - } + editor, err := gi.ensurePerson(ctx, repo, edit.Editor) + if err != nil { + return err + } - gi.out <- core.NewImportComment(op.Id()) - targetOpID = op.Id() + if edit.DeletedAt != nil { + // comment deletion, not supported yet + return nil } - if targetOpID == "" { - return fmt.Errorf("finding or creating issue comment") + + cleanText, err := text.Cleanup(string(*edit.Diff)) + if err != nil { + return err } - // process remaining comment edits, if they exist - for { - edit, hasEdit := getNextUserContentEdit(commentEditEvents) - if !hasEdit { - break - } - // ensure editor identity - _, err := gi.ensurePerson(ctx, repo, edit.Editor) - if err != nil { - return err - } - err = gi.ensureCommentEdit(ctx, repo, b, targetOpID, &edit.userContentEdit) - if err != nil { - return err - } + // comment edition + op, err := b.EditCommentRaw( + editor, + edit.CreatedAt.Unix(), + target, + cleanText, + map[string]string{ + metaKeyGithubId: 
parseId(edit.Id), + }, + ) + + if err != nil { + return err } + + gi.out <- core.NewImportCommentEdition(op.Id()) return nil } -func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit *userContentEdit) error { - _, err := b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(edit.Id)) +func (gi *githubImporter) ensureComment(ctx context.Context, repo *cache.RepoCache, b *cache.BugCache, comment *issueComment, firstEdit *userContentEdit) error { + author, err := gi.ensurePerson(ctx, repo, comment.Author) + if err != nil { + return err + } + + _, err = b.ResolveOperationWithMetadata(metaKeyGithubId, parseId(comment.Id)) if err == nil { return nil } @@ -467,41 +442,35 @@ func (gi *githubImporter) ensureCommentEdit(ctx context.Context, repo *cache.Rep return err } - editor, err := gi.ensurePerson(ctx, repo, edit.Editor) + var textInput string + if firstEdit != nil { + // use the first comment edit: it represents the comment creation itself + textInput = string(*firstEdit.Diff) + } else { + // if there are not comment edits, then the comment struct holds the comment creation + textInput = string(comment.Body) + } + cleanText, err := text.Cleanup(textInput) if err != nil { return err } - switch { - case edit.DeletedAt != nil: - // comment deletion, not supported yet - return nil - - case edit.DeletedAt == nil: - - cleanText, err := text.Cleanup(string(*edit.Diff)) - if err != nil { - return err - } - - // comment edition - op, err := b.EditCommentRaw( - editor, - edit.CreatedAt.Unix(), - target, - cleanText, - map[string]string{ - metaKeyGithubId: parseId(edit.Id), - }, - ) - - if err != nil { - return err - } - - gi.out <- core.NewImportCommentEdition(op.Id()) - return nil + // add comment operation + op, err := b.AddCommentRaw( + author, + comment.CreatedAt.Unix(), + cleanText, + nil, + map[string]string{ + metaKeyGithubId: parseId(comment.Id), + metaKeyGithubUrl: comment.Url.String(), + }, + ) 
+ if err != nil { + return err } + + gi.out <- core.NewImportComment(op.Id()) return nil } diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 7da62968..25d9c312 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "strings" - "sync" "time" "github.com/shurcooL/githubv4" ) -const ( // These values influence how fast the github graphql rate limit is exhausted. +const ( + // These values influence how fast the github graphql rate limit is exhausted. NUM_ISSUES = 40 NUM_ISSUE_EDITS = 100 NUM_TIMELINE_ITEMS = 100 @@ -34,76 +34,68 @@ type importMediator struct { // given date should be imported. since time.Time - // Issues is a channel holding bundles of Issues to be imported. Each issueEvent - // is either a message (type messageEvent) or a struct holding all the data associated with - // one issue (type issueData). - Issues chan issueEvent + // importEvents holds events representing issues, comments, edits, ... + // In this channel issues are immediately followed by their issue edits and comments are + // immediately followed by their comment edits. + importEvents chan ImportEvent // Sticky error err error - - // errMut is a mutex to synchronize access to the sticky error variable err. 
- errMut sync.Mutex } -type issueEvent interface { - isIssueEvent() -} -type timelineEvent interface { - isTimelineEvent() -} -type userContentEditEvent interface { - isUserContentEditEvent() +type ImportEvent interface { + isImportEvent() } -type messageEvent struct { +type MessageEvent struct { msg string } -func (messageEvent) isIssueEvent() {} -func (messageEvent) isUserContentEditEvent() {} -func (messageEvent) isTimelineEvent() {} +func (MessageEvent) isImportEvent() {} -type issueData struct { +type IssueEvent struct { issue - issueEdits <-chan userContentEditEvent - timelineItems <-chan timelineEvent } -func (issueData) isIssueEvent() {} +func (IssueEvent) isImportEvent() {} + +type IssueEditEvent struct { + issueId githubv4.ID + userContentEdit +} + +func (IssueEditEvent) isImportEvent() {} -type timelineData struct { +type TimelineEvent struct { + issueId githubv4.ID timelineItem - userContentEdits <-chan userContentEditEvent } -func (timelineData) isTimelineEvent() {} +func (TimelineEvent) isImportEvent() {} -type userContentEditData struct { +type CommentEditEvent struct { + commentId githubv4.ID userContentEdit } -// func (userContentEditData) isEvent() -func (userContentEditData) isUserContentEditEvent() {} +func (CommentEditEvent) isImportEvent() {} -func (mm *importMediator) setError(err error) { - mm.errMut.Lock() - mm.err = err - mm.errMut.Unlock() +func (mm *importMediator) NextImportEvent() ImportEvent { + return <-mm.importEvents } func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { mm := importMediator{ - gc: client, - owner: owner, - project: project, - since: since, - Issues: make(chan issueEvent, CHAN_CAPACITY), - err: nil, + gc: client, + owner: owner, + project: project, + since: since, + importEvents: make(chan ImportEvent, CHAN_CAPACITY), + err: nil, } go func() { - mm.fillIssues(ctx) - close(mm.Issues) + mm.fillImportEvents(ctx) + close(mm.importEvents) }() 
return &mm } @@ -146,95 +138,42 @@ func newCommentEditVars() varmap { } func (mm *importMediator) Error() error { - mm.errMut.Lock() - err := mm.err - mm.errMut.Unlock() - return err + return mm.err } func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - // handle message events localy - channel := make(chan messageEvent) - defer close(channel) - // print all messages immediately - go func() { - for event := range channel { - fmt.Println(event.msg) - } - }() - if err := mm.mQuery(ctx, &query, vars, channel); err != nil { + if err := mm.mQuery(ctx, &query, vars); err != nil { return nil, err } return &query.User, nil } -func (mm *importMediator) fillIssues(ctx context.Context) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the issue channel. The message will be queued after - // all the issues which has been completed. - go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case mm.Issues <- msg: - } - } - }() - // start processing the actual issues +func (mm *importMediator) fillImportEvents(ctx context.Context) { initialCursor := githubv4.String("") - issues, hasIssues := mm.queryIssue(ctx, initialCursor, msgs) + issues, hasIssues := mm.queryIssue(ctx, initialCursor) for hasIssues { for _, node := range issues.Nodes { - // We need to send an issue-bundle over the issue channel before we start - // filling the issue edits and the timeline items to avoid deadlocks. 
- issueEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) - timelineBundleChan := make(chan timelineEvent, CHAN_CAPACITY) select { case <-ctx.Done(): return - case mm.Issues <- issueData{node.issue, issueEditChan, timelineBundleChan}: + case mm.importEvents <- IssueEvent{node.issue}: } - // We do not know whether the client reads from the issue edit channel - // or the timeline channel first. Since the capacity of any channel is limited - // any send operation may block. Hence, in order to avoid deadlocks we need - // to send over both these channels concurrently. - go func(node issueNode) { - mm.fillIssueEdits(ctx, &node, issueEditChan) - close(issueEditChan) - }(node) - go func(node issueNode) { - mm.fillTimeline(ctx, &node, timelineBundleChan) - close(timelineBundleChan) - }(node) + // issue edit events follow the issue event + mm.fillIssueEditEvents(ctx, &node) + // last come the timeline events + mm.fillTimelineEvents(ctx, &node) } if !issues.PageInfo.HasNextPage { break } - issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor, msgs) + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) } } -func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan<- userContentEditEvent) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the issue-edit channel. The message will be queued after - // all the issue edits which have been completed. 
- go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() +func (mm *importMediator) fillIssueEditEvents(ctx context.Context, issueNode *issueNode) { edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { @@ -248,87 +187,86 @@ func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNo select { case <-ctx.Done(): return - case channel <- userContentEditData{edit}: + case mm.importEvents <- IssueEditEvent{issueId: issueNode.issue.Id, userContentEdit: edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor, msgs) + edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor) } } -func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan<- timelineEvent) { - // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the timeline channel. The message will be queued after - // all the timeline items which have been completed. 
- go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() +func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { + vars := newIssueEditVars() + vars["gqlNodeId"] = nid + if cursor == "" { + vars["issueEditBefore"] = (*githubv4.String)(nil) + } else { + vars["issueEditBefore"] = cursor + } + query := issueEditQuery{} + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.UserContentEdits + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true +} + +func (mm *importMediator) fillTimelineEvents(ctx context.Context, issueNode *issueNode) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { + select { + case <-ctx.Done(): + return + case mm.importEvents <- TimelineEvent{issueId: issueNode.issue.Id, timelineItem: item}: + } if item.Typename == "IssueComment" { // Issue comments are different than other timeline items in that // they may have associated user content edits. - // - // Send over the timeline-channel before starting to fill the comment - // edits. - commentEditChan := make(chan userContentEditEvent, CHAN_CAPACITY) - select { - case <-ctx.Done(): - return - case channel <- timelineData{item, commentEditChan}: - } - // We need to create a new goroutine for filling the comment edit - // channel. - go func(item timelineItem) { - mm.fillCommentEdits(ctx, &item, commentEditChan) - close(commentEditChan) - }(item) - } else { - select { - case <-ctx.Done(): - return - case channel <- timelineData{item, nil}: - } + // Right after the comment we send the comment edits. 
+ mm.fillCommentEdits(ctx, &item) } } if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor, msgs) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + } +} + +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + vars := newTimelineVars() + vars["gqlNodeId"] = nid + if cursor == "" { + vars["timelineAfter"] = (*githubv4.String)(nil) + } else { + vars["timelineAfter"] = cursor } + query := timelineQuery{} + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err + return nil, false + } + connection := &query.Node.Issue.TimelineItems + if len(connection.Nodes) <= 0 { + return nil, false + } + return connection, true } -func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan<- userContentEditEvent) { +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem) { // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } // First: setup message handling while submitting GraphQL queries. - msgs := make(chan messageEvent) - defer close(msgs) - // forward all the messages to the user content edit channel. The message will be queued after - // all the user content edits which have been completed already. 
- go func() { - for msg := range msgs { - select { - case <-ctx.Done(): - return - case channel <- msg: - } - } - }() comment := &item.IssueComment edits := &comment.UserContentEdits hasEdits := true @@ -343,17 +281,17 @@ func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineIt select { case <-ctx.Done(): return - case channel <- userContentEditData{edit}: + case mm.importEvents <- CommentEditEvent{commentId: comment.Id, userContentEdit: edit}: } } if !edits.PageInfo.HasPreviousPage { break } - edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor, msgs) + edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor) } } -func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { +func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { vars := newCommentEditVars() vars["gqlNodeId"] = nid if cursor == "" { @@ -362,8 +300,8 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID vars["commentEditBefore"] = cursor } query := commentEditQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err return nil, false } connection := &query.Node.IssueComment.UserContentEdits @@ -373,47 +311,7 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*timelineItemsConnection, bool) { - vars := newTimelineVars() - vars["gqlNodeId"] = nid - if cursor == "" { - vars["timelineAfter"] = (*githubv4.String)(nil) - } else { - vars["timelineAfter"] = cursor - } - query := timelineQuery{} - if err := mm.mQuery(ctx, 
&query, vars, msgs); err != nil { - mm.setError(err) - return nil, false - } - connection := &query.Node.Issue.TimelineItems - if len(connection.Nodes) <= 0 { - return nil, false - } - return connection, true -} - -func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String, msgs chan<- messageEvent) (*userContentEditConnection, bool) { - vars := newIssueEditVars() - vars["gqlNodeId"] = nid - if cursor == "" { - vars["issueEditBefore"] = (*githubv4.String)(nil) - } else { - vars["issueEditBefore"] = cursor - } - query := issueEditQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) - return nil, false - } - connection := &query.Node.Issue.UserContentEdits - if len(connection.Nodes) <= 0 { - return nil, false - } - return connection, true -} - -func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String, msgs chan<- messageEvent) (*issueConnection, bool) { +func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) @@ -421,8 +319,8 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String vars["issueAfter"] = githubv4.String(cursor) } query := issueQuery{} - if err := mm.mQuery(ctx, &query, vars, msgs); err != nil { - mm.setError(err) + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.err = err return nil, false } connection := &query.Repository.Issues @@ -443,29 +341,26 @@ func reverse(eds []userContentEdit) chan userContentEdit { return ret } -type rateLimiter interface { - rateLimit() rateLimit -} - // mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query // and it is used to populate the response into it. 
It should be a pointer to a struct that // corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If // there is a Github rate limiting error, then the function sleeps and retries after the rate limit // is expired. If there is another error, then the method will retry before giving up. -func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { - if err := mm.queryOnce(ctx, query, vars, msgs); err == nil { +func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { + if err := mm.queryOnce(ctx, query, vars); err == nil { // success: done return nil } // failure: we will retry - // To retry is important for importing projects with a big number of issues. + // To retry is important for importing projects with a big number of issues, because + // there may be temporary network errors or momentary internal errors of the github servers. 
retries := 3 var err error for i := 0; i < retries; i++ { // wait a few seconds before retry sleepTime := 8 * (i + 1) time.Sleep(time.Duration(sleepTime) * time.Second) - err = mm.queryOnce(ctx, query, vars, msgs) + err = mm.queryOnce(ctx, query, vars) if err == nil { // success: done return nil @@ -474,7 +369,7 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma return err } -func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}, msgs chan<- messageEvent) error { +func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { // first: just send the query to the graphql api vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) @@ -507,7 +402,7 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars select { case <-ctx.Done(): return ctx.Err() - case msgs <- messageEvent{msg}: + case mm.importEvents <- MessageEvent{msg}: } timer := time.NewTimer(time.Until(resetTime)) select { diff --git a/bridge/github/import_query.go b/bridge/github/import_query.go index c4ab2aa9..461daf94 100644 --- a/bridge/github/import_query.go +++ b/bridge/github/import_query.go @@ -2,6 +2,19 @@ package github import "github.com/shurcooL/githubv4" +type rateLimit struct { + Cost githubv4.Int + Limit githubv4.Int + NodeCount githubv4.Int + Remaining githubv4.Int + ResetAt githubv4.DateTime + Used githubv4.Int +} + +type rateLimiter interface { + rateLimit() rateLimit +} + type userQuery struct { RateLimit rateLimit `graphql:"rateLimit(dryRun: $dryRun)"` User user `graphql:"user(login: $login)"` @@ -211,12 +224,3 @@ type pageInfo struct { StartCursor githubv4.String HasPreviousPage bool } - -type rateLimit struct { - Cost githubv4.Int - Limit githubv4.Int - NodeCount githubv4.Int - Remaining githubv4.Int - ResetAt githubv4.DateTime - Used githubv4.Int -} -- cgit From 
21b330dad19585ec6000d4ce385ebd8619dd07de Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Sat, 27 Mar 2021 22:45:49 +0100 Subject: Github bridge: fix message about timeout --- bridge/github/import.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index ceb35ef0..d2e5d659 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -67,7 +67,6 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, if currEvent == nil { break } - nextEvent = gi.mediator.NextImportEvent() switch event := currEvent.(type) { case MessageEvent: @@ -79,6 +78,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return } // second: create new issue + nextEvent = gi.mediator.NextImportEvent() switch next := nextEvent.(type) { case IssueEditEvent: // consuming and using next event @@ -100,6 +100,7 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return } case TimelineEvent: + nextEvent = gi.mediator.NextImportEvent() if next, ok := nextEvent.(CommentEditEvent); ok && event.Typename == "IssueComment" { // consuming and using next event nextEvent = nil -- cgit From db57227ae53d3e8bba8a5cf939fb2b165d9ceaaf Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Sat, 27 Mar 2021 22:54:06 +0100 Subject: Github bridge: stop sleep-timer on SIGINT --- bridge/github/import_mediator.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 25d9c312..ac094bbb 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -358,8 +358,14 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma var err error for i := 0; i < retries; i++ { // wait a few seconds before retry - sleepTime := 8 * (i + 1) - time.Sleep(time.Duration(sleepTime) 
* time.Second) + sleepTime := time.Duration(8*(i+1)) * time.Second + timer := time.NewTimer(sleepTime) + select { + case <-ctx.Done(): + stop(timer) + return ctx.Err() + case <-timer.C: + } err = mm.queryOnce(ctx, query, vars) if err == nil { // success: done -- cgit From b2e98ef07f3083db303099fbcedbd98a33ad1164 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Sun, 28 Mar 2021 12:16:13 +0200 Subject: Github bridge: refactor message handling --- bridge/github/import.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index d2e5d659..619c1c1e 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -62,11 +62,12 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, // We need the current event and one look ahead event. currEvent = nextEvent if currEvent == nil { - currEvent = gi.mediator.NextImportEvent() + currEvent = gi.getEventHandleMsgs() } if currEvent == nil { break } + nextEvent = gi.getEventHandleMsgs() switch event := currEvent.(type) { case MessageEvent: @@ -78,7 +79,6 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return } // second: create new issue - nextEvent = gi.mediator.NextImportEvent() switch next := nextEvent.(type) { case IssueEditEvent: // consuming and using next event @@ -100,7 +100,6 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return } case TimelineEvent: - nextEvent = gi.mediator.NextImportEvent() if next, ok := nextEvent.(CommentEditEvent); ok && event.Typename == "IssueComment" { // consuming and using next event nextEvent = nil @@ -136,6 +135,19 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, return out, nil } +func (gi *githubImporter) getEventHandleMsgs() ImportEvent { + for { + // read event from import mediator + event := gi.mediator.NextImportEvent() + // consume (and 
use) all message events + if e, ok := event.(MessageEvent); ok { + fmt.Println(e.msg) + continue + } + return event + } +} + func (gi *githubImporter) commit(b *cache.BugCache, out chan<- core.ImportResult) error { if b == nil { return nil -- cgit From 3d14e2e67c4985c429471ea6643f013ade2c2692 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Thu, 8 Apr 2021 22:48:31 +0200 Subject: Bridges: move credential loading and client creation Gitlab and Jira bridge: move credential loading and client creation from `Init` to `ImportAll` in order to harmonize the behaviour of the different bridges. --- bridge/gitlab/import.go | 25 +++++++++++------------- bridge/jira/import.go | 52 +++++++++++++++++++++++++------------------------ 2 files changed, 38 insertions(+), 39 deletions(-) (limited to 'bridge') diff --git a/bridge/gitlab/import.go b/bridge/gitlab/import.go index cf4f0039..93c6a1a9 100644 --- a/bridge/gitlab/import.go +++ b/bridge/gitlab/import.go @@ -33,32 +33,29 @@ type gitlabImporter struct { func (gi *gitlabImporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error { gi.conf = conf + return nil +} +// ImportAll iterate over all the configured repository issues (notes) and ensure the creation +// of the missing issues / comments / label events / title changes ... 
+func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindToken), - auth.WithMeta(auth.MetaKeyBaseURL, conf[confKeyGitlabBaseUrl]), - auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyBaseURL, gi.conf[confKeyGitlabBaseUrl]), + auth.WithMeta(auth.MetaKeyLogin, gi.conf[confKeyDefaultLogin]), ) if err != nil { - return err + return nil, err } - if len(creds) == 0 { - return ErrMissingIdentityToken + return nil, ErrMissingIdentityToken } - - gi.client, err = buildClient(conf[confKeyGitlabBaseUrl], creds[0].(*auth.Token)) + gi.client, err = buildClient(gi.conf[confKeyGitlabBaseUrl], creds[0].(*auth.Token)) if err != nil { - return err + return nil, err } - return nil -} - -// ImportAll iterate over all the configured repository issues (notes) and ensure the creation -// of the missing issues / comments / label events / title changes ... -func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { gi.iterator = iterator.NewIterator(ctx, gi.client, 10, gi.conf[confKeyProjectID], since) out := make(chan core.ImportResult) gi.out = out diff --git a/bridge/jira/import.go b/bridge/jira/import.go index b66b0fa3..54c4ca35 100644 --- a/bridge/jira/import.go +++ b/bridge/jira/import.go @@ -34,6 +34,12 @@ type jiraImporter struct { // Init . func (ji *jiraImporter) Init(ctx context.Context, repo *cache.RepoCache, conf core.Configuration) error { ji.conf = conf + return nil +} + +// ImportAll iterate over all the configured repository issues and ensure the +// creation of the missing issues / timeline items / edits / label events ... 
+func (ji *jiraImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { var cred auth.Credential @@ -41,44 +47,40 @@ func (ji *jiraImporter) Init(ctx context.Context, repo *cache.RepoCache, conf co creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindLoginPassword), - auth.WithMeta(auth.MetaKeyBaseURL, conf[confKeyBaseUrl]), - auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), - ) - if err != nil { - return err - } - if len(creds) > 0 { - cred = creds[0] - goto end - } - - creds, err = auth.List(repo, - auth.WithTarget(target), - auth.WithKind(auth.KindLogin), - auth.WithMeta(auth.MetaKeyBaseURL, conf[confKeyBaseUrl]), - auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyBaseURL, ji.conf[confKeyBaseUrl]), + auth.WithMeta(auth.MetaKeyLogin, ji.conf[confKeyDefaultLogin]), ) if err != nil { - return err + return nil, err } if len(creds) > 0 { cred = creds[0] + } else { + creds, err = auth.List(repo, + auth.WithTarget(target), + auth.WithKind(auth.KindLogin), + auth.WithMeta(auth.MetaKeyBaseURL, ji.conf[confKeyBaseUrl]), + auth.WithMeta(auth.MetaKeyLogin, ji.conf[confKeyDefaultLogin]), + ) + if err != nil { + return nil, err + } + if len(creds) > 0 { + cred = creds[0] + } } -end: if cred == nil { - return fmt.Errorf("no credential for this bridge") + return nil, fmt.Errorf("no credential for this bridge") } // TODO(josh)[da52062]: Validate token and if it is expired then prompt for // credentials and generate a new one - ji.client, err = buildClient(ctx, conf[confKeyBaseUrl], conf[confKeyCredentialType], cred) - return err -} + ji.client, err = buildClient(ctx, ji.conf[confKeyBaseUrl], ji.conf[confKeyCredentialType], cred) + if err != nil { + return nil, err + } -// ImportAll iterate over all the configured repository issues and ensure the -// creation of the missing issues / timeline items / edits / label events ... 
-func (ji *jiraImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { sinceStr := since.Format("2006-01-02 15:04") project := ji.conf[confKeyProject] -- cgit From ede5e218ac4ae492f06be9d3bb8a1e9463eda739 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Thu, 8 Apr 2021 23:17:24 +0200 Subject: Add comment to clarify look ahead in import channel --- bridge/github/import.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index 619c1c1e..66710d0f 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -59,7 +59,16 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, var nextEvent ImportEvent var err error for { - // We need the current event and one look ahead event. + // An IssueEvent contains the issue in its most recent state. If an issue + // has at least one issue edit, then the history of the issue edits is + // represented by IssueEditEvents. That is, the unedited (original) issue + // might be saved only in the IssueEditEvent following the IssueEvent. + // Since we replicate the edit history we need to either use the IssueEvent + // (if there are no edits) or the IssueEvent together with its first + // IssueEditEvent (if there are edits). + // Exactly the same is true for comments and comment edits. + // As a consequence we need to look at the current event and one look ahead + // event. currEvent = nextEvent if currEvent == nil { currEvent = gi.getEventHandleMsgs() -- cgit From 0fd570171d171aa574d7f01d6033a9c01d668465 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Thu, 8 Apr 2021 23:57:25 +0200 Subject: Improve feedback for user when Github rate limiting The Github bridge itself should not write anything. 
This commit removes code writing to stdout and introduces an event `ImportEventRateLimiting` to `core.ImportResult` in order to inform about a rate limiting situation of the Github GraphQL API. Now the communication with the user is delegated to the various user interfaces. --- bridge/core/import.go | 12 ++++++++++++ bridge/github/import.go | 10 +++++----- bridge/github/import_mediator.go | 8 ++++---- 3 files changed, 21 insertions(+), 9 deletions(-) (limited to 'bridge') diff --git a/bridge/core/import.go b/bridge/core/import.go index 0b0b4c68..c1965d4e 100644 --- a/bridge/core/import.go +++ b/bridge/core/import.go @@ -34,6 +34,9 @@ const ( // but not severe enough to consider the import a failure. ImportEventWarning + // The import system (web API) has reached the rate limit + ImportEventRateLimiting + // Error happened during import ImportEventError ) @@ -87,6 +90,8 @@ func (er ImportResult) String() string { parts = append(parts, fmt.Sprintf("err: %s", er.Err)) } return strings.Join(parts, " ") + case ImportEventRateLimiting: + return fmt.Sprintf("rate limiting: %s", er.Reason) default: panic("unknown import result") @@ -165,3 +170,10 @@ func NewImportIdentity(id entity.Id) ImportResult { Event: ImportEventIdentity, } } + +func NewImportRateLimiting(msg string) ImportResult { + return ImportResult{ + Reason: msg, + Event: ImportEventRateLimiting, + } +} diff --git a/bridge/github/import.go b/bridge/github/import.go index 66710d0f..e89f34c4 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -79,8 +79,8 @@ func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, nextEvent = gi.getEventHandleMsgs() switch event := currEvent.(type) { - case MessageEvent: - fmt.Println(event.msg) + case RateLimitingEvent: + out <- core.NewImportRateLimiting(event.msg) case IssueEvent: // first: commit what is being held in currBug if err = gi.commit(currBug, out); err != nil { @@ -148,9 +148,9 @@ func (gi *githubImporter) getEventHandleMsgs()
ImportEvent { for { // read event from import mediator event := gi.mediator.NextImportEvent() - // consume (and use) all message events - if e, ok := event.(MessageEvent); ok { - fmt.Println(e.msg) + // consume (and use) all rate limiting events + if e, ok := event.(RateLimitingEvent); ok { + gi.out <- core.NewImportRateLimiting(e.msg) continue } return event diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index ac094bbb..825a0f98 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -47,11 +47,11 @@ type ImportEvent interface { isImportEvent() } -type MessageEvent struct { +type RateLimitingEvent struct { msg string } -func (MessageEvent) isImportEvent() {} +func (RateLimitingEvent) isImportEvent() {} type IssueEvent struct { issue @@ -404,11 +404,11 @@ func (mm *importMediator) queryOnce(ctx context.Context, query rateLimiter, vars resetTime := rateLimit.ResetAt.Time // Add a few seconds (8) for good measure resetTime = resetTime.Add(8 * time.Second) - msg := fmt.Sprintf("Github GraphQL API rate limit exhausted. 
Sleeping until %s", resetTime.String()) + msg := fmt.Sprintf("Github GraphQL API: import will sleep until %s", resetTime.String()) select { case <-ctx.Done(): return ctx.Err() - case mm.importEvents <- MessageEvent{msg}: + case mm.importEvents <- RateLimitingEvent{msg}: } timer := time.NewTimer(time.Until(resetTime)) select { -- cgit From 10a80f188861157b5f58bb700fe7f1c84bb4da95 Mon Sep 17 00:00:00 2001 From: Michael Muré Date: Fri, 9 Apr 2021 13:07:45 +0200 Subject: github: minor cleanups --- bridge/github/config.go | 2 +- bridge/github/export.go | 4 ++-- bridge/github/export_test.go | 2 +- bridge/github/import.go | 9 ++++++--- bridge/github/import_mediator.go | 30 +++++++++++++++--------------- 5 files changed, 25 insertions(+), 22 deletions(-) (limited to 'bridge') diff --git a/bridge/github/config.go b/bridge/github/config.go index 2b5af7fb..1e23c8ee 100644 --- a/bridge/github/config.go +++ b/bridge/github/config.go @@ -251,7 +251,7 @@ func promptUserToGoToBrowser(url, userCode string) { fmt.Println("Please visit the following Github URL in a browser and enter your user authentication code.") fmt.Println() fmt.Println(" URL:", url) - fmt.Println(" user authentiation code:", userCode) + fmt.Println(" user authentication code:", userCode) fmt.Println() } diff --git a/bridge/github/export.go b/bridge/github/export.go index 1a59fbb3..264f2a23 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -504,7 +504,7 @@ func (ge *githubExporter) cacheGithubLabels(ctx context.Context, gc *githubv4.Cl return nil } -func (ge *githubExporter) getLabelID(gc *githubv4.Client, label string) (string, error) { +func (ge *githubExporter) getLabelID(label string) (string, error) { label = strings.ToLower(label) for cachedLabel, ID := range ge.cachedLabels { if label == strings.ToLower(cachedLabel) { @@ -598,7 +598,7 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC func (ge *githubExporter) getOrCreateGithubLabelID(ctx 
context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) { // try to get label id from cache - labelID, err := ge.getLabelID(gc, string(label)) + labelID, err := ge.getLabelID(string(label)) if err == nil { return labelID, nil } diff --git a/bridge/github/export_test.go b/bridge/github/export_test.go index b7a36bcf..78425e60 100644 --- a/bridge/github/export_test.go +++ b/bridge/github/export_test.go @@ -268,7 +268,7 @@ func TestGithubPushPull(t *testing.T) { require.True(t, ok) require.Equal(t, issueOrigin, target) - //TODO: maybe more tests to ensure bug final state + // TODO: maybe more tests to ensure bug final state }) } } diff --git a/bridge/github/import.go b/bridge/github/import.go index 47be6374..306ef087 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -15,7 +15,7 @@ import ( "github.com/MichaelMure/git-bug/util/text" ) -const EMPTY_TITLE_PLACEHOLDER = "" +const EmptyTitlePlaceholder = "" // githubImporter implement the Importer interface type githubImporter struct { @@ -196,7 +196,7 @@ func (gi *githubImporter) ensureIssue(ctx context.Context, repo *cache.RepoCache // return an error: empty title. title := string(issue.Title) if title == " \u200b" { // U+200B == zero width space - title = EMPTY_TITLE_PLACEHOLDER + title = EmptyTitlePlaceholder } var textInput string @@ -380,7 +380,7 @@ func (gi *githubImporter) ensureTimelineItem(ctx context.Context, repo *cache.Re // function to return an error: empty title. 
title := string(item.RenamedTitleEvent.CurrentTitle) if title == " \u200b" { // U+200B == zero width space - title = EMPTY_TITLE_PLACEHOLDER + title = EmptyTitlePlaceholder } op, err := b.SetTitleRaw( @@ -568,6 +568,9 @@ func (gi *githubImporter) getGhost(ctx context.Context, repo *cache.RepoCache) ( return nil, err } user, err := gi.mediator.User(ctx, loginName) + if err != nil { + return nil, err + } userName := "" if user.Name != nil { userName = string(*user.Name) diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 825a0f98..873d5f62 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -11,12 +11,12 @@ import ( const ( // These values influence how fast the github graphql rate limit is exhausted. - NUM_ISSUES = 40 - NUM_ISSUE_EDITS = 100 - NUM_TIMELINE_ITEMS = 100 - NUM_COMMENT_EDITS = 100 + NumIssues = 40 + NumIssueEdits = 100 + NumTimelineItems = 100 + NumCommentEdits = 100 - CHAN_CAPACITY = 128 + ChanCapacity = 128 ) // importMediator provides a convenient interface to retrieve issues from the Github GraphQL API. 
@@ -90,7 +90,7 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj owner: owner, project: project, since: since, - importEvents: make(chan ImportEvent, CHAN_CAPACITY), + importEvents: make(chan ImportEvent, ChanCapacity), err: nil, } go func() { @@ -107,33 +107,33 @@ func newIssueVars(owner, project string, since time.Time) varmap { "owner": githubv4.String(owner), "name": githubv4.String(project), "issueSince": githubv4.DateTime{Time: since}, - "issueFirst": githubv4.Int(NUM_ISSUES), - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueFirst": githubv4.Int(NumIssues), + "issueEditLast": githubv4.Int(NumIssueEdits), "issueEditBefore": (*githubv4.String)(nil), - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "timelineFirst": githubv4.Int(NumTimelineItems), "timelineAfter": (*githubv4.String)(nil), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditLast": githubv4.Int(NumCommentEdits), "commentEditBefore": (*githubv4.String)(nil), } } func newIssueEditVars() varmap { return varmap{ - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueEditLast": githubv4.Int(NumIssueEdits), } } func newTimelineVars() varmap { return varmap{ - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "timelineFirst": githubv4.Int(NumTimelineItems), + "commentEditLast": githubv4.Int(NumCommentEdits), "commentEditBefore": (*githubv4.String)(nil), } } func newCommentEditVars() varmap { return varmap{ - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditLast": githubv4.Int(NumCommentEdits), } } @@ -316,7 +316,7 @@ func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) } else { - vars["issueAfter"] = githubv4.String(cursor) + vars["issueAfter"] = cursor } query := issueQuery{} if err := mm.mQuery(ctx, &query, vars); err != nil { -- cgit From 
be24cdfec43fe9adcd937ed4bdac28250e214ec6 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Fri, 9 Apr 2021 15:41:27 +0200 Subject: Revert "Bridges: move credential loading and client creation" This reverts commit 3d14e2e67c4985c429471ea6643f013ade2c2692. --- bridge/gitlab/import.go | 25 +++++++++++++----------- bridge/jira/import.go | 52 ++++++++++++++++++++++++------------------------- 2 files changed, 39 insertions(+), 38 deletions(-) (limited to 'bridge') diff --git a/bridge/gitlab/import.go b/bridge/gitlab/import.go index 7dc99056..7939f4e4 100644 --- a/bridge/gitlab/import.go +++ b/bridge/gitlab/import.go @@ -33,29 +33,32 @@ type gitlabImporter struct { func (gi *gitlabImporter) Init(_ context.Context, repo *cache.RepoCache, conf core.Configuration) error { gi.conf = conf - return nil -} -// ImportAll iterate over all the configured repository issues (notes) and ensure the creation -// of the missing issues / comments / label events / title changes ... -func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindToken), - auth.WithMeta(auth.MetaKeyBaseURL, gi.conf[confKeyGitlabBaseUrl]), - auth.WithMeta(auth.MetaKeyLogin, gi.conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyBaseURL, conf[confKeyGitlabBaseUrl]), + auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), ) if err != nil { - return nil, err + return err } + if len(creds) == 0 { - return nil, ErrMissingIdentityToken + return ErrMissingIdentityToken } - gi.client, err = buildClient(gi.conf[confKeyGitlabBaseUrl], creds[0].(*auth.Token)) + + gi.client, err = buildClient(conf[confKeyGitlabBaseUrl], creds[0].(*auth.Token)) if err != nil { - return nil, err + return err } + return nil +} + +// ImportAll iterate over all the configured repository issues (notes) and ensure the creation +// of the missing issues / comments / label events / 
title changes ... +func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { gi.iterator = iterator.NewIterator(ctx, gi.client, 10, gi.conf[confKeyProjectID], since) out := make(chan core.ImportResult) gi.out = out diff --git a/bridge/jira/import.go b/bridge/jira/import.go index f9daadd8..00148bb6 100644 --- a/bridge/jira/import.go +++ b/bridge/jira/import.go @@ -34,12 +34,6 @@ type jiraImporter struct { // Init . func (ji *jiraImporter) Init(ctx context.Context, repo *cache.RepoCache, conf core.Configuration) error { ji.conf = conf - return nil -} - -// ImportAll iterate over all the configured repository issues and ensure the -// creation of the missing issues / timeline items / edits / label events ... -func (ji *jiraImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { var cred auth.Credential @@ -47,40 +41,44 @@ func (ji *jiraImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, si creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindLoginPassword), - auth.WithMeta(auth.MetaKeyBaseURL, ji.conf[confKeyBaseUrl]), - auth.WithMeta(auth.MetaKeyLogin, ji.conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyBaseURL, conf[confKeyBaseUrl]), + auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), ) if err != nil { - return nil, err + return err } if len(creds) > 0 { cred = creds[0] - } else { - creds, err = auth.List(repo, - auth.WithTarget(target), - auth.WithKind(auth.KindLogin), - auth.WithMeta(auth.MetaKeyBaseURL, ji.conf[confKeyBaseUrl]), - auth.WithMeta(auth.MetaKeyLogin, ji.conf[confKeyDefaultLogin]), - ) - if err != nil { - return nil, err - } - if len(creds) > 0 { - cred = creds[0] - } + goto end } + creds, err = auth.List(repo, + auth.WithTarget(target), + auth.WithKind(auth.KindLogin), + auth.WithMeta(auth.MetaKeyBaseURL, conf[confKeyBaseUrl]), + auth.WithMeta(auth.MetaKeyLogin, 
conf[confKeyDefaultLogin]), + ) + if err != nil { + return err + } + if len(creds) > 0 { + cred = creds[0] + } + +end: if cred == nil { - return nil, fmt.Errorf("no credential for this bridge") + return fmt.Errorf("no credential for this bridge") } // TODO(josh)[da52062]: Validate token and if it is expired then prompt for // credentials and generate a new one - ji.client, err = buildClient(ctx, ji.conf[confKeyBaseUrl], ji.conf[confKeyCredentialType], cred) - if err != nil { - return nil, err - } + ji.client, err = buildClient(ctx, conf[confKeyBaseUrl], conf[confKeyCredentialType], cred) + return err +} +// ImportAll iterate over all the configured repository issues and ensure the +// creation of the missing issues / timeline items / edits / label events ... +func (ji *jiraImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { sinceStr := since.Format("2006-01-02 15:04") project := ji.conf[confKeyProject] -- cgit From 8fb6ea0d9578bc1cf94649091ddd03d038dddc47 Mon Sep 17 00:00:00 2001 From: Alexander Scharinger Date: Fri, 9 Apr 2021 15:56:03 +0200 Subject: Github bridge: move credential loading and client creation back Reason: failing integration tests --- bridge/github/import.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) (limited to 'bridge') diff --git a/bridge/github/import.go b/bridge/github/import.go index 306ef087..bf43a877 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -21,6 +21,9 @@ const EmptyTitlePlaceholder = "" type githubImporter struct { conf core.Configuration + // default client + client *githubv4.Client + // mediator to access the Github API mediator *importMediator @@ -28,27 +31,28 @@ type githubImporter struct { out chan<- core.ImportResult } -func (gi *githubImporter) Init(_ context.Context, _ *cache.RepoCache, conf core.Configuration) error { +func (gi *githubImporter) Init(_ context.Context, repo *cache.RepoCache, conf
core.Configuration) error { gi.conf = conf - return nil -} - -// ImportAll iterate over all the configured repository issues and ensure the creation of the -// missing issues / timeline items / edits / label events ... -func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { creds, err := auth.List(repo, auth.WithTarget(target), auth.WithKind(auth.KindToken), - auth.WithMeta(auth.MetaKeyLogin, gi.conf[confKeyDefaultLogin]), + auth.WithMeta(auth.MetaKeyLogin, conf[confKeyDefaultLogin]), ) if err != nil { - return nil, err + return err } if len(creds) <= 0 { - return nil, ErrMissingIdentityToken + return ErrMissingIdentityToken } - client := buildClient(creds[0].(*auth.Token)) - gi.mediator = NewImportMediator(ctx, client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) + gi.client = buildClient(creds[0].(*auth.Token)) + + return nil +} + +// ImportAll iterate over all the configured repository issues and ensure the creation of the +// missing issues / timeline items / edits / label events ... +func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { + gi.mediator = NewImportMediator(ctx, gi.client, gi.conf[confKeyOwner], gi.conf[confKeyProject], since) out := make(chan core.ImportResult) gi.out = out -- cgit