diff options
Diffstat (limited to 'bridge/github/import_mediator.go')
-rw-r--r-- | bridge/github/import_mediator.go | 370 |
1 files changed, 214 insertions, 156 deletions
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go index 428c5d36..8bd33adb 100644 --- a/bridge/github/import_mediator.go +++ b/bridge/github/import_mediator.go @@ -3,53 +3,63 @@ package github import ( "context" "fmt" - "runtime" + "strings" "sync" "time" "github.com/shurcooL/githubv4" ) -type varmap map[string]interface{} - -func trace() { - pc := make([]uintptr, 15) - n := runtime.Callers(2, pc) - frames := runtime.CallersFrames(pc[:n]) - frame, _ := frames.Next() - fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function) -} - -const ( - NUM_ISSUES = 50 - NUM_ISSUE_EDITS = 99 - NUM_TIMELINE_ITEMS = 99 - NUM_COMMENT_EDITS = 99 +const ( // These values influence how fast the github graphql rate limit is exhausted. + NUM_ISSUES = 40 + NUM_ISSUE_EDITS = 100 + NUM_TIMELINE_ITEMS = 100 + NUM_COMMENT_EDITS = 100 CHAN_CAPACITY = 128 ) -// TODO: remove all debug output and trace() in all files. Use ag +type varmap map[string]interface{} +// importMediator provides an interface to retrieve Github issues. type importMediator struct { // Github graphql client - gc *githubv4.Client - owner string + gc *githubv4.Client + + // name of the repository owner on Github + owner string + + // name of the Github repository project string - // The iterator will only query issues updated or created after the date given in + + // The importMediator will only query issues updated or created after the date given in // the variable since. since time.Time - issues chan issue - issueEditsMut sync.Mutex - timelineItemsMut sync.Mutex - commentEditsMut sync.Mutex - issueEdits map[githubv4.ID]chan userContentEdit + // channel for the issues + issues chan issue + + // channel for issue edits + issueEdits map[githubv4.ID]chan userContentEdit + issueEditsMut sync.Mutex + + // channel for timeline items timelineItems map[githubv4.ID]chan timelineItem - commentEdits map[githubv4.ID]chan userContentEdit + timelineItemsMut sync.Mutex + + // channel for comment edits + commentEdits map[githubv4.ID]chan userContentEdit + commentEditsMut sync.Mutex // Sticky error - err error + err error + errMut sync.Mutex +} + +func (mm *importMediator) setError(err error) { + mm.errMut.Lock() + mm.err = err + mm.errMut.Unlock() } func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator { @@ -59,21 +69,56 @@ func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, proj project: project, since: since, issues: make(chan issue, CHAN_CAPACITY), - issueEditsMut: sync.Mutex{}, - timelineItemsMut: sync.Mutex{}, - commentEditsMut: sync.Mutex{}, issueEdits: make(map[githubv4.ID]chan userContentEdit), + issueEditsMut: sync.Mutex{}, timelineItems: make(map[githubv4.ID]chan timelineItem), + timelineItemsMut: sync.Mutex{}, commentEdits: make(map[githubv4.ID]chan userContentEdit), + commentEditsMut: sync.Mutex{}, err: nil, } go func() { - defer close(mm.issues) - mm.fillChannels(ctx) + mm.fillIssues(ctx) + close(mm.issues) }() return &mm } +func newIssueVars(owner, project string, since time.Time) varmap { + return varmap{ + "owner": githubv4.String(owner), + "name": githubv4.String(project), + "issueSince": githubv4.DateTime{Time: since}, + "issueFirst": githubv4.Int(NUM_ISSUES), + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + "issueEditBefore": (*githubv4.String)(nil), + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "timelineAfter": (*githubv4.String)(nil), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newIssueEditVars() varmap { + return varmap{ + "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), + } +} + +func newTimelineVars() varmap { + return varmap{ + "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + "commentEditBefore": (*githubv4.String)(nil), + } +} + +func newCommentEditVars() varmap { + return varmap{ + "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), + } +} + func (mm *importMediator) Issues() <-chan issue { return mm.issues } @@ -100,64 +145,85 @@ func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContent } func (mm *importMediator) Error() error { - return mm.err + mm.errMut.Lock() + err := mm.err + mm.errMut.Unlock() + return err } func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) { query := userQuery{} vars := varmap{"login": githubv4.String(loginName)} - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() - if err := mm.mQuery(c, &query, vars); err != nil { + if err := mm.mQuery(ctx, &query, vars); err != nil { return nil, err } return &query.User, nil } -func (mm *importMediator) fillChannels(ctx context.Context) { - issueCursor := githubv4.String("") - for { - issues, hasIssues := mm.queryIssue(ctx, issueCursor) - if !hasIssues { - break +func (mm *importMediator) fillIssues(ctx context.Context) { + initialCursor := githubv4.String("") + issues, hasIssues := mm.queryIssue(ctx, initialCursor) + for hasIssues { + for _, node := range issues.Nodes { + // The order of statements in this loop is crucial for the correct concurrent + // execution. + // + // The issue edit channel and the timeline channel need to be added to the + // corresponding maps before the issue is sent in the issue channel. + // Otherwise, the client could try to retrieve issue edits and timeline itmes + // before these channels are even created. In this case the client would + // receive a nil channel. + issueEditChan := make(chan userContentEdit, CHAN_CAPACITY) + timelineChan := make(chan timelineItem, CHAN_CAPACITY) + mm.issueEditsMut.Lock() + mm.issueEdits[node.issue.Id] = issueEditChan + mm.issueEditsMut.Unlock() + mm.timelineItemsMut.Lock() + mm.timelineItems[node.issue.Id] = timelineChan + mm.timelineItemsMut.Unlock() + select { + case <-ctx.Done(): + return + case mm.issues <- node.issue: + } + + // We do not know whether the client reads from the issue edit channel + // or the timeline channel first. Since the capacity of any channel is limited + // any send operation may block. Hence, in order to avoid deadlocks we need + // to send over both these channels concurrently. + go func(node issueNode) { + mm.fillIssueEdits(ctx, &node, issueEditChan) + close(issueEditChan) + }(node) + go func(node issueNode) { + mm.fillTimeline(ctx, &node, timelineChan) + close(timelineChan) + }(node) } - issueCursor = issues.PageInfo.EndCursor - for _, issueNode := range issues.Nodes { - // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title) - mm.fillChannelIssueEdits(ctx, &issueNode) - mm.fillChannelTimeline(ctx, &issueNode) - // To avoid race conditions add the issue only after all its edits, - // timeline times, etc. are added to their respective channels. - mm.issues <- issueNode.issue + if !issues.PageInfo.HasNextPage { + break } + issues, hasIssues = mm.queryIssue(ctx, issues.PageInfo.EndCursor) } } -func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) { - // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id) - // fmt.Printf("%v\n", issueNode) - channel := make(chan userContentEdit, CHAN_CAPACITY) - defer close(channel) - mm.issueEditsMut.Lock() - mm.issueEdits[issueNode.issue.Id] = channel - mm.issueEditsMut.Unlock() +func (mm *importMediator) fillIssueEdits(ctx context.Context, issueNode *issueNode, channel chan userContentEdit) { edits := &issueNode.UserContentEdits hasEdits := true for hasEdits { - // fmt.Println("before the reversed loop") for edit := range reverse(edits.Nodes) { - // fmt.Println("in the reversed loop") if edit.Diff == nil || string(*edit.Diff) == "" { - // issueEdit.Diff == nil happen if the event is older than - // early 2018, Github doesn't have the data before that. - // Best we can do is to ignore the event. + // issueEdit.Diff == nil happen if the event is older than early + // 2018, Github doesn't have the data before that. Best we can do is + // to ignore the event. continue } - // fmt.Printf("about to push issue edit\n") - channel <- edit + select { + case <-ctx.Done(): + return + case channel <- edit: + } } - // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage) - // fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage) if !edits.PageInfo.HasPreviousPage { break } @@ -165,51 +231,64 @@ func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode * } } -func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) { - // fmt.Printf("fullChannelTimeline()\n") - channel := make(chan timelineItem, CHAN_CAPACITY) - defer close(channel) - mm.timelineItemsMut.Lock() - mm.timelineItems[issueNode.issue.Id] = channel - mm.timelineItemsMut.Unlock() +func (mm *importMediator) fillTimeline(ctx context.Context, issueNode *issueNode, channel chan timelineItem) { items := &issueNode.TimelineItems hasItems := true for hasItems { for _, item := range items.Nodes { - channel <- item - mm.fillChannelCommentEdits(ctx, &item) + if item.Typename == "IssueComment" { + // Here the order of statements is crucial for correct concurrency. + commentEditChan := make(chan userContentEdit, CHAN_CAPACITY) + mm.commentEditsMut.Lock() + mm.commentEdits[item.IssueComment.Id] = commentEditChan + mm.commentEditsMut.Unlock() + select { + case <-ctx.Done(): + return + case channel <- item: + } + // We need to create a new goroutine for filling the comment edit + // channel. + go func(item timelineItem) { + mm.fillCommentEdits(ctx, &item, commentEditChan) + close(commentEditChan) + }(item) + } else { + select { + case <-ctx.Done(): + return + case channel <- item: + } + } } - // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage) - // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage) if !items.PageInfo.HasNextPage { break } - items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) + items, hasItems = mm.queryTimeline(ctx, issueNode.issue.Id, items.PageInfo.EndCursor) } } -func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) { - // This concerns only timeline items of type comment +func (mm *importMediator) fillCommentEdits(ctx context.Context, item *timelineItem, channel chan userContentEdit) { + // Here we are only concerned with timeline items of type issueComment. if item.Typename != "IssueComment" { return } comment := &item.IssueComment - channel := make(chan userContentEdit, CHAN_CAPACITY) - defer close(channel) - mm.commentEditsMut.Lock() - mm.commentEdits[comment.Id] = channel - mm.commentEditsMut.Unlock() edits := &comment.UserContentEdits hasEdits := true for hasEdits { for edit := range reverse(edits.Nodes) { if edit.Diff == nil || string(*edit.Diff) == "" { - // issueEdit.Diff == nil happen if the event is older than - // early 2018, Github doesn't have the data before that. - // Best we can do is to ignore the event. + // issueEdit.Diff == nil happen if the event is older than early + // 2018, Github doesn't have the data before that. Best we can do is + // to ignore the event. continue } - channel <- edit + select { + case <-ctx.Done(): + return + case channel <- edit: + } } if !edits.PageInfo.HasPreviousPage { break @@ -219,21 +298,16 @@ func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *tim } func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - } + vars := newCommentEditVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["commentEditBefore"] = (*githubv4.String)(nil) } else { vars["commentEditBefore"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := commentEditQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.IssueComment.UserContentEdits @@ -243,24 +317,17 @@ func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID return connection, true } -func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - "commentEditBefore": (*githubv4.String)(nil), - } +func (mm *importMediator) queryTimeline(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) { + vars := newTimelineVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["timelineAfter"] = (*githubv4.String)(nil) } else { vars["timelineAfter"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := timelineQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.Issue.TimelineItems @@ -271,21 +338,16 @@ func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.I } func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) { - // trace() - vars := varmap{ - "gqlNodeId": nid, - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), - } + vars := newIssueEditVars() + vars["gqlNodeId"] = nid if cursor == "" { vars["issueEditBefore"] = (*githubv4.String)(nil) } else { vars["issueEditBefore"] = cursor } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := issueEditQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Node.Issue.UserContentEdits @@ -296,29 +358,15 @@ func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, } func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) { - // trace() - vars := varmap{ - "owner": githubv4.String(mm.owner), - "name": githubv4.String(mm.project), - "issueSince": githubv4.DateTime{Time: mm.since}, - "issueFirst": githubv4.Int(NUM_ISSUES), - "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS), - "issueEditBefore": (*githubv4.String)(nil), - "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS), - "timelineAfter": (*githubv4.String)(nil), - "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS), - "commentEditBefore": (*githubv4.String)(nil), - } + vars := newIssueVars(mm.owner, mm.project, mm.since) if cursor == "" { vars["issueAfter"] = (*githubv4.String)(nil) } else { vars["issueAfter"] = githubv4.String(cursor) } - c, cancel := context.WithTimeout(ctx, defaultTimeout) - defer cancel() query := issueQuery{} - if err := mm.mQuery(c, &query, vars); err != nil { - mm.err = err + if err := mm.mQuery(ctx, &query, vars); err != nil { + mm.setError(err) return nil, false } connection := &query.Repository.Issues @@ -343,30 +391,42 @@ type rateLimiter interface { rateLimit() rateLimit } -// TODO: move that into its own file -// -// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL -// query and it is used to populate the response into it. It should be a pointer to a struct -// that corresponds to the Github graphql schema and it should implement the rateLimiter -// interface. This function queries Github for the remaining rate limit points before -// executing the actual query. The function waits, if there are not enough rate limiting -// points left. +// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL query +// and it is used to populate the response into it. It should be a pointer to a struct that +// corresponds to the Github graphql schema and it has to implement the rateLimiter interface. If +// there is a Github rate limiting error, then the function sleeps and retries after the rate limit +// is expired. func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error { - // First: check the cost of the query and wait if necessary - vars["dryRun"] = githubv4.Boolean(true) + // first: just send the query to the graphql api + vars["dryRun"] = githubv4.Boolean(false) qctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() + err := mm.gc.Query(qctx, query, vars) + if err == nil { + // no error: done + return nil + } + // matching the error string + if !strings.Contains(err.Error(), "API rate limit exceeded") { + // an error, but not the API rate limit error: done + return err + } + // a rate limit error + // ask the graphql api for rate limiting information + vars["dryRun"] = githubv4.Boolean(true) + qctx, cancel = context.WithTimeout(ctx, defaultTimeout) + defer cancel() if err := mm.gc.Query(qctx, query, vars); err != nil { return err } - fmt.Printf("%v\n", query) rateLimit := query.rateLimit() if rateLimit.Cost > rateLimit.Remaining { + // sleep resetTime := rateLimit.ResetAt.Time - fmt.Println("Github rate limit exhausted") - fmt.Printf("Sleeping until %s\n", resetTime.String()) // Add a few seconds (8) for good measure - timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second))) + resetTime = resetTime.Add(8 * time.Second) + fmt.Printf("Github rate limit exhausted. Sleeping until %s\n", resetTime.String()) + timer := time.NewTimer(time.Until(resetTime)) select { case <-ctx.Done(): stop(timer) @@ -374,14 +434,12 @@ func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars ma case <-timer.C: } } - // Second: Do the actual query + // run the original query again vars["dryRun"] = githubv4.Boolean(false) qctx, cancel = context.WithTimeout(ctx, defaultTimeout) defer cancel() - if err := mm.gc.Query(qctx, query, vars); err != nil { - return err - } - return nil + err = mm.gc.Query(qctx, query, vars) + return err // might be nil } func stop(t *time.Timer) { |