aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github
diff options
context:
space:
mode:
authorAmine <hilalyamine@gmail.com>2019-08-19 00:37:54 +0200
committerGitHub <noreply@github.com>2019-08-19 00:37:54 +0200
commit36c91adddfc16b8c5d04eb66dbb4cf8c25cea321 (patch)
tree50484f3531e33c2c63c026fcb1c76f60f8b13862 /bridge/github
parent6428352bd14828f670206b60862de7f71c52d235 (diff)
parente6931aaf6f3173c634b03d515287e4a12fd20f15 (diff)
downloadgit-bug-36c91adddfc16b8c5d04eb66dbb4cf8c25cea321.tar.gz
Merge pull request #190 from MichaelMure/bridge-ctx
Bridge: pull/push enhancements
Diffstat (limited to 'bridge/github')
-rw-r--r--bridge/github/export.go181
-rw-r--r--bridge/github/export_test.go12
-rw-r--r--bridge/github/import.go198
-rw-r--r--bridge/github/import_test.go8
-rw-r--r--bridge/github/iterator.go58
5 files changed, 266 insertions, 191 deletions
diff --git a/bridge/github/export.go b/bridge/github/export.go
index 34c88310..976c5a05 100644
--- a/bridge/github/export.go
+++ b/bridge/github/export.go
@@ -7,12 +7,11 @@ import (
"fmt"
"io/ioutil"
"net/http"
- "strings"
- "sync"
"time"
"github.com/pkg/errors"
"github.com/shurcooL/githubv4"
+ "golang.org/x/sync/errgroup"
"github.com/MichaelMure/git-bug/bridge/core"
"github.com/MichaelMure/git-bug/bug"
@@ -79,7 +78,7 @@ func (ge *githubExporter) getIdentityClient(id entity.Id) (*githubv4.Client, err
}
// ExportAll export all event made by the current user to Github
-func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
+func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
out := make(chan core.ExportResult)
user, err := repo.GetUserIdentity()
@@ -91,6 +90,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
// get repository node id
ge.repositoryID, err = getRepositoryNodeID(
+ ctx,
ge.conf[keyOwner],
ge.conf[keyProject],
ge.conf[keyToken],
@@ -117,20 +117,28 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
return
}
- snapshot := b.Snapshot()
+ select {
- // ignore issues created before since date
- // TODO: compare the Lamport time instead of using the unix time
- if snapshot.CreatedAt.Before(since) {
- out <- core.NewExportNothing(b.Id(), "bug created before the since date")
- continue
- }
+ case <-ctx.Done():
+ // stop iterating if context cancel function is called
+ return
- if snapshot.HasAnyActor(allIdentitiesIds...) {
- // try to export the bug and it associated events
- ge.exportBug(b, since, out)
- } else {
- out <- core.NewExportNothing(id, "not an actor")
+ default:
+ snapshot := b.Snapshot()
+
+ // ignore issues created before since date
+ // TODO: compare the Lamport time instead of using the unix time
+ if snapshot.CreatedAt.Before(since) {
+ out <- core.NewExportNothing(b.Id(), "bug created before the since date")
+ continue
+ }
+
+ if snapshot.HasAnyActor(allIdentitiesIds...) {
+ // try to export the bug and it associated events
+ ge.exportBug(ctx, b, since, out)
+ } else {
+ out <- core.NewExportNothing(id, "not an actor")
+ }
}
}
}()
@@ -139,7 +147,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
}
// exportBug publish bugs and related events
-func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
+func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
snapshot := b.Snapshot()
var bugGithubID string
@@ -199,7 +207,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
}
// create bug
- id, url, err := createGithubIssue(client, ge.repositoryID, createOp.Title, createOp.Message)
+ id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
if err != nil {
err := errors.Wrap(err, "exporting github issue")
out <- core.NewExportError(err, b.Id())
@@ -257,7 +265,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
opr := op.(*bug.AddCommentOperation)
// send operation to github
- id, url, err = addCommentGithubIssue(client, bugGithubID, opr.Message)
+ id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, opr.Message)
if err != nil {
err := errors.Wrap(err, "adding comment")
out <- core.NewExportError(err, b.Id())
@@ -277,7 +285,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
if opr.Target == createOp.Id() {
// case bug creation operation: we need to edit the Github issue
- if err := updateGithubIssueBody(client, bugGithubID, opr.Message); err != nil {
+ if err := updateGithubIssueBody(ctx, client, bugGithubID, opr.Message); err != nil {
err := errors.Wrap(err, "editing issue")
out <- core.NewExportError(err, b.Id())
return
@@ -296,7 +304,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
panic("unexpected error: comment id not found")
}
- eid, eurl, err := editCommentGithubIssue(client, commentID, opr.Message)
+ eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, opr.Message)
if err != nil {
err := errors.Wrap(err, "editing comment")
out <- core.NewExportError(err, b.Id())
@@ -312,7 +320,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
case *bug.SetStatusOperation:
opr := op.(*bug.SetStatusOperation)
- if err := updateGithubIssueStatus(client, bugGithubID, opr.Status); err != nil {
+ if err := updateGithubIssueStatus(ctx, client, bugGithubID, opr.Status); err != nil {
err := errors.Wrap(err, "editing status")
out <- core.NewExportError(err, b.Id())
return
@@ -325,7 +333,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
case *bug.SetTitleOperation:
opr := op.(*bug.SetTitleOperation)
- if err := updateGithubIssueTitle(client, bugGithubID, opr.Title); err != nil {
+ if err := updateGithubIssueTitle(ctx, client, bugGithubID, opr.Title); err != nil {
err := errors.Wrap(err, "editing title")
out <- core.NewExportError(err, b.Id())
return
@@ -338,7 +346,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
case *bug.LabelChangeOperation:
opr := op.(*bug.LabelChangeOperation)
- if err := ge.updateGithubIssueLabels(client, bugGithubID, opr.Added, opr.Removed); err != nil {
+ if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, opr.Added, opr.Removed); err != nil {
err := errors.Wrap(err, "updating labels")
out <- core.NewExportError(err, b.Id())
return
@@ -370,12 +378,9 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
}
// getRepositoryNodeID request github api v3 to get repository node id
-func getRepositoryNodeID(owner, project, token string) (string, error) {
+func getRepositoryNodeID(ctx context.Context, owner, project, token string) (string, error) {
url := fmt.Sprintf("%s/repos/%s/%s", githubV3Url, owner, project)
-
- client := &http.Client{
- Timeout: defaultTimeout,
- }
+ client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@@ -385,6 +390,10 @@ func getRepositoryNodeID(owner, project, token string) (string, error) {
// need the token for private repositories
req.Header.Set("Authorization", fmt.Sprintf("token %s", token))
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+
resp, err := client.Do(req)
if err != nil {
return "", err
@@ -425,7 +434,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith
}
// get label from github
-func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (string, error) {
+func (ge *githubExporter) getGithubLabelID(ctx context.Context, gc *githubv4.Client, label string) (string, error) {
q := &labelQuery{}
variables := map[string]interface{}{
"label": githubv4.String(label),
@@ -433,8 +442,7 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s
"name": githubv4.String(ge.conf[keyProject]),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Query(ctx, q, variables); err != nil {
@@ -452,12 +460,9 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s
// create a new label and return it github id
// NOTE: since createLabel mutation is still in preview mode we use github api v3 to create labels
// see https://developer.github.com/v4/mutation/createlabel/ and https://developer.github.com/v4/previews/#labels-preview
-func (ge *githubExporter) createGithubLabel(label, color string) (string, error) {
+func (ge *githubExporter) createGithubLabel(ctx context.Context, label, color string) (string, error) {
url := fmt.Sprintf("%s/repos/%s/%s/labels", githubV3Url, ge.conf[keyOwner], ge.conf[keyProject])
-
- client := &http.Client{
- Timeout: defaultTimeout,
- }
+ client := &http.Client{}
params := struct {
Name string `json:"name"`
@@ -478,6 +483,10 @@ func (ge *githubExporter) createGithubLabel(label, color string) (string, error)
return "", err
}
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+
// need the token for private repositories
req.Header.Set("Authorization", fmt.Sprintf("token %s", ge.conf[keyToken]))
@@ -529,9 +538,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
}
*/
-func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
+func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
// try to get label id
- labelID, err := ge.getGithubLabelID(gc, string(label))
+ labelID, err := ge.getGithubLabelID(ctx, gc, string(label))
if err == nil {
return labelID, nil
}
@@ -540,7 +549,10 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito
rgba := label.RGBA()
hexColor := fmt.Sprintf("%.2x%.2x%.2x", rgba.R, rgba.G, rgba.B)
- labelID, err = ge.createGithubLabel(string(label), hexColor)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+
+ labelID, err = ge.createGithubLabel(ctx, string(label), hexColor)
if err != nil {
return "", err
}
@@ -548,7 +560,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito
return labelID, nil
}
-func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
+func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
ids := make([]githubv4.ID, 0, len(labels))
var err error
@@ -557,7 +569,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string,
id, ok := ge.cachedLabels[string(label)]
if !ok {
// try to query label id
- id, err = ge.getOrCreateGithubLabelID(gc, repositoryID, label)
+ id, err = ge.getOrCreateGithubLabelID(ctx, gc, repositoryID, label)
if err != nil {
return nil, errors.Wrap(err, "get or create github label")
}
@@ -573,7 +585,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string,
}
// create a github issue and return it ID
-func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
+func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
m := &createIssueMutation{}
input := githubv4.CreateIssueInput{
RepositoryID: repositoryID,
@@ -581,8 +593,7 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s
Body: (*githubv4.String)(&body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -594,15 +605,14 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s
}
// add a comment to an issue and return it ID
-func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (string, string, error) {
+func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) {
m := &addCommentToIssueMutation{}
input := githubv4.AddCommentInput{
SubjectID: subjectID,
Body: githubv4.String(body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -613,15 +623,14 @@ func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (
return node.ID, node.URL, nil
}
-func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string, string, error) {
+func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) {
m := &updateIssueCommentMutation{}
input := githubv4.UpdateIssueCommentInput{
ID: commentID,
Body: githubv4.String(body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -631,7 +640,7 @@ func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string
return commentID, m.UpdateIssueComment.IssueComment.URL, nil
}
-func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) error {
+func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error {
m := &updateIssueMutation{}
// set state
@@ -651,8 +660,7 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status)
State: &state,
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -662,15 +670,14 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status)
return nil
}
-func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error {
+func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Body: (*githubv4.String)(&body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -680,15 +687,14 @@ func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error {
return nil
}
-func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error {
+func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Title: (*githubv4.String)(&title),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -699,21 +705,16 @@ func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error {
}
// update github issue labels
-func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
- var errs []string
- var wg sync.WaitGroup
-
- parentCtx := context.Background()
+func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
+ reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ wg, ctx := errgroup.WithContext(ctx)
if len(added) > 0 {
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- addedIDs, err := ge.getLabelsIDs(gc, labelableID, added)
+ wg.Go(func() error {
+ addedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, added)
if err != nil {
- errs = append(errs, errors.Wrap(err, "getting added labels ids").Error())
- return
+ return err
}
m := &addLabelsToLabelableMutation{}
@@ -722,25 +723,19 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
LabelIDs: addedIDs,
}
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
- defer cancel()
-
// add labels
- if err := gc.Mutate(ctx, m, inputAdd, nil); err != nil {
- errs = append(errs, err.Error())
+ if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil {
+ return err
}
- }()
+ return nil
+ })
}
if len(removed) > 0 {
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- removedIDs, err := ge.getLabelsIDs(gc, labelableID, removed)
+ wg.Go(func() error {
+ removedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, removed)
if err != nil {
- errs = append(errs, errors.Wrap(err, "getting added labels ids").Error())
- return
+ return err
}
m2 := &removeLabelsFromLabelableMutation{}
@@ -749,21 +744,13 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
LabelIDs: removedIDs,
}
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
- defer cancel()
-
// remove label labels
- if err := gc.Mutate(ctx, m2, inputRemove, nil); err != nil {
- errs = append(errs, err.Error())
+ if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil {
+ return err
}
- }()
- }
-
- wg.Wait()
-
- if len(errs) == 0 {
- return nil
+ return nil
+ })
}
- return fmt.Errorf("label change error: %v", strings.Join(errs, "\n"))
+ return wg.Wait()
}
diff --git a/bridge/github/export_test.go b/bridge/github/export_test.go
index 107fe63b..a0be7cff 100644
--- a/bridge/github/export_test.go
+++ b/bridge/github/export_test.go
@@ -2,6 +2,7 @@ package github
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"math/rand"
@@ -177,13 +178,14 @@ func TestPushPull(t *testing.T) {
})
require.NoError(t, err)
+ ctx := context.Background()
start := time.Now()
// export all bugs
- events, err := exporter.ExportAll(backend, time.Time{})
+ exportEvents, err := exporter.ExportAll(ctx, backend, time.Time{})
require.NoError(t, err)
- for result := range events {
+ for result := range exportEvents {
require.NoError(t, result.Err)
}
require.NoError(t, err)
@@ -206,9 +208,13 @@ func TestPushPull(t *testing.T) {
require.NoError(t, err)
// import all exported bugs to the second backend
- err = importer.ImportAll(backendTwo, time.Time{})
+ importEvents, err := importer.ImportAll(ctx, backendTwo, time.Time{})
require.NoError(t, err)
+ for result := range importEvents {
+ require.NoError(t, result.Err)
+ }
+
require.Len(t, backendTwo.AllBugsIds(), len(tests))
for _, tt := range tests {
diff --git a/bridge/github/import.go b/bridge/github/import.go
index dcaf2d05..7c4deb50 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -28,11 +28,8 @@ type githubImporter struct {
// iterator
iterator *iterator
- // number of imported issues
- importedIssues int
-
- // number of imported identities
- importedIdentities int
+ // send only channel
+ out chan<- core.ImportResult
}
func (gi *githubImporter) Init(conf core.Configuration) error {
@@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error {
// ImportAll iterate over all the configured repository issues and ensure the creation of the
// missing issues / timeline items / edits / label events ...
-func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
- gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
-
- // Loop over all matching issues
- for gi.iterator.NextIssue() {
- issue := gi.iterator.IssueValue()
- fmt.Printf("importing issue: %v\n", issue.Title)
+func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+ gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
+ out := make(chan core.ImportResult)
+ gi.out = out
+
+ go func() {
+ defer close(gi.out)
+
+ // Loop over all matching issues
+ for gi.iterator.NextIssue() {
+ issue := gi.iterator.IssueValue()
+ // create issue
+ b, err := gi.ensureIssue(repo, issue)
+ if err != nil {
+ err := fmt.Errorf("issue creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
- // create issue
- b, err := gi.ensureIssue(repo, issue)
- if err != nil {
- return fmt.Errorf("issue creation: %v", err)
- }
+ // loop over timeline items
+ for gi.iterator.NextTimelineItem() {
+ item := gi.iterator.TimelineItemValue()
+ if err := gi.ensureTimelineItem(repo, b, item); err != nil {
+ err := fmt.Errorf("timeline item creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
+ }
- // loop over timeline items
- for gi.iterator.NextTimelineItem() {
- if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil {
- return fmt.Errorf("timeline item creation: %v", err)
+ // commit bug state
+ if err := b.CommitAsNeeded(); err != nil {
+ err = fmt.Errorf("bug commit: %v", err)
+ out <- core.NewImportError(err, "")
+ return
}
}
- // commit bug state
- if err := b.CommitAsNeeded(); err != nil {
- return fmt.Errorf("bug commit: %v", err)
+ if err := gi.iterator.Error(); err != nil && err != context.Canceled {
+ gi.out <- core.NewImportError(err, "")
}
- }
+ }()
- if err := gi.iterator.Error(); err != nil {
- fmt.Printf("import error: %v\n", err)
- return err
- }
-
- fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities)
- return nil
+ return out, nil
}
func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) {
@@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
// importing a new bug
- gi.importedIssues++
+ gi.out <- core.NewImportBug(b.Id())
+ } else {
+ gi.out <- core.NewImportNothing("", "bug already imported")
}
} else {
@@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
for i, edit := range issueEdits {
if i == 0 && b != nil {
// The first edit in the github result is the issue creation itself, we already have that
+ gi.out <- core.NewImportNothing("", "bug already imported")
continue
}
@@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
// importing a new bug
- gi.importedIssues++
-
+ gi.out <- core.NewImportBug(b.Id())
continue
}
// other edits will be added as CommentEdit operations
- target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String())
+ target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id))
if err != nil {
return nil, err
}
@@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
- fmt.Printf("import event item: %s\n", item.Typename)
switch item.Typename {
case "IssueComment":
// collect all comment edits
- commentEdits := []userContentEdit{}
+ var commentEdits []userContentEdit
for gi.iterator.NextCommentEdit() {
commentEdits = append(commentEdits, gi.iterator.CommentEditValue())
}
+ // ensureTimelineComment send import events over out chanel
err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
@@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
case "LabeledEvent":
id := parseId(item.LabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
+
if err != cache.ErrNoMatchingOp {
return err
}
@@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != nil {
return err
}
- _, err = b.ForceChangeLabelsRaw(
+ op, err := b.ForceChangeLabelsRaw(
author,
item.LabeledEvent.CreatedAt.Unix(),
[]string{
@@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
nil,
map[string]string{keyGithubId: id},
)
+ if err != nil {
+ return err
+ }
- return err
+ gi.out <- core.NewImportLabelChange(op.Id())
+ return nil
case "UnlabeledEvent":
id := parseId(item.UnlabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
if err != cache.ErrNoMatchingOp {
return err
}
@@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return err
}
- _, err = b.ForceChangeLabelsRaw(
+ op, err := b.ForceChangeLabelsRaw(
author,
item.UnlabeledEvent.CreatedAt.Unix(),
nil,
@@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
},
map[string]string{keyGithubId: id},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportLabelChange(op.Id())
+ return nil
case "ClosedEvent":
id := parseId(item.ClosedEvent.Id)
@@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor)
if err != nil {
return err
}
- _, err = b.CloseRaw(
+ op, err := b.CloseRaw(
author,
item.ClosedEvent.CreatedAt.Unix(),
map[string]string{keyGithubId: id},
)
- return err
+
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
+ return nil
case "ReopenedEvent":
id := parseId(item.ReopenedEvent.Id)
@@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor)
if err != nil {
return err
}
- _, err = b.OpenRaw(
+ op, err := b.OpenRaw(
author,
item.ReopenedEvent.CreatedAt.Unix(),
map[string]string{keyGithubId: id},
)
- return err
+
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
+ return nil
case "RenamedTitleEvent":
id := parseId(item.RenamedTitleEvent.Id)
@@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor)
if err != nil {
return err
}
- _, err = b.SetTitleRaw(
+ op, err := b.SetTitleRaw(
author,
item.RenamedTitleEvent.CreatedAt.Unix(),
string(item.RenamedTitleEvent.CurrentTitle),
map[string]string{keyGithubId: id},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportTitleEdition(op.Id())
+ return nil
default:
- fmt.Printf("ignore event: %v\n", item.Typename)
+ reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
}
return nil
@@ -307,14 +368,15 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
}
targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id))
- if err != nil && err != cache.ErrNoMatchingOp {
+ if err == nil {
+ gi.out <- core.NewImportNothing("", "comment already imported")
+ } else if err != cache.ErrNoMatchingOp {
// real error
return err
}
// if no edits are given we create the comment
if len(edits) == 0 {
- // if comment doesn't exist
if err == cache.ErrNoMatchingOp {
cleanText, err := text.Cleanup(string(item.Body))
if err != nil {
@@ -322,7 +384,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
}
// add comment operation
- _, err = b.AddCommentRaw(
+ op, err := b.AddCommentRaw(
author,
item.CreatedAt.Unix(),
cleanText,
@@ -332,12 +394,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
keyGithubUrl: parseId(item.Url.String()),
},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportComment(op.Id())
}
+
} else {
for i, edit := range edits {
if i == 0 && targetOpID != "" {
// The first edit in the github result is the comment creation itself, we already have that
+ gi.out <- core.NewImportNothing("", "comment already imported")
continue
}
@@ -370,7 +438,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
// set target for the nexr edit now that the comment is created
targetOpID = op.Id()
-
continue
}
@@ -386,7 +453,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id))
if err == nil {
- // already imported
+ gi.out <- core.NewImportNothing(b.Id(), "edition already imported")
return nil
}
if err != cache.ErrNoMatchingOp {
@@ -394,8 +461,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
return err
}
- fmt.Println("import edition")
-
editor, err := gi.ensurePerson(repo, edit.Editor)
if err != nil {
return err
@@ -404,7 +469,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
switch {
case edit.DeletedAt != nil:
// comment deletion, not supported yet
- fmt.Println("comment deletion is not supported yet")
+ gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet")
case edit.DeletedAt == nil:
@@ -414,7 +479,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
}
// comment edition
- _, err = b.EditCommentRaw(
+ op, err := b.EditCommentRaw(
editor,
edit.CreatedAt.Unix(),
target,
@@ -427,6 +492,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
if err != nil {
return err
}
+
+ gi.out <- core.NewImportCommentEdition(op.Id())
}
return nil
@@ -450,7 +517,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
}
// importing a new identity
- gi.importedIdentities++
var name string
var email string
@@ -471,7 +537,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
case "Bot":
}
- return repo.NewIdentityRaw(
+ i, err = repo.NewIdentityRaw(
name,
email,
string(actor.Login),
@@ -480,6 +546,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
keyGithubLogin: string(actor.Login),
},
)
+
+ if err != nil {
+ return nil, err
+ }
+
+ gi.out <- core.NewImportIdentity(i.Id())
+ return i, nil
}
func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) {
@@ -500,8 +573,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache,
gc := buildClient(gi.conf[keyToken])
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout)
defer cancel()
err = gc.Query(ctx, &q, variables)
diff --git a/bridge/github/import_test.go b/bridge/github/import_test.go
index 5ba87993..41bcb58d 100644
--- a/bridge/github/import_test.go
+++ b/bridge/github/import_test.go
@@ -1,6 +1,7 @@
package github
import (
+ "context"
"fmt"
"os"
"testing"
@@ -146,11 +147,16 @@ func Test_Importer(t *testing.T) {
})
require.NoError(t, err)
+ ctx := context.Background()
start := time.Now()
- err = importer.ImportAll(backend, time.Time{})
+ events, err := importer.ImportAll(ctx, backend, time.Time{})
require.NoError(t, err)
+ for result := range events {
+ require.NoError(t, result.Err)
+ }
+
fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds())
require.Len(t, backend.AllBugsIds(), len(tests))
diff --git a/bridge/github/iterator.go b/bridge/github/iterator.go
index 6d63cf42..a97ed036 100644
--- a/bridge/github/iterator.go
+++ b/bridge/github/iterator.go
@@ -46,6 +46,9 @@ type iterator struct {
// to make
capacity int
+ // shared context used for all graphql queries
+ ctx context.Context
+
// sticky error
err error
@@ -60,11 +63,12 @@ type iterator struct {
}
// NewIterator create and initialize a new iterator
-func NewIterator(owner, project, token string, since time.Time) *iterator {
+func NewIterator(ctx context.Context, capacity int, owner, project, token string, since time.Time) *iterator {
i := &iterator{
gc: buildClient(token),
since: since,
- capacity: 10,
+ capacity: capacity,
+ ctx: ctx,
timeline: timelineIterator{
index: -1,
issueEdit: indexer{-1},
@@ -147,8 +151,7 @@ func (i *iterator) Error() error {
}
func (i *iterator) queryIssue() bool {
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil {
@@ -167,6 +170,10 @@ func (i *iterator) queryIssue() bool {
// NextIssue try to query the next issue and return true. Only one issue is
// queried at each call.
func (i *iterator) NextIssue() bool {
+ if i.err != nil {
+ return false
+ }
+
// if $issueAfter variable is nil we can directly make the first query
if i.timeline.variables["issueAfter"] == (*githubv4.String)(nil) {
nextIssue := i.queryIssue()
@@ -175,10 +182,6 @@ func (i *iterator) NextIssue() bool {
return nextIssue
}
- if i.err != nil {
- return false
- }
-
if !i.timeline.query.Repository.Issues.PageInfo.HasNextPage {
return false
}
@@ -207,11 +210,15 @@ func (i *iterator) NextTimelineItem() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
if len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges) == 0 {
return false
}
- if i.timeline.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges))-1 {
+ if i.timeline.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges)-1 {
i.timeline.index++
return true
}
@@ -225,8 +232,7 @@ func (i *iterator) NextTimelineItem() bool {
// more timelines, query them
i.timeline.variables["timelineAfter"] = i.timeline.query.Repository.Issues.Nodes[0].Timeline.PageInfo.EndCursor
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil {
@@ -245,8 +251,7 @@ func (i *iterator) TimelineItemValue() timelineItem {
}
func (i *iterator) queryIssueEdit() bool {
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.issueEdit.query, i.issueEdit.variables); err != nil {
@@ -285,10 +290,14 @@ func (i *iterator) NextIssueEdit() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
// this mean we looped over all available issue edits in the timeline.
// now we have to use i.issueEditQuery
if i.timeline.issueEdit.index == -2 {
- if i.issueEdit.index < min(i.capacity, len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 {
+ if i.issueEdit.index < len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 {
i.issueEdit.index++
return i.nextValidIssueEdit()
}
@@ -319,7 +328,7 @@ func (i *iterator) NextIssueEdit() bool {
}
// loop over them timeline comment edits
- if i.timeline.issueEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 {
+ if i.timeline.issueEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 {
i.timeline.issueEdit.index++
return i.nextValidIssueEdit()
}
@@ -347,8 +356,7 @@ func (i *iterator) IssueEditValue() userContentEdit {
}
func (i *iterator) queryCommentEdit() bool {
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.commentEdit.query, i.commentEdit.variables); err != nil {
@@ -384,10 +392,14 @@ func (i *iterator) NextCommentEdit() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
// same as NextIssueEdit
if i.timeline.commentEdit.index == -2 {
- if i.commentEdit.index < min(i.capacity, len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes))-1 {
+ if i.commentEdit.index < len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes)-1 {
i.commentEdit.index++
return i.nextValidCommentEdit()
}
@@ -409,7 +421,7 @@ func (i *iterator) NextCommentEdit() bool {
}
// loop over them timeline comment edits
- if i.timeline.commentEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes))-1 {
+ if i.timeline.commentEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes)-1 {
i.timeline.commentEdit.index++
return i.nextValidCommentEdit()
}
@@ -440,14 +452,6 @@ func (i *iterator) CommentEditValue() userContentEdit {
return i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes[i.timeline.commentEdit.index]
}
-func min(a, b int) int {
- if a > b {
- return b
- }
-
- return a
-}
-
func reverseEdits(edits []userContentEdit) {
for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 {
edits[i], edits[j] = edits[j], edits[i]