From 5ca326af83b90531d4d0c502bb1beabbe1b48c55 Mon Sep 17 00:00:00 2001 From: Amine Hilaly Date: Tue, 13 Aug 2019 19:51:14 +0200 Subject: bridge/core: add context.Context to ImportAll and ExportAll signatures bridge/core: add ImportResult objects to stream import events bridge/core: launchpad support asynchronous import bridge/github: cancellable export and import functions bridge/gitlab: cancellable export and import functions commands: bridge pull/push gracefull kill bridge/github: fix github import bridge/github: use simple context for imports bridge/core: name parameters in interfaces github/core: Add EventError to export and import events types bridge/gitlab: add context support in gitlab requests functions bridge/gitlab: remove imported events count from importer logic bridge/github: remove imported events count from importer logic bridge/github: add context support in query and muration requets bridge/github: fix bug duplicate editions after multiple calls bridge/core: import import and export events String methods bridge/gitlab: fix error handling in note import events commands/bridge: Add statistics about imports and exports bridge/gitlab: properly handle context cancellation bridge/github: improve error handling bridge: break iterators on context cancel or timeout bridge: add context timeout support bridge: improve event formating and error handling commands: handle interrupt and switch cases bridge/github: add export mutation timeouts bridge: fix race condition bug in the github and gitlab importers bridge/github: improve context error handling --- bridge/core/bridge.go | 15 +-- bridge/core/export.go | 24 +++-- bridge/core/import.go | 128 ++++++++++++++++++++++++ bridge/core/interfaces.go | 5 +- bridge/github/export.go | 140 ++++++++++++++------------- bridge/github/export_test.go | 12 ++- bridge/github/import.go | 199 ++++++++++++++++++++++++++------------ bridge/github/import_test.go | 8 +- bridge/github/iterator.go | 58 +++++------ bridge/gitlab/import.go | 156 ++++++++++++++++++------------ bridge/gitlab/import_notes.go | 39 ++++++++ bridge/gitlab/import_test.go | 9 +- bridge/gitlab/iterator.go | 33 ++++++- bridge/launchpad/import.go | 185 +++++++++++++++++++---------------- bridge/launchpad/launchpad_api.go | 54 +++-------- cache/repo_cache.go | 16 +-- commands/bridge_pull.go | 53 +++++++++- commands/bridge_push.go | 50 ++++++++-- entity/merge.go | 8 +- 19 files changed, 802 insertions(+), 390 deletions(-) create mode 100644 bridge/core/import.go diff --git a/bridge/core/bridge.go b/bridge/core/bridge.go index 645dac3d..9161b418 100644 --- a/bridge/core/bridge.go +++ b/bridge/core/bridge.go @@ -2,6 +2,7 @@ package core import ( + "context" "fmt" "reflect" "regexp" @@ -289,26 +290,26 @@ func (b *Bridge) ensureInit() error { return nil } -func (b *Bridge) ImportAll(since time.Time) error { +func (b *Bridge) ImportAll(ctx context.Context, since time.Time) (<-chan ImportResult, error) { importer := b.getImporter() if importer == nil { - return ErrImportNotSupported + return nil, ErrImportNotSupported } err := b.ensureConfig() if err != nil { - return err + return nil, err } err = b.ensureInit() if err != nil { - return err + return nil, err } - return importer.ImportAll(b.repo, since) + return importer.ImportAll(ctx, b.repo, since) } -func (b *Bridge) ExportAll(since time.Time) (<-chan ExportResult, error) { +func (b *Bridge) ExportAll(ctx context.Context, since time.Time) (<-chan ExportResult, error) { exporter := b.getExporter() if exporter == nil { return nil, ErrExportNotSupported @@ -324,5 +325,5 @@ func (b *Bridge) ExportAll(since time.Time) (<-chan ExportResult, error) { return nil, err } - return exporter.ExportAll(b.repo, since) + return exporter.ExportAll(ctx, b.repo, since) } diff --git a/bridge/core/export.go b/bridge/core/export.go index 09566b62..55cf5a60 100644 --- a/bridge/core/export.go +++ b/bridge/core/export.go @@ -17,6 +17,7 @@ const ( ExportEventTitleEdition ExportEventLabelChange ExportEventNothing + ExportEventError ) // ExportResult is an event that is emitted during the export process, to @@ -32,19 +33,28 @@ type ExportResult struct { func (er ExportResult) String() string { switch er.Event { case ExportEventBug: - return "new issue" + return fmt.Sprintf("new issue: %s", er.ID) case ExportEventComment: - return "new comment" + return fmt.Sprintf("new comment: %s", er.ID) case ExportEventCommentEdition: - return "updated comment" + return fmt.Sprintf("updated comment: %s", er.ID) case ExportEventStatusChange: - return "changed status" + return fmt.Sprintf("changed status: %s", er.ID) case ExportEventTitleEdition: - return "changed title" + return fmt.Sprintf("changed title: %s", er.ID) case ExportEventLabelChange: - return "changed label" + return fmt.Sprintf("changed label: %s", er.ID) case ExportEventNothing: - return fmt.Sprintf("no event: %v", er.Reason) + if er.ID != "" { + return fmt.Sprintf("ignoring export event %s: %s", er.ID, er.Reason) + } + return fmt.Sprintf("ignoring export event: %s", er.Reason) + case ExportEventError: + if er.ID != "" { + return fmt.Sprintf("export error at %s: %s", er.ID, er.Err.Error()) + } + return fmt.Sprintf("export error: %s", er.Err.Error()) + default: panic("unknown export result") } diff --git a/bridge/core/import.go b/bridge/core/import.go new file mode 100644 index 00000000..0961e00b --- /dev/null +++ b/bridge/core/import.go @@ -0,0 +1,128 @@ +package core + +import ( + "fmt" + + "github.com/MichaelMure/git-bug/entity" +) + +type ImportEvent int + +const ( + _ ImportEvent = iota + ImportEventBug + ImportEventComment + ImportEventCommentEdition + ImportEventStatusChange + ImportEventTitleEdition + ImportEventLabelChange + ImportEventIdentity + ImportEventNothing + ImportEventError +) + +// ImportResult is an event that is emitted during the import process, to +// allow calling code to report on what is happening, collect metrics or +// display meaningful errors if something went wrong. +type ImportResult struct { + Err error + Event ImportEvent + ID entity.Id + Reason string +} + +func (er ImportResult) String() string { + switch er.Event { + case ImportEventBug: + return fmt.Sprintf("new issue: %s", er.ID) + case ImportEventComment: + return fmt.Sprintf("new comment: %s", er.ID) + case ImportEventCommentEdition: + return fmt.Sprintf("updated comment: %s", er.ID) + case ImportEventStatusChange: + return fmt.Sprintf("changed status: %s", er.ID) + case ImportEventTitleEdition: + return fmt.Sprintf("changed title: %s", er.ID) + case ImportEventLabelChange: + return fmt.Sprintf("changed label: %s", er.ID) + case ImportEventIdentity: + return fmt.Sprintf("new identity: %s", er.ID) + case ImportEventNothing: + if er.ID != "" { + return fmt.Sprintf("ignoring import event %s: %s", er.ID, er.Reason) + } + return fmt.Sprintf("ignoring event: %s", er.Reason) + case ImportEventError: + if er.ID != "" { + return fmt.Sprintf("import error at id %s: %s", er.ID, er.Err.Error()) + } + return fmt.Sprintf("import error: %s", er.Err.Error()) + default: + panic("unknown import result") + } +} + +func NewImportError(err error, id entity.Id) ImportResult { + return ImportResult{ + Err: err, + ID: id, + Event: ImportEventError, + } +} + +func NewImportNothing(id entity.Id, reason string) ImportResult { + return ImportResult{ + ID: id, + Reason: reason, + Event: ImportEventNothing, + } +} + +func NewImportBug(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventBug, + } +} + +func NewImportComment(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventComment, + } +} + +func NewImportCommentEdition(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventCommentEdition, + } +} + +func NewImportStatusChange(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventStatusChange, + } +} + +func NewImportLabelChange(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventLabelChange, + } +} + +func NewImportTitleEdition(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventTitleEdition, + } +} + +func NewImportIdentity(id entity.Id) ImportResult { + return ImportResult{ + ID: id, + Event: ImportEventIdentity, + } +} diff --git a/bridge/core/interfaces.go b/bridge/core/interfaces.go index 76d66fb4..047f3880 100644 --- a/bridge/core/interfaces.go +++ b/bridge/core/interfaces.go @@ -1,6 +1,7 @@ package core import ( + "context" "time" "github.com/MichaelMure/git-bug/cache" @@ -29,10 +30,10 @@ type BridgeImpl interface { type Importer interface { Init(conf Configuration) error - ImportAll(repo *cache.RepoCache, since time.Time) error + ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan ImportResult, error) } type Exporter interface { Init(conf Configuration) error - ExportAll(repo *cache.RepoCache, since time.Time) (<-chan ExportResult, error) + ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan ExportResult, error) } diff --git a/bridge/github/export.go b/bridge/github/export.go index 34c88310..3aecdce0 100644 --- a/bridge/github/export.go +++ b/bridge/github/export.go @@ -79,7 +79,7 @@ func (ge *githubExporter) getIdentityClient(id entity.Id) (*githubv4.Client, err } // ExportAll export all event made by the current user to Github -func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) { +func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) { out := make(chan core.ExportResult) user, err := repo.GetUserIdentity() @@ -91,6 +91,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c // get repository node id ge.repositoryID, err = getRepositoryNodeID( + ctx, ge.conf[keyOwner], ge.conf[keyProject], ge.conf[keyToken], @@ -117,20 +118,28 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c return } - snapshot := b.Snapshot() + select { - // ignore issues created before since date - // TODO: compare the Lamport time instead of using the unix time - if snapshot.CreatedAt.Before(since) { - out <- core.NewExportNothing(b.Id(), "bug created before the since date") - continue - } + case <-ctx.Done(): + // stop iterating if context cancel function is called + return - if snapshot.HasAnyActor(allIdentitiesIds...) { - // try to export the bug and it associated events - ge.exportBug(b, since, out) - } else { - out <- core.NewExportNothing(id, "not an actor") + default: + snapshot := b.Snapshot() + + // ignore issues created before since date + // TODO: compare the Lamport time instead of using the unix time + if snapshot.CreatedAt.Before(since) { + out <- core.NewExportNothing(b.Id(), "bug created before the since date") + continue + } + + if snapshot.HasAnyActor(allIdentitiesIds...) { + // try to export the bug and it associated events + ge.exportBug(ctx, b, since, out) + } else { + out <- core.NewExportNothing(id, "not an actor") + } } } }() @@ -139,7 +148,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c } // exportBug publish bugs and related events -func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan<- core.ExportResult) { +func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) { snapshot := b.Snapshot() var bugGithubID string @@ -199,7 +208,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan } // create bug - id, url, err := createGithubIssue(client, ge.repositoryID, createOp.Title, createOp.Message) + id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message) if err != nil { err := errors.Wrap(err, "exporting github issue") out <- core.NewExportError(err, b.Id()) @@ -257,7 +266,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan opr := op.(*bug.AddCommentOperation) // send operation to github - id, url, err = addCommentGithubIssue(client, bugGithubID, opr.Message) + id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, opr.Message) if err != nil { err := errors.Wrap(err, "adding comment") out <- core.NewExportError(err, b.Id()) @@ -277,7 +286,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan if opr.Target == createOp.Id() { // case bug creation operation: we need to edit the Github issue - if err := updateGithubIssueBody(client, bugGithubID, opr.Message); err != nil { + if err := updateGithubIssueBody(ctx, client, bugGithubID, opr.Message); err != nil { err := errors.Wrap(err, "editing issue") out <- core.NewExportError(err, b.Id()) return @@ -296,7 +305,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan panic("unexpected error: comment id not found") } - eid, eurl, err := editCommentGithubIssue(client, commentID, opr.Message) + eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, opr.Message) if err != nil { err := errors.Wrap(err, "editing comment") out <- core.NewExportError(err, b.Id()) @@ -312,7 +321,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan case *bug.SetStatusOperation: opr := op.(*bug.SetStatusOperation) - if err := updateGithubIssueStatus(client, bugGithubID, opr.Status); err != nil { + if err := updateGithubIssueStatus(ctx, client, bugGithubID, opr.Status); err != nil { err := errors.Wrap(err, "editing status") out <- core.NewExportError(err, b.Id()) return @@ -325,7 +334,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan case *bug.SetTitleOperation: opr := op.(*bug.SetTitleOperation) - if err := updateGithubIssueTitle(client, bugGithubID, opr.Title); err != nil { + if err := updateGithubIssueTitle(ctx, client, bugGithubID, opr.Title); err != nil { err := errors.Wrap(err, "editing title") out <- core.NewExportError(err, b.Id()) return @@ -338,7 +347,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan case *bug.LabelChangeOperation: opr := op.(*bug.LabelChangeOperation) - if err := ge.updateGithubIssueLabels(client, bugGithubID, opr.Added, opr.Removed); err != nil { + if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, opr.Added, opr.Removed); err != nil { err := errors.Wrap(err, "updating labels") out <- core.NewExportError(err, b.Id()) return @@ -370,12 +379,9 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan } // getRepositoryNodeID request github api v3 to get repository node id -func getRepositoryNodeID(owner, project, token string) (string, error) { +func getRepositoryNodeID(ctx context.Context, owner, project, token string) (string, error) { url := fmt.Sprintf("%s/repos/%s/%s", githubV3Url, owner, project) - - client := &http.Client{ - Timeout: defaultTimeout, - } + client := &http.Client{} req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -385,6 +391,10 @@ func getRepositoryNodeID(owner, project, token string) (string, error) { // need the token for private repositories req.Header.Set("Authorization", fmt.Sprintf("token %s", token)) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + req = req.WithContext(ctx) + resp, err := client.Do(req) if err != nil { return "", err @@ -425,7 +435,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith } // get label from github -func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (string, error) { +func (ge *githubExporter) getGithubLabelID(ctx context.Context, gc *githubv4.Client, label string) (string, error) { q := &labelQuery{} variables := map[string]interface{}{ "label": githubv4.String(label), @@ -433,8 +443,7 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s "name": githubv4.String(ge.conf[keyProject]), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Query(ctx, q, variables); err != nil { @@ -452,12 +461,9 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s // create a new label and return it github id // NOTE: since createLabel mutation is still in preview mode we use github api v3 to create labels // see https://developer.github.com/v4/mutation/createlabel/ and https://developer.github.com/v4/previews/#labels-preview -func (ge *githubExporter) createGithubLabel(label, color string) (string, error) { +func (ge *githubExporter) createGithubLabel(ctx context.Context, label, color string) (string, error) { url := fmt.Sprintf("%s/repos/%s/%s/labels", githubV3Url, ge.conf[keyOwner], ge.conf[keyProject]) - - client := &http.Client{ - Timeout: defaultTimeout, - } + client := &http.Client{} params := struct { Name string `json:"name"` @@ -478,6 +484,10 @@ func (ge *githubExporter) createGithubLabel(label, color string) (string, error) return "", err } + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + req = req.WithContext(ctx) + // need the token for private repositories req.Header.Set("Authorization", fmt.Sprintf("token %s", ge.conf[keyToken])) @@ -529,9 +539,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC } */ -func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) { +func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) { // try to get label id - labelID, err := ge.getGithubLabelID(gc, string(label)) + labelID, err := ge.getGithubLabelID(ctx, gc, string(label)) if err == nil { return labelID, nil } @@ -540,7 +550,10 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito rgba := label.RGBA() hexColor := fmt.Sprintf("%.2x%.2x%.2x", rgba.R, rgba.G, rgba.B) - labelID, err = ge.createGithubLabel(string(label), hexColor) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + + labelID, err = ge.createGithubLabel(ctx, string(label), hexColor) if err != nil { return "", err } @@ -548,7 +561,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito return labelID, nil } -func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) { +func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) { ids := make([]githubv4.ID, 0, len(labels)) var err error @@ -557,7 +570,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, id, ok := ge.cachedLabels[string(label)] if !ok { // try to query label id - id, err = ge.getOrCreateGithubLabelID(gc, repositoryID, label) + id, err = ge.getOrCreateGithubLabelID(ctx, gc, repositoryID, label) if err != nil { return nil, errors.Wrap(err, "get or create github label") } @@ -573,7 +586,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, } // create a github issue and return it ID -func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (string, string, error) { +func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) { m := &createIssueMutation{} input := githubv4.CreateIssueInput{ RepositoryID: repositoryID, @@ -581,8 +594,7 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s Body: (*githubv4.String)(&body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -594,15 +606,14 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s } // add a comment to an issue and return it ID -func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (string, string, error) { +func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) { m := &addCommentToIssueMutation{} input := githubv4.AddCommentInput{ SubjectID: subjectID, Body: githubv4.String(body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -613,15 +624,14 @@ func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) ( return node.ID, node.URL, nil } -func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string, string, error) { +func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) { m := &updateIssueCommentMutation{} input := githubv4.UpdateIssueCommentInput{ ID: commentID, Body: githubv4.String(body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -631,7 +641,7 @@ func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string return commentID, m.UpdateIssueComment.IssueComment.URL, nil } -func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) error { +func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error { m := &updateIssueMutation{} // set state @@ -651,8 +661,7 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) State: &state, } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -662,15 +671,14 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) return nil } -func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error { +func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error { m := &updateIssueMutation{} input := githubv4.UpdateIssueInput{ ID: id, Body: (*githubv4.String)(&body), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -680,15 +688,14 @@ func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error { return nil } -func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error { +func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error { m := &updateIssueMutation{} input := githubv4.UpdateIssueInput{ ID: id, Title: (*githubv4.String)(&title), } - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) defer cancel() if err := gc.Mutate(ctx, m, input, nil); err != nil { @@ -699,18 +706,19 @@ func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error { } // update github issue labels -func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelableID string, added, removed []bug.Label) error { +func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error { var errs []string var wg sync.WaitGroup - parentCtx := context.Background() + reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() if len(added) > 0 { wg.Add(1) go func() { defer wg.Done() - addedIDs, err := ge.getLabelsIDs(gc, labelableID, added) + addedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, added) if err != nil { errs = append(errs, errors.Wrap(err, "getting added labels ids").Error()) return @@ -722,11 +730,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable LabelIDs: addedIDs, } - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) - defer cancel() - // add labels - if err := gc.Mutate(ctx, m, inputAdd, nil); err != nil { + if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil { errs = append(errs, err.Error()) } }() @@ -737,7 +742,7 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable go func() { defer wg.Done() - removedIDs, err := ge.getLabelsIDs(gc, labelableID, removed) + removedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, removed) if err != nil { errs = append(errs, errors.Wrap(err, "getting added labels ids").Error()) return @@ -749,11 +754,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable LabelIDs: removedIDs, } - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) - defer cancel() - // remove label labels - if err := gc.Mutate(ctx, m2, inputRemove, nil); err != nil { + if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil { errs = append(errs, err.Error()) } }() diff --git a/bridge/github/export_test.go b/bridge/github/export_test.go index 107fe63b..a0be7cff 100644 --- a/bridge/github/export_test.go +++ b/bridge/github/export_test.go @@ -2,6 +2,7 @@ package github import ( "bytes" + "context" "encoding/json" "fmt" "math/rand" @@ -177,13 +178,14 @@ func TestPushPull(t *testing.T) { }) require.NoError(t, err) + ctx := context.Background() start := time.Now() // export all bugs - events, err := exporter.ExportAll(backend, time.Time{}) + exportEvents, err := exporter.ExportAll(ctx, backend, time.Time{}) require.NoError(t, err) - for result := range events { + for result := range exportEvents { require.NoError(t, result.Err) } require.NoError(t, err) @@ -206,9 +208,13 @@ func TestPushPull(t *testing.T) { require.NoError(t, err) // import all exported bugs to the second backend - err = importer.ImportAll(backendTwo, time.Time{}) + importEvents, err := importer.ImportAll(ctx, backendTwo, time.Time{}) require.NoError(t, err) + for result := range importEvents { + require.NoError(t, result.Err) + } + require.Len(t, backendTwo.AllBugsIds(), len(tests)) for _, tt := range tests { diff --git a/bridge/github/import.go b/bridge/github/import.go index dcaf2d05..9fef9cb5 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -28,11 +28,8 @@ type githubImporter struct { // iterator iterator *iterator - // number of imported issues - importedIssues int - - // number of imported identities - importedIdentities int + // send only channel + out chan<- core.ImportResult } func (gi *githubImporter) Init(conf core.Configuration) error { @@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error { // ImportAll iterate over all the configured repository issues and ensure the creation of the // missing issues / timeline items / edits / label events ... -func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error { - gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since) - - // Loop over all matching issues - for gi.iterator.NextIssue() { - issue := gi.iterator.IssueValue() - fmt.Printf("importing issue: %v\n", issue.Title) +func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { + gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since) + out := make(chan core.ImportResult) + gi.out = out + + go func() { + defer close(gi.out) + + // Loop over all matching issues + for gi.iterator.NextIssue() { + issue := gi.iterator.IssueValue() + // create issue + b, err := gi.ensureIssue(repo, issue) + if err != nil { + err := fmt.Errorf("issue creation: %v", err) + out <- core.NewImportError(err, "") + return + } - // create issue - b, err := gi.ensureIssue(repo, issue) - if err != nil { - return fmt.Errorf("issue creation: %v", err) - } + // loop over timeline items + for gi.iterator.NextTimelineItem() { + item := gi.iterator.TimelineItemValue() + if err := gi.ensureTimelineItem(repo, b, item); err != nil { + err := fmt.Errorf("timeline item creation: %v", err) + out <- core.NewImportError(err, "") + return + } + } - // loop over timeline items - for gi.iterator.NextTimelineItem() { - if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil { - return fmt.Errorf("timeline item creation: %v", err) + // commit bug state + if err := b.CommitAsNeeded(); err != nil { + err = fmt.Errorf("bug commit: %v", err) + out <- core.NewImportError(err, "") + return } } - // commit bug state - if err := b.CommitAsNeeded(); err != nil { - return fmt.Errorf("bug commit: %v", err) + if err := gi.iterator.Error(); err != nil && err != context.Canceled { + gi.out <- core.NewImportError(err, "") } - } + }() - if err := gi.iterator.Error(); err != nil { - fmt.Printf("import error: %v\n", err) - return err - } - - fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities) - return nil + return out, nil } func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) { @@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } // importing a new bug - gi.importedIssues++ + gi.out <- core.NewImportBug(b.Id()) + } else { + gi.out <- core.NewImportNothing("", "bug already imported") } } else { @@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline for i, edit := range issueEdits { if i == 0 && b != nil { // The first edit in the github result is the issue creation itself, we already have that + gi.out <- core.NewImportNothing("", "bug already imported") continue } @@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } // importing a new bug - gi.importedIssues++ - + gi.out <- core.NewImportBug(b.Id()) continue } // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String()) + target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id)) if err != nil { return nil, err } @@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { - fmt.Printf("import event item: %s\n", item.Typename) switch item.Typename { case "IssueComment": // collect all comment edits - commentEdits := []userContentEdit{} + var commentEdits []userContentEdit for gi.iterator.NextCommentEdit() { commentEdits = append(commentEdits, gi.iterator.CommentEditValue()) } + // ensureTimelineComment send import events over out chanel err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) @@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug case "LabeledEvent": id := parseId(item.LabeledEvent.Id) _, err := b.ResolveOperationWithMetadata(keyGithubId, id) + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } + if err != cache.ErrNoMatchingOp { return err } @@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != nil { return err } - _, err = b.ForceChangeLabelsRaw( + op, err := b.ForceChangeLabelsRaw( author, item.LabeledEvent.CreatedAt.Unix(), []string{ @@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug nil, map[string]string{keyGithubId: id}, ) + if err != nil { + return err + } - return err + gi.out <- core.NewImportLabelChange(op.Id()) + return nil case "UnlabeledEvent": id := parseId(item.UnlabeledEvent.Id) _, err := b.ResolveOperationWithMetadata(keyGithubId, id) + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } if err != cache.ErrNoMatchingOp { return err } @@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return err } - _, err = b.ForceChangeLabelsRaw( + op, err := b.ForceChangeLabelsRaw( author, item.UnlabeledEvent.CreatedAt.Unix(), nil, @@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug }, map[string]string{keyGithubId: id}, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportLabelChange(op.Id()) + return nil case "ClosedEvent": id := parseId(item.ClosedEvent.Id) @@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor) if err != nil { return err } - _, err = b.CloseRaw( + op, err := b.CloseRaw( author, item.ClosedEvent.CreatedAt.Unix(), map[string]string{keyGithubId: id}, ) - return err + + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) + return nil case "ReopenedEvent": id := parseId(item.ReopenedEvent.Id) @@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor) if err != nil { return err } - _, err = b.OpenRaw( + op, err := b.OpenRaw( author, item.ReopenedEvent.CreatedAt.Unix(), map[string]string{keyGithubId: id}, ) - return err + + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) + return nil case "RenamedTitleEvent": id := parseId(item.RenamedTitleEvent.Id) @@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor) if err != nil { return err } - _, err = b.SetTitleRaw( + op, err := b.SetTitleRaw( author, item.RenamedTitleEvent.CreatedAt.Unix(), string(item.RenamedTitleEvent.CurrentTitle), map[string]string{keyGithubId: id}, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportTitleEdition(op.Id()) + return nil default: - fmt.Printf("ignore event: %v\n", item.Typename) + reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) } return nil @@ -307,14 +368,16 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. } targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id)) - if err != nil && err != cache.ErrNoMatchingOp { + if err == nil { + reason := fmt.Sprintf("comment already imported") + gi.out <- core.NewImportNothing("", reason) + } else if err != cache.ErrNoMatchingOp { // real error return err } // if no edits are given we create the comment if len(edits) == 0 { - // if comment doesn't exist if err == cache.ErrNoMatchingOp { cleanText, err := text.Cleanup(string(item.Body)) if err != nil { @@ -322,7 +385,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. } // add comment operation - _, err = b.AddCommentRaw( + op, err := b.AddCommentRaw( author, item.CreatedAt.Unix(), cleanText, @@ -332,12 +395,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. keyGithubUrl: parseId(item.Url.String()), }, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportComment(op.Id()) } + } else { for i, edit := range edits { if i == 0 && targetOpID != "" { // The first edit in the github result is the comment creation itself, we already have that + gi.out <- core.NewImportNothing("", "comment already imported") continue } @@ -370,7 +439,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. // set target for the nexr edit now that the comment is created targetOpID = op.Id() - continue } @@ -386,7 +454,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { _, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id)) if err == nil { - // already imported + gi.out <- core.NewImportNothing(b.Id(), "edition already imported") return nil } if err != cache.ErrNoMatchingOp { @@ -394,8 +462,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC return err } - fmt.Println("import edition") - editor, err := gi.ensurePerson(repo, edit.Editor) if err != nil { return err @@ -404,7 +470,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC switch { case edit.DeletedAt != nil: // comment deletion, not supported yet - fmt.Println("comment deletion is not supported yet") + gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet") case edit.DeletedAt == nil: @@ -414,7 +480,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC } // comment edition - _, err = b.EditCommentRaw( + op, err := b.EditCommentRaw( editor, edit.CreatedAt.Unix(), target, @@ -427,6 +493,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC if err != nil { return err } + + gi.out <- core.NewImportCommentEdition(op.Id()) } return nil @@ -450,7 +518,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } // importing a new identity - gi.importedIdentities++ var name string var email string @@ -471,7 +538,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca case "Bot": } - return repo.NewIdentityRaw( + i, err = repo.NewIdentityRaw( name, email, string(actor.Login), @@ -480,6 +547,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca keyGithubLogin: string(actor.Login), }, ) + + if err != nil { + return nil, err + } + + gi.out <- core.NewImportIdentity(i.Id()) + return i, nil } func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { @@ -500,8 +574,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, gc := buildClient(gi.conf[keyToken]) - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout) defer cancel() err = gc.Query(ctx, &q, variables) diff --git a/bridge/github/import_test.go b/bridge/github/import_test.go index 5ba87993..41bcb58d 100644 --- a/bridge/github/import_test.go +++ b/bridge/github/import_test.go @@ -1,6 +1,7 @@ package github import ( + "context" "fmt" "os" "testing" @@ -146,11 +147,16 @@ func Test_Importer(t *testing.T) { }) require.NoError(t, err) + ctx := context.Background() start := time.Now() - err = importer.ImportAll(backend, time.Time{}) + events, err := importer.ImportAll(ctx, backend, time.Time{}) require.NoError(t, err) + for result := range events { + require.NoError(t, result.Err) + } + fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds()) require.Len(t, backend.AllBugsIds(), len(tests)) diff --git a/bridge/github/iterator.go b/bridge/github/iterator.go index 6d63cf42..a97ed036 100644 --- a/bridge/github/iterator.go +++ b/bridge/github/iterator.go @@ -46,6 +46,9 @@ type iterator struct { // to make capacity int + // shared context used for all graphql queries + ctx context.Context + // sticky error err error @@ -60,11 +63,12 @@ type iterator struct { } // NewIterator create and initialize a new iterator -func NewIterator(owner, project, token string, since time.Time) *iterator { +func NewIterator(ctx context.Context, capacity int, owner, project, token string, since time.Time) *iterator { i := &iterator{ gc: buildClient(token), since: since, - capacity: 10, + capacity: capacity, + ctx: ctx, timeline: timelineIterator{ index: -1, issueEdit: indexer{-1}, @@ -147,8 +151,7 @@ func (i *iterator) Error() error { } func (i *iterator) queryIssue() bool { - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil { @@ -167,6 +170,10 @@ func (i *iterator) queryIssue() bool { // NextIssue try to query the next issue and return true. Only one issue is // queried at each call. func (i *iterator) NextIssue() bool { + if i.err != nil { + return false + } + // if $issueAfter variable is nil we can directly make the first query if i.timeline.variables["issueAfter"] == (*githubv4.String)(nil) { nextIssue := i.queryIssue() @@ -175,10 +182,6 @@ func (i *iterator) NextIssue() bool { return nextIssue } - if i.err != nil { - return false - } - if !i.timeline.query.Repository.Issues.PageInfo.HasNextPage { return false } @@ -207,11 +210,15 @@ func (i *iterator) NextTimelineItem() bool { return false } + if i.ctx.Err() != nil { + return false + } + if len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges) == 0 { return false } - if i.timeline.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges))-1 { + if i.timeline.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges)-1 { i.timeline.index++ return true } @@ -225,8 +232,7 @@ func (i *iterator) NextTimelineItem() bool { // more timelines, query them i.timeline.variables["timelineAfter"] = i.timeline.query.Repository.Issues.Nodes[0].Timeline.PageInfo.EndCursor - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil { @@ -245,8 +251,7 @@ func (i *iterator) TimelineItemValue() timelineItem { } func (i *iterator) queryIssueEdit() bool { - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.issueEdit.query, i.issueEdit.variables); err != nil { @@ -285,10 +290,14 @@ func (i *iterator) NextIssueEdit() bool { return false } + if i.ctx.Err() != nil { + return false + } + // this mean we looped over all available issue edits in the timeline. // now we have to use i.issueEditQuery if i.timeline.issueEdit.index == -2 { - if i.issueEdit.index < min(i.capacity, len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 { + if i.issueEdit.index < len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 { i.issueEdit.index++ return i.nextValidIssueEdit() } @@ -319,7 +328,7 @@ func (i *iterator) NextIssueEdit() bool { } // loop over them timeline comment edits - if i.timeline.issueEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 { + if i.timeline.issueEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 { i.timeline.issueEdit.index++ return i.nextValidIssueEdit() } @@ -347,8 +356,7 @@ func (i *iterator) IssueEditValue() userContentEdit { } func (i *iterator) queryCommentEdit() bool { - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) defer cancel() if err := i.gc.Query(ctx, &i.commentEdit.query, i.commentEdit.variables); err != nil { @@ -384,10 +392,14 @@ func (i *iterator) NextCommentEdit() bool { return false } + if i.ctx.Err() != nil { + return false + } + // same as NextIssueEdit if i.timeline.commentEdit.index == -2 { - if i.commentEdit.index < min(i.capacity, len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes))-1 { + if i.commentEdit.index < len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes)-1 { i.commentEdit.index++ return i.nextValidCommentEdit() } @@ -409,7 +421,7 @@ func (i *iterator) NextCommentEdit() bool { } // loop over them timeline comment edits - if i.timeline.commentEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes))-1 { + if i.timeline.commentEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes)-1 { i.timeline.commentEdit.index++ return i.nextValidCommentEdit() } @@ -440,14 +452,6 @@ func (i *iterator) CommentEditValue() userContentEdit { return i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes[i.timeline.commentEdit.index] } -func min(a, b int) int { - if a > b { - return b - } - - return a -} - func reverseEdits(edits []userContentEdit) { for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 { edits[i], edits[j] = edits[j], edits[i] diff --git a/bridge/gitlab/import.go b/bridge/gitlab/import.go index e135b8bc..40ac06d3 100644 --- a/bridge/gitlab/import.go +++ b/bridge/gitlab/import.go @@ -1,6 +1,7 @@ package gitlab import ( + "context" "fmt" "strconv" "time" @@ -21,11 +22,8 @@ type gitlabImporter struct { // iterator iterator *iterator - // number of imported issues - importedIssues int - - // number of imported identities - importedIdentities int + // send only channel + out chan<- core.ImportResult } func (gi *gitlabImporter) Init(conf core.Configuration) error { @@ -35,49 +33,60 @@ func (gi *gitlabImporter) Init(conf core.Configuration) error { // ImportAll iterate over all the configured repository issues (notes) and ensure the creation // of the missing issues / comments / label events / title changes ... -func (gi *gitlabImporter) ImportAll(repo *cache.RepoCache, since time.Time) error { - gi.iterator = NewIterator(gi.conf[keyProjectID], gi.conf[keyToken], since) - - // Loop over all matching issues - for gi.iterator.NextIssue() { - issue := gi.iterator.IssueValue() - fmt.Printf("importing issue: %v\n", issue.Title) +func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { + gi.iterator = NewIterator(ctx, 10, gi.conf[keyProjectID], gi.conf[keyToken], since) + out := make(chan core.ImportResult) + gi.out = out + + go func() { + defer close(gi.out) + + // Loop over all matching issues + for gi.iterator.NextIssue() { + issue := gi.iterator.IssueValue() + + // create issue + b, err := gi.ensureIssue(repo, issue) + if err != nil { + err := fmt.Errorf("issue creation: %v", err) + out <- core.NewImportError(err, "") + return + } - // create issue - b, err := gi.ensureIssue(repo, issue) - if err != nil { - return fmt.Errorf("issue creation: %v", err) - } + // Loop over all notes + for gi.iterator.NextNote() { + note := gi.iterator.NoteValue() + if err := gi.ensureNote(repo, b, note); err != nil { + err := fmt.Errorf("note creation: %v", err) + out <- core.NewImportError(err, entity.Id(strconv.Itoa(note.ID))) + return + } + } - // Loop over all notes - for gi.iterator.NextNote() { - note := gi.iterator.NoteValue() - if err := gi.ensureNote(repo, b, note); err != nil { - return fmt.Errorf("note creation: %v", err) + // Loop over all label events + for gi.iterator.NextLabelEvent() { + labelEvent := gi.iterator.LabelEventValue() + if err := gi.ensureLabelEvent(repo, b, labelEvent); err != nil { + err := fmt.Errorf("label event creation: %v", err) + out <- core.NewImportError(err, entity.Id(strconv.Itoa(labelEvent.ID))) + return + } } - } - // Loop over all label events - for gi.iterator.NextLabelEvent() { - labelEvent := gi.iterator.LabelEventValue() - if err := gi.ensureLabelEvent(repo, b, labelEvent); err != nil { - return fmt.Errorf("label event creation: %v", err) + // commit bug state + if err := b.CommitAsNeeded(); err != nil { + err := fmt.Errorf("bug commit: %v", err) + out <- core.NewImportError(err, "") + return } } if err := gi.iterator.Error(); err != nil { - fmt.Printf("import error: %v\n", err) - return err + out <- core.NewImportError(err, "") } + }() - // commit bug state - if err := b.CommitAsNeeded(); err != nil { - return fmt.Errorf("bug commit: %v", err) - } - } - - fmt.Printf("Successfully imported %d issues and %d identities from Gitlab\n", gi.importedIssues, gi.importedIdentities) - return nil + return out, nil } func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue) (*cache.BugCache, error) { @@ -89,13 +98,14 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue // resolve bug b, err := repo.ResolveBugCreateMetadata(keyGitlabUrl, issue.WebURL) - if err != nil && err != bug.ErrBugNotExist { - return nil, err - } - if err == nil { + reason := fmt.Sprintf("bug already imported") + gi.out <- core.NewImportNothing("", reason) return b, nil } + if err != bug.ErrBugNotExist { + return nil, err + } // if bug was never imported cleanText, err := text.Cleanup(issue.Description) @@ -123,7 +133,7 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue } // importing a new bug - gi.importedIssues++ + gi.out <- core.NewImportBug(b.Id()) return b, nil } @@ -149,28 +159,36 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n return nil } - _, err = b.CloseRaw( + op, err := b.CloseRaw( author, note.CreatedAt.Unix(), map[string]string{ keyGitlabId: gitlabID, }, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) case NOTE_REOPENED: if errResolve == nil { return nil } - _, err = b.OpenRaw( + op, err := b.OpenRaw( author, note.CreatedAt.Unix(), map[string]string{ keyGitlabId: gitlabID, }, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) case NOTE_DESCRIPTION_CHANGED: issue := gi.iterator.IssueValue() @@ -181,7 +199,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n // TODO: Check only one time and ignore next 'description change' within one issue if errResolve == cache.ErrNoMatchingOp && issue.Description != firstComment.Message { // comment edition - _, err = b.EditCommentRaw( + op, err := b.EditCommentRaw( author, note.UpdatedAt.Unix(), firstComment.Id(), @@ -190,8 +208,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n keyGitlabId: gitlabID, }, ) + if err != nil { + return err + } - return err + gi.out <- core.NewImportTitleEdition(op.Id()) } case NOTE_COMMENT: @@ -204,7 +225,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n if errResolve == cache.ErrNoMatchingOp { // add comment operation - _, err = b.AddCommentRaw( + op, err := b.AddCommentRaw( author, note.CreatedAt.Unix(), cleanText, @@ -213,8 +234,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n keyGitlabId: gitlabID, }, ) - - return err + if err != nil { + return err + } + gi.out <- core.NewImportComment(op.Id()) + return nil } // if comment was already exported @@ -228,7 +252,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n // compare local bug comment with the new note body if comment.Message != cleanText { // comment edition - _, err = b.EditCommentRaw( + op, err := b.EditCommentRaw( author, note.UpdatedAt.Unix(), comment.Id(), @@ -236,7 +260,10 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n nil, ) - return err + if err != nil { + return err + } + gi.out <- core.NewImportCommentEdition(op.Id()) } return nil @@ -247,7 +274,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n return nil } - _, err = b.SetTitleRaw( + op, err := b.SetTitleRaw( author, note.CreatedAt.Unix(), body, @@ -255,8 +282,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n keyGitlabId: gitlabID, }, ) + if err != nil { + return err + } - return err + gi.out <- core.NewImportTitleEdition(op.Id()) case NOTE_UNKNOWN, NOTE_ASSIGNED, @@ -269,6 +299,9 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n NOTE_UNLOCKED, NOTE_MENTIONED_IN_ISSUE, NOTE_MENTIONED_IN_MERGE_REQUEST: + + reason := fmt.Sprintf("unsupported note type: %s", noteType.String()) + gi.out <- core.NewImportNothing("", reason) return nil default: @@ -337,10 +370,7 @@ func (gi *gitlabImporter) ensurePerson(repo *cache.RepoCache, id int) (*cache.Id return nil, err } - // importing a new identity - gi.importedIdentities++ - - return repo.NewIdentityRaw( + i, err = repo.NewIdentityRaw( user.Name, user.PublicEmail, user.Username, @@ -351,6 +381,12 @@ func (gi *gitlabImporter) ensurePerson(repo *cache.RepoCache, id int) (*cache.Id keyGitlabLogin: user.Username, }, ) + if err != nil { + return nil, err + } + + gi.out <- core.NewImportIdentity(i.Id()) + return i, nil } func parseID(id int) string { diff --git a/bridge/gitlab/import_notes.go b/bridge/gitlab/import_notes.go index c0796037..b38cb371 100644 --- a/bridge/gitlab/import_notes.go +++ b/bridge/gitlab/import_notes.go @@ -28,6 +28,45 @@ const ( NOTE_UNKNOWN ) +func (nt NoteType) String() string { + switch nt { + case NOTE_COMMENT: + return "note comment" + case NOTE_TITLE_CHANGED: + return "note title changed" + case NOTE_DESCRIPTION_CHANGED: + return "note description changed" + case NOTE_CLOSED: + return "note closed" + case NOTE_REOPENED: + return "note reopened" + case NOTE_LOCKED: + return "note locked" + case NOTE_UNLOCKED: + return "note unlocked" + case NOTE_CHANGED_DUEDATE: + return "note changed duedate" + case NOTE_REMOVED_DUEDATE: + return "note remove duedate" + case NOTE_ASSIGNED: + return "note assigned" + case NOTE_UNASSIGNED: + return "note unassigned" + case NOTE_CHANGED_MILESTONE: + return "note changed milestone" + case NOTE_REMOVED_MILESTONE: + return "note removed in milestone" + case NOTE_MENTIONED_IN_ISSUE: + return "note mentioned in issue" + case NOTE_MENTIONED_IN_MERGE_REQUEST: + return "note mentioned in merge request" + case NOTE_UNKNOWN: + return "note unknown" + default: + panic("unknown note type") + } +} + // GetNoteType parse a note system and body and return the note type and it content func GetNoteType(n *gitlab.Note) (NoteType, string) { // when a note is a comment system is set to false diff --git a/bridge/gitlab/import_test.go b/bridge/gitlab/import_test.go index c38d3ce3..20fc67c7 100644 --- a/bridge/gitlab/import_test.go +++ b/bridge/gitlab/import_test.go @@ -1,6 +1,7 @@ package gitlab import ( + "context" "fmt" "os" "testing" @@ -99,10 +100,16 @@ func TestImport(t *testing.T) { }) require.NoError(t, err) + ctx := context.Background() start := time.Now() - err = importer.ImportAll(backend, time.Time{}) + + events, err := importer.ImportAll(ctx, backend, time.Time{}) require.NoError(t, err) + for result := range events { + require.NoError(t, result.Err) + } + fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds()) require.Len(t, backend.AllBugsIds(), len(tests)) diff --git a/bridge/gitlab/iterator.go b/bridge/gitlab/iterator.go index 883fea9c..198af79b 100644 --- a/bridge/gitlab/iterator.go +++ b/bridge/gitlab/iterator.go @@ -1,6 +1,7 @@ package gitlab import ( + "context" "time" "github.com/xanzy/go-gitlab" @@ -38,6 +39,9 @@ type iterator struct { // number of issues and notes to query at once capacity int + // shared context + ctx context.Context + // sticky error err error @@ -52,12 +56,13 @@ type iterator struct { } // NewIterator create a new iterator -func NewIterator(projectID, token string, since time.Time) *iterator { +func NewIterator(ctx context.Context, capacity int, projectID, token string, since time.Time) *iterator { return &iterator{ gc: buildClient(token), project: projectID, since: since, - capacity: 20, + capacity: capacity, + ctx: ctx, issue: &issueIterator{ index: -1, page: 1, @@ -79,6 +84,9 @@ func (i *iterator) Error() error { } func (i *iterator) getNextIssues() bool { + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) + defer cancel() + issues, _, err := i.gc.Issues.ListProjectIssues( i.project, &gitlab.ListProjectIssuesOptions{ @@ -90,6 +98,7 @@ func (i *iterator) getNextIssues() bool { UpdatedAfter: &i.since, Sort: gitlab.String("asc"), }, + gitlab.WithContext(ctx), ) if err != nil { @@ -116,6 +125,10 @@ func (i *iterator) NextIssue() bool { return false } + if i.ctx.Err() != nil { + return false + } + // first query if i.issue.cache == nil { return i.getNextIssues() @@ -135,6 +148,9 @@ func (i *iterator) IssueValue() *gitlab.Issue { } func (i *iterator) getNextNotes() bool { + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) + defer cancel() + notes, _, err := i.gc.Notes.ListIssueNotes( i.project, i.IssueValue().IID, @@ -146,6 +162,7 @@ func (i *iterator) getNextNotes() bool { Sort: gitlab.String("asc"), OrderBy: gitlab.String("created_at"), }, + gitlab.WithContext(ctx), ) if err != nil { @@ -171,6 +188,10 @@ func (i *iterator) NextNote() bool { return false } + if i.ctx.Err() != nil { + return false + } + if len(i.note.cache) == 0 { return i.getNextNotes() } @@ -189,6 +210,9 @@ func (i *iterator) NoteValue() *gitlab.Note { } func (i *iterator) getNextLabelEvents() bool { + ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout) + defer cancel() + labelEvents, _, err := i.gc.ResourceLabelEvents.ListIssueLabelEvents( i.project, i.IssueValue().IID, @@ -198,6 +222,7 @@ func (i *iterator) getNextLabelEvents() bool { PerPage: i.capacity, }, }, + gitlab.WithContext(ctx), ) if err != nil { @@ -224,6 +249,10 @@ func (i *iterator) NextLabelEvent() bool { return false } + if i.ctx.Err() != nil { + return false + } + if len(i.labelEvent.cache) == 0 { return i.getNextLabelEvents() } diff --git a/bridge/launchpad/import.go b/bridge/launchpad/import.go index 7ef11416..7f50d898 100644 --- a/bridge/launchpad/import.go +++ b/bridge/launchpad/import.go @@ -1,11 +1,10 @@ package launchpad import ( + "context" "fmt" "time" - "github.com/pkg/errors" - "github.com/MichaelMure/git-bug/bridge/core" "github.com/MichaelMure/git-bug/bug" "github.com/MichaelMure/git-bug/cache" @@ -45,98 +44,116 @@ func (li *launchpadImporter) ensurePerson(repo *cache.RepoCache, owner LPPerson) ) } -func (li *launchpadImporter) ImportAll(repo *cache.RepoCache, since time.Time) error { +func (li *launchpadImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { + out := make(chan core.ImportResult) lpAPI := new(launchpadAPI) err := lpAPI.Init() if err != nil { - return err + return nil, err } - lpBugs, err := lpAPI.SearchTasks(li.conf["project"]) + lpBugs, err := lpAPI.SearchTasks(ctx, li.conf["project"]) if err != nil { - return err + return nil, err } - for _, lpBug := range lpBugs { - var b *cache.BugCache - var err error - - lpBugID := fmt.Sprintf("%d", lpBug.ID) - b, err = repo.ResolveBugCreateMetadata(keyLaunchpadID, lpBugID) - if err != nil && err != bug.ErrBugNotExist { - return err - } - - owner, err := li.ensurePerson(repo, lpBug.Owner) - if err != nil { - return err - } - - if err == bug.ErrBugNotExist { - createdAt, _ := time.Parse(time.RFC3339, lpBug.CreatedAt) - b, _, err = repo.NewBugRaw( - owner, - createdAt.Unix(), - lpBug.Title, - lpBug.Description, - nil, - map[string]string{ - keyLaunchpadID: lpBugID, - }, - ) - if err != nil { - return errors.Wrapf(err, "failed to add bug id #%s", lpBugID) + go func() { + for _, lpBug := range lpBugs { + select { + case <-ctx.Done(): + return + default: + lpBugID := fmt.Sprintf("%d", lpBug.ID) + b, err := repo.ResolveBugCreateMetadata(keyLaunchpadID, lpBugID) + if err != nil && err != bug.ErrBugNotExist { + out <- core.NewImportError(err, entity.Id(lpBugID)) + return + } + + owner, err := li.ensurePerson(repo, lpBug.Owner) + if err != nil { + out <- core.NewImportError(err, entity.Id(lpBugID)) + return + } + + if err == bug.ErrBugNotExist { + createdAt, _ := time.Parse(time.RFC3339, lpBug.CreatedAt) + b, _, err = repo.NewBugRaw( + owner, + createdAt.Unix(), + lpBug.Title, + lpBug.Description, + nil, + map[string]string{ + keyLaunchpadID: lpBugID, + }, + ) + if err != nil { + out <- core.NewImportError(err, entity.Id(lpBugID)) + return + } + + out <- core.NewImportBug(b.Id()) + + } + + /* Handle messages */ + if len(lpBug.Messages) == 0 { + err := fmt.Sprintf("bug doesn't have any comments") + out <- core.NewImportNothing(entity.Id(lpBugID), err) + return + } + + // The Launchpad API returns the bug description as the first + // comment, so skip it. + for _, lpMessage := range lpBug.Messages[1:] { + _, err := b.ResolveOperationWithMetadata(keyLaunchpadID, lpMessage.ID) + if err != nil && err != cache.ErrNoMatchingOp { + out <- core.NewImportError(err, entity.Id(lpMessage.ID)) + return + } + + // If this comment already exists, we are probably + // updating an existing bug. We do not want to duplicate + // the comments, so let us just skip this one. + // TODO: Can Launchpad comments be edited? + if err == nil { + continue + } + + owner, err := li.ensurePerson(repo, lpMessage.Owner) + if err != nil { + out <- core.NewImportError(err, "") + return + } + + // This is a new comment, we can add it. + createdAt, _ := time.Parse(time.RFC3339, lpMessage.CreatedAt) + op, err := b.AddCommentRaw( + owner, + createdAt.Unix(), + lpMessage.Content, + nil, + map[string]string{ + keyLaunchpadID: lpMessage.ID, + }) + if err != nil { + out <- core.NewImportError(err, op.Id()) + return + } + + out <- core.NewImportComment(op.Id()) + } + + err = b.CommitAsNeeded() + if err != nil { + out <- core.NewImportError(err, "") + return + } } - } else { - /* TODO: Update bug */ - fmt.Println("TODO: Update bug") } + }() - /* Handle messages */ - if len(lpBug.Messages) == 0 { - return errors.Wrapf(err, "failed to fetch comments for bug #%s", lpBugID) - } - - // The Launchpad API returns the bug description as the first - // comment, so skip it. - for _, lpMessage := range lpBug.Messages[1:] { - _, err := b.ResolveOperationWithMetadata(keyLaunchpadID, lpMessage.ID) - if err != nil && err != cache.ErrNoMatchingOp { - return errors.Wrapf(err, "failed to fetch comments for bug #%s", lpBugID) - } - - // If this comment already exists, we are probably - // updating an existing bug. We do not want to duplicate - // the comments, so let us just skip this one. - // TODO: Can Launchpad comments be edited? - if err == nil { - continue - } - - owner, err := li.ensurePerson(repo, lpMessage.Owner) - if err != nil { - return err - } - - // This is a new comment, we can add it. - createdAt, _ := time.Parse(time.RFC3339, lpMessage.CreatedAt) - _, err = b.AddCommentRaw( - owner, - createdAt.Unix(), - lpMessage.Content, - nil, - map[string]string{ - keyLaunchpadID: lpMessage.ID, - }) - if err != nil { - return errors.Wrapf(err, "failed to add comment to bug #%s", lpBugID) - } - } - err = b.CommitAsNeeded() - if err != nil { - return err - } - } - return nil + return out, nil } diff --git a/bridge/launchpad/launchpad_api.go b/bridge/launchpad/launchpad_api.go index 8cafa241..763e774e 100644 --- a/bridge/launchpad/launchpad_api.go +++ b/bridge/launchpad/launchpad_api.go @@ -14,6 +14,7 @@ package launchpad */ import ( + "context" "encoding/json" "fmt" "net/http" @@ -33,43 +34,6 @@ type LPPerson struct { // https://api.launchpad.net/devel/~login var personCache = make(map[string]LPPerson) -func (owner *LPPerson) UnmarshalJSON(data []byte) error { - type LPPersonX LPPerson // Avoid infinite recursion - var ownerLink string - if err := json.Unmarshal(data, &ownerLink); err != nil { - return err - } - - // First, try to gather info about the bug owner using our cache. - if cachedPerson, hasKey := personCache[ownerLink]; hasKey { - *owner = cachedPerson - return nil - } - - // If the bug owner is not already known, we have to send a request. - req, err := http.NewRequest("GET", ownerLink, nil) - if err != nil { - return nil - } - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return nil - } - - defer resp.Body.Close() - - var p LPPersonX - if err := json.NewDecoder(resp.Body).Decode(&p); err != nil { - return nil - } - *owner = LPPerson(p) - // Do not forget to update the cache. - personCache[ownerLink] = *owner - return nil -} - // LPBug describes a Launchpad bug. type LPBug struct { Title string `json:"title"` @@ -109,11 +73,13 @@ type launchpadAPI struct { } func (lapi *launchpadAPI) Init() error { - lapi.client = &http.Client{} + lapi.client = &http.Client{ + Timeout: defaultTimeout, + } return nil } -func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) { +func (lapi *launchpadAPI) SearchTasks(ctx context.Context, project string) ([]LPBug, error) { var bugs []LPBug // First, let us build the URL. Not all statuses are included by @@ -153,7 +119,7 @@ func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) { } for _, bugEntry := range result.Entries { - bug, err := lapi.queryBug(bugEntry.BugLink) + bug, err := lapi.queryBug(ctx, bugEntry.BugLink) if err == nil { bugs = append(bugs, bug) } @@ -170,13 +136,14 @@ func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) { return bugs, nil } -func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) { +func (lapi *launchpadAPI) queryBug(ctx context.Context, url string) (LPBug, error) { var bug LPBug req, err := http.NewRequest("GET", url, nil) if err != nil { return bug, err } + req = req.WithContext(ctx) resp, err := lapi.client.Do(req) if err != nil { @@ -191,7 +158,7 @@ func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) { /* Fetch messages */ messagesCollectionLink := fmt.Sprintf("%s/bugs/%d/messages", apiRoot, bug.ID) - messages, err := lapi.queryMessages(messagesCollectionLink) + messages, err := lapi.queryMessages(ctx, messagesCollectionLink) if err != nil { return bug, err } @@ -200,7 +167,7 @@ func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) { return bug, nil } -func (lapi *launchpadAPI) queryMessages(messagesURL string) ([]LPMessage, error) { +func (lapi *launchpadAPI) queryMessages(ctx context.Context, messagesURL string) ([]LPMessage, error) { var messages []LPMessage for { @@ -208,6 +175,7 @@ func (lapi *launchpadAPI) queryMessages(messagesURL string) ([]LPMessage, error) if err != nil { return nil, err } + req = req.WithContext(ctx) resp, err := lapi.client.Do(req) if err != nil { diff --git a/cache/repo_cache.go b/cache/repo_cache.go index d6e8857d..107a4876 100644 --- a/cache/repo_cache.go +++ b/cache/repo_cache.go @@ -172,18 +172,10 @@ func (c *RepoCache) lock() error { } func (c *RepoCache) Close() error { - for id := range c.identities { - delete(c.identities, id) - } - for id := range c.identitiesExcerpts { - delete(c.identitiesExcerpts, id) - } - for id := range c.bugs { - delete(c.bugs, id) - } - for id := range c.bugExcerpts { - delete(c.bugExcerpts, id) - } + c.identities = make(map[entity.Id]*IdentityCache) + c.identitiesExcerpts = nil + c.bugs = make(map[entity.Id]*BugCache) + c.bugExcerpts = nil lockPath := repoLockFilePath(c.repo) return os.Remove(lockPath) diff --git a/commands/bridge_pull.go b/commands/bridge_pull.go index 2edabfaf..0f3b8413 100644 --- a/commands/bridge_pull.go +++ b/commands/bridge_pull.go @@ -1,6 +1,10 @@ package commands import ( + "context" + "fmt" + "os" + "sync" "time" "github.com/spf13/cobra" @@ -31,12 +35,59 @@ func runBridgePull(cmd *cobra.Command, args []string) error { return err } + parentCtx := context.Background() + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + // buffered channel to avoid send block at the end + done := make(chan struct{}, 1) + + var mu sync.Mutex + interruptCount := 0 + interrupt.RegisterCleaner(func() error { + mu.Lock() + if interruptCount > 0 { + fmt.Println("Received another interrupt before graceful stop, terminating...") + os.Exit(0) + } + + interruptCount++ + mu.Unlock() + + fmt.Println("Received interrupt signal, stopping the import...\n(Hit ctrl-c again to kill the process.)") + + // send signal to stop the importer + cancel() + + // block until importer gracefully shutdown + <-done + return nil + }) + // TODO: by default import only new events - err = b.ImportAll(time.Time{}) + events, err := b.ImportAll(ctx, time.Time{}) if err != nil { return err } + importedIssues := 0 + importedIdentities := 0 + for result := range events { + fmt.Println(result.String()) + + switch result.Event { + case core.ImportEventBug: + importedIssues++ + case core.ImportEventIdentity: + importedIdentities++ + } + } + + // send done signal + close(done) + + fmt.Printf("Successfully imported %d issues and %d identities with %s bridge\n", importedIssues, importedIdentities, b.Name) + return nil } diff --git a/commands/bridge_push.go b/commands/bridge_push.go index 11f5ca82..77fe8b29 100644 --- a/commands/bridge_push.go +++ b/commands/bridge_push.go @@ -1,7 +1,10 @@ package commands import ( + "context" "fmt" + "os" + "sync" "time" "github.com/spf13/cobra" @@ -32,20 +35,55 @@ func runBridgePush(cmd *cobra.Command, args []string) error { return err } + parentCtx := context.Background() + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + done := make(chan struct{}, 1) + + var mu sync.Mutex + interruptCount := 0 + interrupt.RegisterCleaner(func() error { + mu.Lock() + if interruptCount > 0 { + fmt.Println("Received another interrupt before graceful stop, terminating...") + os.Exit(0) + } + + interruptCount++ + mu.Unlock() + + fmt.Println("Received interrupt signal, stopping the import...\n(Hit ctrl-c again to kill the process.)") + + // send signal to stop the importer + cancel() + + // block until importer gracefully shutdown + <-done + return nil + }) + // TODO: by default export only new events - out, err := b.ExportAll(time.Time{}) + events, err := b.ExportAll(ctx, time.Time{}) if err != nil { return err } - for result := range out { - if result.Err != nil { - fmt.Println(result.Err, result.Reason) - } else { - fmt.Printf("%s: %s\n", result.String(), result.ID) + exportedIssues := 0 + for result := range events { + fmt.Println(result.String()) + + switch result.Event { + case core.ExportEventBug: + exportedIssues++ } } + // send done signal + close(done) + + fmt.Printf("Successfully exported %d issues with %s bridge\n", exportedIssues, b.Name) + return nil } diff --git a/entity/merge.go b/entity/merge.go index 7c3e71c8..3ce8edac 100644 --- a/entity/merge.go +++ b/entity/merge.go @@ -13,6 +13,7 @@ const ( MergeStatusInvalid MergeStatusUpdated MergeStatusNothing + MergeStatusError ) type MergeResult struct { @@ -39,6 +40,8 @@ func (mr MergeResult) String() string { return "updated" case MergeStatusNothing: return "nothing to do" + case MergeStatusError: + return fmt.Sprintf("merge error on %s: %s", mr.Id, mr.Err.Error()) default: panic("unknown merge status") } @@ -46,8 +49,9 @@ func (mr MergeResult) String() string { func NewMergeError(err error, id Id) MergeResult { return MergeResult{ - Err: err, - Id: id, + Err: err, + Id: id, + Status: MergeStatusError, } } -- cgit