From 5ca326af83b90531d4d0c502bb1beabbe1b48c55 Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Tue, 13 Aug 2019 19:51:14 +0200 Subject: bridge/core: add context.Context to ImportAll and ExportAll signatures bridge/core: add ImportResult objects to stream import events bridge/core: launchpad support asynchronous import bridge/github: cancellable export and import functions bridge/gitlab: cancellable export and import functions commands: bridge pull/push gracefull kill bridge/github: fix github import bridge/github: use simple context for imports bridge/core: name parameters in interfaces github/core: Add EventError to export and import events types bridge/gitlab: add context support in gitlab requests functions bridge/gitlab: remove imported events count from importer logic bridge/github: remove imported events count from importer logic bridge/github: add context support in query and muration requets bridge/github: fix bug duplicate editions after multiple calls bridge/core: import import and export events String methods bridge/gitlab: fix error handling in note import events commands/bridge: Add statistics about imports and exports bridge/gitlab: properly handle context cancellation bridge/github: improve error handling bridge: break iterators on context cancel or timeout bridge: add context timeout support bridge: improve event formating and error handling commands: handle interrupt and switch cases bridge/github: add export mutation timeouts bridge: fix race condition bug in the github and gitlab importers bridge/github: improve context error handling --- bridge/github/export.go | 140 +++++++++++++++--------------- bridge/github/export_test.go | 12 ++- bridge/github/import.go | 199 +++++++++++++++++++++++++++++-------------- bridge/github/import_test.go | 8 +- bridge/github/iterator.go | 58 +++++++------ 5 files changed, 254 insertions(+), 163 deletions(-) (limited to 'bridge/github') diff --git a/bridge/github/export.go b/bridge/github/export.go index 34c88310..3aecdce0 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -79,7 +79,7 @@ func (ge *githubExporter) getIdentityClient(id entity.Id) (*githubv4.Client, err } // ExportAll export all event made by the current user to Github -func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) { +func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) { out := make(chan core.ExportResult) user, err := repo.GetUserIdentity() @@ -91,6 +91,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c // get repository node id ge.repositoryID, err = getRepositoryNodeID( + ctx, ge.conf[keyOwner], ge.conf[keyProject], ge.conf[keyToken], @@ -117,20 +118,28 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c return } - snapshot := b.Snapshot() + select { - // ignore issues created before since date - // TODO: compare the Lamport time instead of using the unix time - if snapshot.CreatedAt.Before(since) { - out <- core.NewExportNothing(b.Id(), "bug created before the since date") - continue - } + case <-ctx.Done(): + // stop iterating if context cancel function is called + return - if snapshot.HasAnyActor(allIdentitiesIds...) { - // try to export the bug and it associated events - ge.exportBug(b, since, out) - } else { - out <- core.NewExportNothing(id, "not an actor") + default: + snapshot := b.Snapshot() + + // ignore issues created before since date + // TODO: compare the Lamport time instead of using the unix time + if snapshot.CreatedAt.Before(since) { + out <- core.NewExportNothing(b.Id(), "bug created before the since date") + continue + } + + if snapshot.HasAnyActor(allIdentitiesIds...) { + // try to export the bug and it associated events + ge.exportBug(ctx, b, since, out) + } else { + out <- core.NewExportNothing(id, "not an actor") + } } } }() @@ -139,7 +148,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c } // exportBug publish bugs and related events -func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan<- core.ExportResult) { +func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) { snapshot := b.Snapshot() var bugGithubID string @@ -199,7 +208,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan } // create bug - id, url, err := createGithubIssue(client, ge.repositoryID, createOp.Title, createOp.Message) + id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message) if err != nil { err := errors.Wrap(err, "exporting github issue") out <- core.NewExportError(err, b.Id()) @@ -257,7 +266,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan opr := op.(*bug.AddCommentOperation) // send operation to github - id, url, err = addCommentGithubIssue(client, bugGithubID, opr.Message) + id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, opr.Message) if err != nil { err := errors.Wrap(err, "adding comment") out <- core.NewExportError(err, b.Id()) @@ -277,7 +286,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan if opr.Target == createOp.Id() { // case bug creation operation: we need to edit the Github issue - if err := updateGithubIssueBody(client, bugGithubID, opr.Message); err != nil { + if err := updateGithubIssueBody(ctx, client, bugGithubID, opr.Message); err != nil { err := errors.Wrap(err, "editing issue") out <- core.NewExportError(err, b.Id()) return @@ -296,7 +305,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan panic("unexpected error: comment id not found") } - eid, eurl, err := editCommentGithubIssue(client, commentID, opr.Message) + eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, opr.Message) if err != nil { err := errors.Wrap(err, "editing comment") out <- core.NewExportError(err, b.Id()) @@ -312,7 +321,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan case *bug.SetStatusOperation: opr := op.(*bug.SetStatusOperation) - if err := updateGithubIssueStatus(client, bugGithubID, opr.Status); err != nil { + if err := updateGithubIssueStatus(ctx, client, bugGithubID, opr.Status); err != nil { err := errors.Wrap(err, "editing status") out <- core.NewExportError(err, b.Id()) return @@ -325,7 +334,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan case *bug.SetTitleOperation: opr := op.(*bug.SetTitleOperation) - if err := updateGithubIssueTitle(client, bugGithubID, opr.Title); err != nil { + if err := updateGithubIssueTitle(ctx, client, bugGithubID, opr.Title); err != nil { err := errors.Wrap(err, "editing title") out <- core.NewExportError(err, b.Id()) return @@ -338,7 +347,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan case *bug.LabelChangeOperation: opr := op.(*bug.LabelChangeOperation) - if err := ge.updateGithubIssueLabels(client, bugGithubID, opr.Added, opr.Removed); err != nil { + if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, opr.Added, opr.Removed); err != nil { err := errors.Wrap(err, "updating labels") out <- core.NewExportError(err, b.Id()) return @@ -370,12 +379,9 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan } // getRepositoryNodeID request github api v3 to get repository node id -func getRepositoryNodeID(owner, project, token string) (string, error) { +func getRepositoryNodeID(ctx context.Context, owner, project, token string) (string, error) { url := fmt.Sprintf("%s/repos/%s/%s", githubV3Url, owner, project) - - client := &http.Client{ - Timeout: defaultTimeout, - } + client := &http.Client{} req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -385,6 +391,10 @@ func getRepositoryNodeID(owner, project, token string) (string, error) { // need the token for private repositories req.Header.Set("Authorization", fmt.Sprintf("token %s", token)) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + req = req.WithContext(ctx) + resp, err := client.Do(req) if err != nil { return "", err @@ -425,7 +435,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith } // get label from github -func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (string, error) { +func (ge *githubExporter) getGithubLabelID(ctx context.Context, gc *githubv4.Client, label string) (string, error) { q := &labelQuery{} variables := map[string]interface{}{ "label": githubv4.String(label), @@ -433,8 +443,7 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s "name": githubv4.String(ge.conf[keyProject]), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Query(ctx, q, variables); err != nil { @@ -452,12 +461,9 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s // create a new label and return it github id // NOTE: since createLabel mutation is still in preview mode we use github api v3 to create labels // see https://developer.github.com/v4/mutation/createlabel/ and https://developer.github.com/v4/previews/#labels-preview -func (ge *githubExporter) createGithubLabel(label, color string) (string, error) { +func (ge *githubExporter) createGithubLabel(ctx context.Context, label, color string) (string, error) { url := fmt.Sprintf("%s/repos/%s/%s/labels", githubV3Url, ge.conf[keyOwner], ge.conf[keyProject]) - - client := &http.Client{ - Timeout: defaultTimeout, - } + client := &http.Client{} params := struct { Name string `json:"name"` @@ -478,6 +484,10 @@ func (ge *githubExporter) createGithubLabel(label, color string) (string, error) return "", err } + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + req = req.WithContext(ctx) + // need the token for private repositories req.Header.Set("Authorization", fmt.Sprintf("token %s", ge.conf[keyToken])) @@ -529,9 +539,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC } */ -func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) { +func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) { // try to get label id - labelID, err := ge.getGithubLabelID(gc, string(label)) + labelID, err := ge.getGithubLabelID(ctx, gc, string(label)) if err == nil { return labelID, nil } @@ -540,7 +550,10 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito rgba := label.RGBA() hexColor := fmt.Sprintf("%.2x%.2x%.2x", rgba.R, rgba.G, rgba.B) - labelID, err = ge.createGithubLabel(string(label), hexColor) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + + labelID, err = ge.createGithubLabel(ctx, string(label), hexColor) if err != nil { return "", err } @@ -548,7 +561,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito return labelID, nil } -func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) { +func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) { ids := make([]githubv4.ID, 0, len(labels)) var err error @@ -557,7 +570,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, id, ok := ge.cachedLabels[string(label)] if !ok { // try to query label id - id, err = ge.getOrCreateGithubLabelID(gc, repositoryID, label) + id, err = ge.getOrCreateGithubLabelID(ctx, gc, repositoryID, label) if err != nil { return nil, errors.Wrap(err, "get or create github label") } @@ -573,7 +586,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, } // create a github issue and return it ID -func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (string, string, error) { +func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) { m := &createIssueMutation{} input := githubv4.CreateIssueInput{ RepositoryID: repositoryID, @@ -581,8 +594,7 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s Body: (*githubv4.String)(&body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -594,15 +606,14 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s } // add a comment to an issue and return it ID -func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (string, string, error) { +func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) { m := &addCommentToIssueMutation{} input := githubv4.AddCommentInput{ SubjectID: subjectID, Body: githubv4.String(body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -613,15 +624,14 @@ func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) ( return node.ID, node.URL, nil } -func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string, string, error) { +func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) { m := &updateIssueCommentMutation{} input := githubv4.UpdateIssueCommentInput{ ID: commentID, Body: githubv4.String(body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -631,7 +641,7 @@ func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string return commentID, m.UpdateIssueComment.IssueComment.URL, nil } -func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) error { +func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error { m := &updateIssueMutation{} // set state @@ -651,8 +661,7 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) State: &state, } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -662,15 +671,14 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) return nil } -func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error { +func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error { m := &updateIssueMutation{} input := githubv4.UpdateIssueInput{ ID: id, Body: (*githubv4.String)(&body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -680,15 +688,14 @@ func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error { return nil } -func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error { +func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error { m := &updateIssueMutation{} input := githubv4.UpdateIssueInput{ ID: id, Title: (*githubv4.String)(&title), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -699,18 +706,19 @@ func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error { } // update github issue labels -func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelableID string, added, removed []bug.Label) error { +func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error { var errs []string var wg sync.WaitGroup - parentCtx := context.Background() + reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() if len(added) > 0 { wg.Add(1) go func() { defer wg.Done() - addedIDs, err := ge.getLabelsIDs(gc, labelableID, added) + addedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, added) if err != nil { errs = append(errs, errors.Wrap(err, "getting added labels ids").Error()) return @@ -722,11 +730,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable LabelIDs: addedIDs, } - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) - defer cancel() - // add labels - if err := gc.Mutate(ctx, m, inputAdd, nil); err != nil { + if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil { errs = append(errs, err.Error()) } }() @@ -737,7 +742,7 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable go func() { defer wg.Done() - removedIDs, err := ge.getLabelsIDs(gc, labelableID, removed) + removedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, removed) if err != nil { errs = append(errs, errors.Wrap(err, "getting added labels ids").Error()) return @@ -749,11 +754,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable LabelIDs: removedIDs, } - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) - defer cancel() - // remove label labels - if err := gc.Mutate(ctx, m2, inputRemove, nil); err != nil { + if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil { errs = append(errs, err.Error()) } }() diff --git a/bridge/github/export_test.go b/bridge/github/export_test.go index 107fe63b..a0be7cff 100644 --- a/bridge/github/export_test.go +++ b/bridge/github/export_test.go @@ -2,6 +2,7 @@ package github import ( "bytes" + "context" "encoding/json" "fmt" "math/rand" @@ -177,13 +178,14 @@ func TestPushPull(t *testing.T) { }) require.NoError(t, err) + ctx := context.Background() start := time.Now() // export all bugs - events, err := exporter.ExportAll(backend, time.Time{}) + exportEvents, err := exporter.ExportAll(ctx, backend, time.Time{}) require.NoError(t, err) - for result := range events { + for result := range exportEvents { require.NoError(t, result.Err) } require.NoError(t, err) @@ -206,9 +208,13 @@ func TestPushPull(t *testing.T) { require.NoError(t, err) // import all exported bugs to the second backend - err = importer.ImportAll(backendTwo, time.Time{}) + importEvents, err := importer.ImportAll(ctx, backendTwo, time.Time{}) require.NoError(t, err) + for result := range importEvents { + require.NoError(t, result.Err) + } + require.Len(t, backendTwo.AllBugsIds(), len(tests)) for _, tt := range tests { diff --git a/bridge/github/import.go b/bridge/github/import.go index dcaf2d05..9fef9cb5 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -28,11 +28,8 @@ type githubImporter struct { // iterator iterator *iterator - // number of imported issues - importedIssues int - - // number of imported identities - importedIdentities int + // send only channel + out chan<- core.ImportResult } func (gi *githubImporter) Init(conf core.Configuration) error { @@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error { // ImportAll iterate over all the configured repository issues and ensure the creation of the // missing issues / timeline items / edits / label events ... -func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error { - gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since) - - // Loop over all matching issues - for gi.iterator.NextIssue() { - issue := gi.iterator.IssueValue() - fmt.Printf("importing issue: %v\n", issue.Title) +func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { + gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since) + out := make(chan core.ImportResult) + gi.out = out + + go func() { + defer close(gi.out) + + // Loop over all matching issues + for gi.iterator.NextIssue() { + issue := gi.iterator.IssueValue() + // create issue + b, err := gi.ensureIssue(repo, issue) + if err != nil { + err := fmt.Errorf("issue creation: %v", err) + out <- core.NewImportError(err, "") + return + } - // create issue - b, err := gi.ensureIssue(repo, issue) - if err != nil { - return fmt.Errorf("issue creation: %v", err) - } + // loop over timeline items + for gi.iterator.NextTimelineItem() { + item := gi.iterator.TimelineItemValue() + if err := gi.ensureTimelineItem(repo, b, item); err != nil { + err := fmt.Errorf("timeline item creation: %v", err) + out <- core.NewImportError(err, "") + return + } + } - // loop over timeline items - for gi.iterator.NextTimelineItem() { - if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil { - return fmt.Errorf("timeline item creation: %v", err) + // commit bug state + if err := b.CommitAsNeeded(); err != nil { + err = fmt.Errorf("bug commit: %v", err) + out <- core.NewImportError(err, "") + return } } - // commit bug state - if err := b.CommitAsNeeded(); err != nil { - return fmt.Errorf("bug commit: %v", err) + if err := gi.iterator.Error(); err != nil && err != context.Canceled { + gi.out <- core.NewImportError(err, "") } - } + }() - if err := gi.iterator.Error(); err != nil { - fmt.Printf("import error: %v\n", err) - return err - } - - fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities) - return nil + return out, nil } func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) { @@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } // importing a new bug - gi.importedIssues++ + gi.out <- core.NewImportBug(b.Id()) + } else { + gi.out <- core.NewImportNothing("", "bug already imported") } } else { @@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline for i, edit := range issueEdits { if i == 0 && b != nil { // The first edit in the github result is the issue creation itself, we already have that + gi.out <- core.NewImportNothing("", "bug already imported") continue } @@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } // importing a new bug - gi.importedIssues++ - + gi.out <- core.NewImportBug(b.Id()) continue } // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String()) + target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id)) if err != nil { return nil, err } @@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { - fmt.Printf("import event item: %s\n", item.Typename) switch item.Typename { case "IssueComment": // collect all comment edits - commentEdits := []userContentEdit{} + var commentEdits []userContentEdit for gi.iterator.NextCommentEdit() { commentEdits = append(commentEdits, gi.iterator.CommentEditValue()) } + // ensureTimelineComment send import events over out chanel err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) @@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug case "LabeledEvent": id := parseId(item.LabeledEvent.Id) _, err := b.ResolveOperationWithMetadata(keyGithubId, id) + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } + if err != cache.ErrNoMatchingOp { return err } @@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != nil { return err } - _, err = b.ForceChangeLabelsRaw( + op, err := b.ForceChangeLabelsRaw( author, item.LabeledEvent.CreatedAt.Unix(), []string{ @@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug nil, map[string]string{keyGithubId: id}, ) + if err != nil { + return err + } - return err + gi.out <- core.NewImportLabelChange(op.Id()) + return nil case "UnlabeledEvent": id := parseId(item.UnlabeledEvent.Id) _, err := b.ResolveOperationWithMetadata(keyGithubId, id) + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } if err != cache.ErrNoMatchingOp { return err } @@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return err } - _, err = b.ForceChangeLabelsRaw( + op, err := b.ForceChangeLabelsRaw( author, item.UnlabeledEvent.CreatedAt.Unix(), nil, @@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug }, map[string]string{keyGithubId: id}, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportLabelChange(op.Id()) + return nil case "ClosedEvent": id := parseId(item.ClosedEvent.Id) @@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor) if err != nil { return err } - _, err = b.CloseRaw( + op, err := b.CloseRaw( author, item.ClosedEvent.CreatedAt.Unix(), map[string]string{keyGithubId: id}, ) - return err + + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) + return nil case "ReopenedEvent": id := parseId(item.ReopenedEvent.Id) @@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor) if err != nil { return err } - _, err = b.OpenRaw( + op, err := b.OpenRaw( author, item.ReopenedEvent.CreatedAt.Unix(), map[string]string{keyGithubId: id}, ) - return err + + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) + return nil case "RenamedTitleEvent": id := parseId(item.RenamedTitleEvent.Id) @@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor) if err != nil { return err } - _, err = b.SetTitleRaw( + op, err := b.SetTitleRaw( author, item.RenamedTitleEvent.CreatedAt.Unix(), string(item.RenamedTitleEvent.CurrentTitle), map[string]string{keyGithubId: id}, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportTitleEdition(op.Id()) + return nil default: - fmt.Printf("ignore event: %v\n", item.Typename) + reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) } return nil @@ -307,14 +368,16 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. } targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id)) - if err != nil && err != cache.ErrNoMatchingOp { + if err == nil { + reason := fmt.Sprintf("comment already imported") + gi.out <- core.NewImportNothing("", reason) + } else if err != cache.ErrNoMatchingOp { // real error return err } // if no edits are given we create the comment if len(edits) == 0 { - // if comment doesn't exist if err == cache.ErrNoMatchingOp { cleanText, err := text.Cleanup(string(item.Body)) if err != nil { @@ -322,7 +385,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. } // add comment operation - _, err = b.AddCommentRaw( + op, err := b.AddCommentRaw( author, item.CreatedAt.Unix(), cleanText, @@ -332,12 +395,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. keyGithubUrl: parseId(item.Url.String()), }, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportComment(op.Id()) } + } else { for i, edit := range edits { if i == 0 && targetOpID != "" { // The first edit in the github result is the comment creation itself, we already have that + gi.out <- core.NewImportNothing("", "comment already imported") continue } @@ -370,7 +439,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. // set target for the nexr edit now that the comment is created targetOpID = op.Id() - continue } @@ -386,7 +454,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { _, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id)) if err == nil { - // already imported + gi.out <- core.NewImportNothing(b.Id(), "edition already imported") return nil } if err != cache.ErrNoMatchingOp { @@ -394,8 +462,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC return err } - fmt.Println("import edition") - editor, err := gi.ensurePerson(repo, edit.Editor) if err != nil { return err @@ -404,7 +470,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC switch { case edit.DeletedAt != nil: // comment deletion, not supported yet - fmt.Println("comment deletion is not supported yet") + gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet") case edit.DeletedAt == nil: @@ -414,7 +480,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC } // comment edition - _, err = b.EditCommentRaw( + op, err := b.EditCommentRaw( editor, edit.CreatedAt.Unix(), target, @@ -427,6 +493,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC if err != nil { return err } + + gi.out <- core.NewImportCommentEdition(op.Id()) } return nil @@ -450,7 +518,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } // importing a new identity - gi.importedIdentities++ var name string var email string @@ -471,7 +538,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca case "Bot": } - return repo.NewIdentityRaw( + i, err = repo.NewIdentityRaw( name, email, string(actor.Login), @@ -480,6 +547,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca keyGithubLogin: string(actor.Login), }, ) + + if err != nil { + return nil, err + } + + gi.out <- core.NewImportIdentity(i.Id()) + return i, nil } func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { @@ -500,8 +574,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, gc := buildClient(gi.conf[keyToken]) - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout) defer cancel() err = gc.Query(ctx, &q, variables) diff --git a/bridge/github/import_test.go b/bridge/github/import_test.go index 5ba87993..41bcb58d 100644 --- a/bridge/github/import_test.go +++ b/bridge/github/import_test.go @@ -1,6 +1,7 @@ package github import ( + "context" "fmt" "os" "testing" @@ -146,11 +147,16 @@ func Test_Importer(t *testing.T) { }) require.NoError(t, err) + ctx := context.Background() start := time.Now() - err = importer.ImportAll(backend, time.Time{}) + events, err := importer.ImportAll(ctx, backend, time.Time{}) require.NoError(t, err) + for result := range events { + require.NoError(t, result.Err) + } + fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds()) require.Len(t, backend.AllBugsIds(), len(tests)) diff --git a/bridge/github/iterator.go b/bridge/github/iterator.go index 6d63cf42..a97ed036 100644 --- a/bridge/github/iterator.go +++ b/bridge/github/iterator.go @@ -46,6 +46,9 @@ type iterator struct { // to make capacity int + // shared context used for all graphql queries + ctx context.Context + // sticky error err error @@ -60,11 +63,12 @@ type iterator struct { } // NewIterator create and initialize a new iterator -func NewIterator(owner, project, token string, since time.Time) *iterator { +func NewIterator(ctx context.Context, capacity int, owner, project, token string, since time.Time) *iterator { i := &iterator{ gc: buildClient(token), since: since, - capacity: 10, + capacity: capacity, + ctx: ctx, timeline: timelineIterator{ index: -1, issueEdit: indexer{-1}, @@ -147,8 +151,7 @@ func (i *iterator) Error() error { } func (i *iterator) queryIssue() bool { - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil { @@ -167,6 +170,10 @@ func (i *iterator) queryIssue() bool { // NextIssue try to query the next issue and return true. Only one issue is // queried at each call. func (i *iterator) NextIssue() bool { + if i.err != nil { + return false + } + // if $issueAfter variable is nil we can directly make the first query if i.timeline.variables["issueAfter"] == (*githubv4.String)(nil) { nextIssue := i.queryIssue() @@ -175,10 +182,6 @@ func (i *iterator) NextIssue() bool { return nextIssue } - if i.err != nil { - return false - } - if !i.timeline.query.Repository.Issues.PageInfo.HasNextPage { return false } @@ -207,11 +210,15 @@ func (i *iterator) NextTimelineItem() bool { return false } + if i.ctx.Err() != nil { + return false + } + if len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges) == 0 { return false } - if i.timeline.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges))-1 { + if i.timeline.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges)-1 { i.timeline.index++ return true } @@ -225,8 +232,7 @@ func (i *iterator) NextTimelineItem() bool { // more timelines, query them i.timeline.variables["timelineAfter"] = i.timeline.query.Repository.Issues.Nodes[0].Timeline.PageInfo.EndCursor - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil { @@ -245,8 +251,7 @@ func (i *iterator) TimelineItemValue() timelineItem { } func (i *iterator) queryIssueEdit() bool { - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.issueEdit.query, i.issueEdit.variables); err != nil { @@ -285,10 +290,14 @@ func (i *iterator) NextIssueEdit() bool { return false } + if i.ctx.Err() != nil { + return false + } + // this mean we looped over all available issue edits in the timeline. // now we have to use i.issueEditQuery if i.timeline.issueEdit.index == -2 { - if i.issueEdit.index < min(i.capacity, len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 { + if i.issueEdit.index < len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 { i.issueEdit.index++ return i.nextValidIssueEdit() } @@ -319,7 +328,7 @@ func (i *iterator) NextIssueEdit() bool { } // loop over them timeline comment edits - if i.timeline.issueEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 { + if i.timeline.issueEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 { i.timeline.issueEdit.index++ return i.nextValidIssueEdit() } @@ -347,8 +356,7 @@ func (i *iterator) IssueEditValue() userContentEdit { } func (i *iterator) queryCommentEdit() bool { - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.commentEdit.query, i.commentEdit.variables); err != nil { @@ -384,10 +392,14 @@ func (i *iterator) NextCommentEdit() bool { return false } + if i.ctx.Err() != nil { + return false + } + // same as NextIssueEdit if i.timeline.commentEdit.index == -2 { - if i.commentEdit.index < min(i.capacity, len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes))-1 { + if i.commentEdit.index < len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes)-1 { i.commentEdit.index++ return i.nextValidCommentEdit() } @@ -409,7 +421,7 @@ func (i *iterator) NextCommentEdit() bool { } // loop over them timeline comment edits - if i.timeline.commentEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes))-1 { + if i.timeline.commentEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes)-1 { i.timeline.commentEdit.index++ return i.nextValidCommentEdit() } @@ -440,14 +452,6 @@ func (i *iterator) CommentEditValue() userContentEdit { return i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes[i.timeline.commentEdit.index] } -func min(a, b int) int { - if a > b { - return b - } - - return a -} - func reverseEdits(edits []userContentEdit) { for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 { edits[i], edits[j] = edits[j], edits[i] -- cgit From 501a9310807d4c53cf9e28b84d88aebd64a04ead Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Sat, 17 Aug 2019 23:45:31 +0200 Subject: bridge/gthub: use errgroup.Group instead of sync.WaitGroup --- bridge/github/export.go | 41 +++++++++++++---------------------------- 1 file changed, 13 insertions(+), 28 deletions(-) (limited to 'bridge/github') diff --git a/bridge/github/export.go b/bridge/github/export.go index 3aecdce0..976c5a05 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -7,12 +7,11 @@ import ( "fmt" "io/ioutil" "net/http" - "strings" - "sync" "time" "github.com/pkg/errors" "github.com/shurcooL/githubv4" + "golang.org/x/sync/errgroup" "github.com/MichaelMure/git-bug/bridge/core" "github.com/MichaelMure/git-bug/bug" @@ -707,21 +706,15 @@ func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title // update github issue labels func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error { - var errs []string - var wg sync.WaitGroup - reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() + wg, ctx := errgroup.WithContext(ctx) if len(added) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() error { addedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, added) if err != nil { - errs = append(errs, errors.Wrap(err, "getting added labels ids").Error()) - return + return err } m := &addLabelsToLabelableMutation{} @@ -732,20 +725,17 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu // add labels if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil { - errs = append(errs, err.Error()) + return err } - }() + return nil + }) } if len(removed) > 0 { - wg.Add(1) - go func() { - defer wg.Done() - + wg.Go(func() error { removedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, removed) if err != nil { - errs = append(errs, errors.Wrap(err, "getting added labels ids").Error()) - return + return err } m2 := &removeLabelsFromLabelableMutation{} @@ -756,16 +746,11 @@ func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githu // remove label labels if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil { - errs = append(errs, err.Error()) + return err } - }() - } - - wg.Wait() - - if len(errs) == 0 { - return nil + return nil + }) } - return fmt.Errorf("label change error: %v", strings.Join(errs, "\n")) + return wg.Wait() } -- cgit From df412430ab35106df57bc32d6dd01bd088bf4735 Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Sat, 17 Aug 2019 23:46:10 +0200 Subject: bridge/github: improve error handling and Nothing events vendor: add package golang.org/x/sync/errgroup --- bridge/github/import.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'bridge/github') diff --git a/bridge/github/import.go b/bridge/github/import.go index 9fef9cb5..7c4deb50 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -369,8 +369,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id)) if err == nil { - reason := fmt.Sprintf("comment already imported") - gi.out <- core.NewImportNothing("", reason) + gi.out <- core.NewImportNothing("", "comment already imported") } else if err != cache.ErrNoMatchingOp { // real error return err -- cgit