aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github/client.go
blob: 00981a569789d2d18f8f2ef29973fd6025b2a520 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package github

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/shurcooL/githubv4"

	"github.com/MichaelMure/git-bug/bridge/core"
)

var _ Client = &githubv4.Client{}

// Client is an interface conforming with githubv4.Client
type Client interface {
	Mutate(context.Context, interface{}, githubv4.Input, map[string]interface{}) error
	Query(context.Context, interface{}, map[string]interface{}) error
}

// rateLimitHandlerClient wrapps the Github client and adds improved error handling and handling of
// Github's GraphQL rate limit.
type rateLimitHandlerClient struct {
	sc Client
}

func newRateLimitHandlerClient(httpClient *http.Client) *rateLimitHandlerClient {
	return &rateLimitHandlerClient{sc: githubv4.NewClient(httpClient)}
}

// mutate calls the github api with a graphql mutation and sends a core.ExportResult for each rate limiting event
func (c *rateLimitHandlerClient) mutate(ctx context.Context, m interface{}, input githubv4.Input, vars map[string]interface{}, out chan<- core.ExportResult) error {
	// prepare a closure for the mutation
	mutFun := func(ctx context.Context) error {
		return c.sc.Mutate(ctx, m, input, vars)
	}
	callback := func(msg string) {
		select {
		case <-ctx.Done():
		case out <- core.NewExportRateLimiting(msg):
		}
	}
	return c.callAPIAndRetry(ctx, mutFun, callback)
}

// queryImport calls the github api with a graphql query, and sends an ImportEvent for each rate limiting event
func (c *rateLimitHandlerClient) queryImport(ctx context.Context, query interface{}, vars map[string]interface{}, importEvents chan<- ImportEvent) error {
	// prepare a closure for the query
	queryFun := func(ctx context.Context) error {
		return c.sc.Query(ctx, query, vars)
	}
	callback := func(msg string) {
		select {
		case <-ctx.Done():
		case importEvents <- RateLimitingEvent{msg}:
		}
	}
	return c.callAPIAndRetry(ctx, queryFun, callback)
}

// queryImport calls the github api with a graphql query, and sends a core.ExportResult for each rate limiting event
func (c *rateLimitHandlerClient) queryExport(ctx context.Context, query interface{}, vars map[string]interface{}, out chan<- core.ExportResult) error {
	// prepare a closure for the query
	queryFun := func(ctx context.Context) error {
		return c.sc.Query(ctx, query, vars)
	}
	callback := func(msg string) {
		select {
		case <-ctx.Done():
		case out <- core.NewExportRateLimiting(msg):
		}
	}
	return c.callAPIAndRetry(ctx, queryFun, callback)
}

// queryPrintMsgs calls the github api with a graphql query, and prints a message to stdout for every rate limiting event .
func (c *rateLimitHandlerClient) queryPrintMsgs(ctx context.Context, query interface{}, vars map[string]interface{}) error {
	// prepare a closure for the query
	queryFun := func(ctx context.Context) error {
		return c.sc.Query(ctx, query, vars)
	}
	callback := func(msg string) {
		fmt.Println(msg)
	}
	return c.callAPIAndRetry(ctx, queryFun, callback)
}

// callAPIAndRetry calls the Github GraphQL API (inderectely through callAPIDealWithLimit) and in
// case of error it repeats the request to the Github API. The parameter `apiCall` is intended to be
// a closure containing a query or a mutation to the Github GraphQL API.
func (c *rateLimitHandlerClient) callAPIAndRetry(ctx context.Context, apiCall func(context.Context) error, rateLimitEvent func(msg string)) error {
	var err error
	if err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent); err == nil {
		return nil
	}
	// failure; the reason may be temporary network problems or internal errors
	// on the github servers. Internal errors on the github servers are quite common.
	// Retry
	retries := 3
	for i := 0; i < retries; i++ {
		// wait a few seconds before retry
		sleepTime := time.Duration(8*(i+1)) * time.Second
		timer := time.NewTimer(sleepTime)
		select {
		case <-ctx.Done():
			stop(timer)
			return ctx.Err()
		case <-timer.C:
			err = c.callAPIDealWithLimit(ctx, apiCall, rateLimitEvent)
			if err == nil {
				return nil
			}
		}
	}
	return err
}

// callAPIDealWithLimit calls the Github GraphQL API and if the Github API returns a rate limiting
// error, then it waits until the rate limit is reset, and it repeats the request to the API. The
// parameter `apiCall` is intended to be a closure containing a query or a mutation to the Github
// GraphQL API.
func (c *rateLimitHandlerClient) callAPIDealWithLimit(ctx context.Context, apiCall func(context.Context) error, rateLimitCallback func(msg string)) error {
	qctx, cancel := context.WithTimeout(ctx, defaultTimeout)
	defer cancel()
	// call the function fun()
	err := apiCall(qctx)
	if err == nil {
		return nil
	}
	// matching the error string
	if strings.Contains(err.Error(), "API rate limit exceeded") {
		// a rate limit error
		qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
		defer cancel()
		// Use a separate query to get Github rate limiting information.
		limitQuery := rateLimitQuery{}
		if err := c.sc.Query(qctx, &limitQuery, map[string]interface{}{}); err != nil {
			return err
		}
		// Get the time when Github will reset the rate limit of their API.
		resetTime := limitQuery.RateLimit.ResetAt.Time
		msg := fmt.Sprintf(
			"Github GraphQL API rate limit. This process will sleep until %s.",
			resetTime.String(),
		)
		// Send message about rate limiting event.
		rateLimitCallback(msg)

		// Pause current goroutine
		timer := time.NewTimer(time.Until(resetTime))
		select {
		case <-ctx.Done():
			stop(timer)
			return ctx.Err()
		case <-timer.C:
		}
		// call the function apiCall() again
		qctx, cancel = context.WithTimeout(ctx, defaultTimeout)
		defer cancel()
		err = apiCall(qctx)
		return err // might be nil
	} else {
		return err
	}
}

func stop(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}