aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github/import_mediator.go
diff options
context:
space:
mode:
Diffstat (limited to 'bridge/github/import_mediator.go')
-rw-r--r--bridge/github/import_mediator.go394
1 files changed, 394 insertions, 0 deletions
diff --git a/bridge/github/import_mediator.go b/bridge/github/import_mediator.go
new file mode 100644
index 00000000..428c5d36
--- /dev/null
+++ b/bridge/github/import_mediator.go
@@ -0,0 +1,394 @@
+package github
+
+import (
+ "context"
+ "fmt"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/shurcooL/githubv4"
+)
+
+type varmap map[string]interface{}
+
+func trace() {
+ pc := make([]uintptr, 15)
+ n := runtime.Callers(2, pc)
+ frames := runtime.CallersFrames(pc[:n])
+ frame, _ := frames.Next()
+ fmt.Printf("%s:%d %s\n", frame.File, frame.Line, frame.Function)
+}
+
+const (
+ NUM_ISSUES = 50
+ NUM_ISSUE_EDITS = 99
+ NUM_TIMELINE_ITEMS = 99
+ NUM_COMMENT_EDITS = 99
+
+ CHAN_CAPACITY = 128
+)
+
+// TODO: remove all debug output and trace() in all files. Use ag
+
+type importMediator struct {
+ // Github graphql client
+ gc *githubv4.Client
+ owner string
+ project string
+ // The iterator will only query issues updated or created after the date given in
+ // the variable since.
+ since time.Time
+
+ issues chan issue
+ issueEditsMut sync.Mutex
+ timelineItemsMut sync.Mutex
+ commentEditsMut sync.Mutex
+ issueEdits map[githubv4.ID]chan userContentEdit
+ timelineItems map[githubv4.ID]chan timelineItem
+ commentEdits map[githubv4.ID]chan userContentEdit
+
+ // Sticky error
+ err error
+}
+
+func NewImportMediator(ctx context.Context, client *githubv4.Client, owner, project string, since time.Time) *importMediator {
+ mm := importMediator{
+ gc: client,
+ owner: owner,
+ project: project,
+ since: since,
+ issues: make(chan issue, CHAN_CAPACITY),
+ issueEditsMut: sync.Mutex{},
+ timelineItemsMut: sync.Mutex{},
+ commentEditsMut: sync.Mutex{},
+ issueEdits: make(map[githubv4.ID]chan userContentEdit),
+ timelineItems: make(map[githubv4.ID]chan timelineItem),
+ commentEdits: make(map[githubv4.ID]chan userContentEdit),
+ err: nil,
+ }
+ go func() {
+ defer close(mm.issues)
+ mm.fillChannels(ctx)
+ }()
+ return &mm
+}
+
+func (mm *importMediator) Issues() <-chan issue {
+ return mm.issues
+}
+
+func (mm *importMediator) IssueEdits(issue *issue) <-chan userContentEdit {
+ mm.issueEditsMut.Lock()
+ channel := mm.issueEdits[issue.Id]
+ mm.issueEditsMut.Unlock()
+ return channel
+}
+
+func (mm *importMediator) TimelineItems(issue *issue) <-chan timelineItem {
+ mm.timelineItemsMut.Lock()
+ channel := mm.timelineItems[issue.Id]
+ mm.timelineItemsMut.Unlock()
+ return channel
+}
+
+func (mm *importMediator) CommentEdits(comment *issueComment) <-chan userContentEdit {
+ mm.commentEditsMut.Lock()
+ channel := mm.commentEdits[comment.Id]
+ mm.commentEditsMut.Unlock()
+ return channel
+}
+
+func (mm *importMediator) Error() error {
+ return mm.err
+}
+
+func (mm *importMediator) User(ctx context.Context, loginName string) (*user, error) {
+ query := userQuery{}
+ vars := varmap{"login": githubv4.String(loginName)}
+ c, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ if err := mm.mQuery(c, &query, vars); err != nil {
+ return nil, err
+ }
+ return &query.User, nil
+}
+
+func (mm *importMediator) fillChannels(ctx context.Context) {
+ issueCursor := githubv4.String("")
+ for {
+ issues, hasIssues := mm.queryIssue(ctx, issueCursor)
+ if !hasIssues {
+ break
+ }
+ issueCursor = issues.PageInfo.EndCursor
+ for _, issueNode := range issues.Nodes {
+ // fmt.Printf(">>> issue: %v\n", issueNode.issue.Title)
+ mm.fillChannelIssueEdits(ctx, &issueNode)
+ mm.fillChannelTimeline(ctx, &issueNode)
+ // To avoid race conditions add the issue only after all its edits,
+ // timeline times, etc. are added to their respective channels.
+ mm.issues <- issueNode.issue
+ }
+ }
+}
+
+func (mm *importMediator) fillChannelIssueEdits(ctx context.Context, issueNode *issueNode) {
+ // fmt.Printf("fillChannelIssueEdit() issue id == %v\n", issueNode.issue.Id)
+ // fmt.Printf("%v\n", issueNode)
+ channel := make(chan userContentEdit, CHAN_CAPACITY)
+ defer close(channel)
+ mm.issueEditsMut.Lock()
+ mm.issueEdits[issueNode.issue.Id] = channel
+ mm.issueEditsMut.Unlock()
+ edits := &issueNode.UserContentEdits
+ hasEdits := true
+ for hasEdits {
+ // fmt.Println("before the reversed loop")
+ for edit := range reverse(edits.Nodes) {
+ // fmt.Println("in the reversed loop")
+ if edit.Diff == nil || string(*edit.Diff) == "" {
+ // issueEdit.Diff == nil happen if the event is older than
+ // early 2018, Github doesn't have the data before that.
+ // Best we can do is to ignore the event.
+ continue
+ }
+ // fmt.Printf("about to push issue edit\n")
+ channel <- edit
+ }
+ // fmt.Printf("has next ? %v\n", edits.PageInfo.HasNextPage)
+ // fmt.Printf("has previous ? %v\n", edits.PageInfo.HasPreviousPage)
+ if !edits.PageInfo.HasPreviousPage {
+ break
+ }
+ edits, hasEdits = mm.queryIssueEdits(ctx, issueNode.issue.Id, edits.PageInfo.EndCursor)
+ }
+}
+
+func (mm *importMediator) fillChannelTimeline(ctx context.Context, issueNode *issueNode) {
+ // fmt.Printf("fullChannelTimeline()\n")
+ channel := make(chan timelineItem, CHAN_CAPACITY)
+ defer close(channel)
+ mm.timelineItemsMut.Lock()
+ mm.timelineItems[issueNode.issue.Id] = channel
+ mm.timelineItemsMut.Unlock()
+ items := &issueNode.TimelineItems
+ hasItems := true
+ for hasItems {
+ for _, item := range items.Nodes {
+ channel <- item
+ mm.fillChannelCommentEdits(ctx, &item)
+ }
+ // fmt.Printf("has next ? %v\n", items.PageInfo.HasNextPage)
+ // fmt.Printf("has previous ? %v\n", items.PageInfo.HasPreviousPage)
+ if !items.PageInfo.HasNextPage {
+ break
+ }
+ items, hasItems = mm.queryTimelineItems(ctx, issueNode.issue.Id, items.PageInfo.EndCursor)
+ }
+}
+
+func (mm *importMediator) fillChannelCommentEdits(ctx context.Context, item *timelineItem) {
+ // This concerns only timeline items of type comment
+ if item.Typename != "IssueComment" {
+ return
+ }
+ comment := &item.IssueComment
+ channel := make(chan userContentEdit, CHAN_CAPACITY)
+ defer close(channel)
+ mm.commentEditsMut.Lock()
+ mm.commentEdits[comment.Id] = channel
+ mm.commentEditsMut.Unlock()
+ edits := &comment.UserContentEdits
+ hasEdits := true
+ for hasEdits {
+ for edit := range reverse(edits.Nodes) {
+ if edit.Diff == nil || string(*edit.Diff) == "" {
+ // issueEdit.Diff == nil happen if the event is older than
+ // early 2018, Github doesn't have the data before that.
+ // Best we can do is to ignore the event.
+ continue
+ }
+ channel <- edit
+ }
+ if !edits.PageInfo.HasPreviousPage {
+ break
+ }
+ edits, hasEdits = mm.queryCommentEdits(ctx, comment.Id, edits.PageInfo.EndCursor)
+ }
+}
+
+func (mm *importMediator) queryCommentEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+ // trace()
+ vars := varmap{
+ "gqlNodeId": nid,
+ "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
+ }
+ if cursor == "" {
+ vars["commentEditBefore"] = (*githubv4.String)(nil)
+ } else {
+ vars["commentEditBefore"] = cursor
+ }
+ c, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ query := commentEditQuery{}
+ if err := mm.mQuery(c, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.IssueComment.UserContentEdits
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
+}
+
+func (mm *importMediator) queryTimelineItems(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*timelineItemsConnection, bool) {
+ // trace()
+ vars := varmap{
+ "gqlNodeId": nid,
+ "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
+ "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+ if cursor == "" {
+ vars["timelineAfter"] = (*githubv4.String)(nil)
+ } else {
+ vars["timelineAfter"] = cursor
+ }
+ c, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ query := timelineQuery{}
+ if err := mm.mQuery(c, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.Issue.TimelineItems
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
+}
+
+func (mm *importMediator) queryIssueEdits(ctx context.Context, nid githubv4.ID, cursor githubv4.String) (*userContentEditConnection, bool) {
+ // trace()
+ vars := varmap{
+ "gqlNodeId": nid,
+ "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
+ }
+ if cursor == "" {
+ vars["issueEditBefore"] = (*githubv4.String)(nil)
+ } else {
+ vars["issueEditBefore"] = cursor
+ }
+ c, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ query := issueEditQuery{}
+ if err := mm.mQuery(c, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Node.Issue.UserContentEdits
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
+}
+
+func (mm *importMediator) queryIssue(ctx context.Context, cursor githubv4.String) (*issueConnection, bool) {
+ // trace()
+ vars := varmap{
+ "owner": githubv4.String(mm.owner),
+ "name": githubv4.String(mm.project),
+ "issueSince": githubv4.DateTime{Time: mm.since},
+ "issueFirst": githubv4.Int(NUM_ISSUES),
+ "issueEditLast": githubv4.Int(NUM_ISSUE_EDITS),
+ "issueEditBefore": (*githubv4.String)(nil),
+ "timelineFirst": githubv4.Int(NUM_TIMELINE_ITEMS),
+ "timelineAfter": (*githubv4.String)(nil),
+ "commentEditLast": githubv4.Int(NUM_COMMENT_EDITS),
+ "commentEditBefore": (*githubv4.String)(nil),
+ }
+ if cursor == "" {
+ vars["issueAfter"] = (*githubv4.String)(nil)
+ } else {
+ vars["issueAfter"] = githubv4.String(cursor)
+ }
+ c, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ query := issueQuery{}
+ if err := mm.mQuery(c, &query, vars); err != nil {
+ mm.err = err
+ return nil, false
+ }
+ connection := &query.Repository.Issues
+ if len(connection.Nodes) <= 0 {
+ return nil, false
+ }
+ return connection, true
+}
+
+func reverse(eds []userContentEdit) chan userContentEdit {
+ ret := make(chan userContentEdit)
+ go func() {
+ for i := range eds {
+ ret <- eds[len(eds)-1-i]
+ }
+ close(ret)
+ }()
+ return ret
+}
+
+type rateLimiter interface {
+ rateLimit() rateLimit
+}
+
+// TODO: move that into its own file
+//
+// mQuery executes a single GraphQL query. The variable query is used to derive the GraphQL
+// query and it is used to populate the response into it. It should be a pointer to a struct
+// that corresponds to the Github graphql schema and it should implement the rateLimiter
+// interface. This function queries Github for the remaining rate limit points before
+// executing the actual query. The function waits, if there are not enough rate limiting
+// points left.
+func (mm *importMediator) mQuery(ctx context.Context, query rateLimiter, vars map[string]interface{}) error {
+ // First: check the cost of the query and wait if necessary
+ vars["dryRun"] = githubv4.Boolean(true)
+ qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ if err := mm.gc.Query(qctx, query, vars); err != nil {
+ return err
+ }
+ fmt.Printf("%v\n", query)
+ rateLimit := query.rateLimit()
+ if rateLimit.Cost > rateLimit.Remaining {
+ resetTime := rateLimit.ResetAt.Time
+ fmt.Println("Github rate limit exhausted")
+ fmt.Printf("Sleeping until %s\n", resetTime.String())
+ // Add a few seconds (8) for good measure
+ timer := time.NewTimer(time.Until(resetTime.Add(8 * time.Second)))
+ select {
+ case <-ctx.Done():
+ stop(timer)
+ return ctx.Err()
+ case <-timer.C:
+ }
+ }
+ // Second: Do the actual query
+ vars["dryRun"] = githubv4.Boolean(false)
+ qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ if err := mm.gc.Query(qctx, query, vars); err != nil {
+ return err
+ }
+ return nil
+}
+
+func stop(t *time.Timer) {
+ if !t.Stop() {
+ select {
+ case <-t.C:
+ default:
+ }
+ }
+}