aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAmine Hilaly <hilalyamine@gmail.com>2019-08-13 19:51:14 +0200
committerAmine Hilaly <hilalyamine@gmail.com>2019-08-18 00:14:22 +0200
commit5ca326af83b90531d4d0c502bb1beabbe1b48c55 (patch)
tree6b7a32f2db9ab7321e9965c0ef4c715c6c517178
parent6428352bd14828f670206b60862de7f71c52d235 (diff)
downloadgit-bug-5ca326af83b90531d4d0c502bb1beabbe1b48c55.tar.gz
bridge/core: add context.Context to ImportAll and ExportAll signatures
bridge/core: add ImportResult objects to stream import events bridge/core: launchpad support asynchronous import bridge/github: cancellable export and import functions bridge/gitlab: cancellable export and import functions commands: bridge pull/push gracefull kill bridge/github: fix github import bridge/github: use simple context for imports bridge/core: name parameters in interfaces github/core: Add EventError to export and import events types bridge/gitlab: add context support in gitlab requests functions bridge/gitlab: remove imported events count from importer logic bridge/github: remove imported events count from importer logic bridge/github: add context support in query and muration requets bridge/github: fix bug duplicate editions after multiple calls bridge/core: import import and export events String methods bridge/gitlab: fix error handling in note import events commands/bridge: Add statistics about imports and exports bridge/gitlab: properly handle context cancellation bridge/github: improve error handling bridge: break iterators on context cancel or timeout bridge: add context timeout support bridge: improve event formating and error handling commands: handle interrupt and switch cases bridge/github: add export mutation timeouts bridge: fix race condition bug in the github and gitlab importers bridge/github: improve context error handling
-rw-r--r--bridge/core/bridge.go15
-rw-r--r--bridge/core/export.go24
-rw-r--r--bridge/core/import.go128
-rw-r--r--bridge/core/interfaces.go5
-rw-r--r--bridge/github/export.go140
-rw-r--r--bridge/github/export_test.go12
-rw-r--r--bridge/github/import.go199
-rw-r--r--bridge/github/import_test.go8
-rw-r--r--bridge/github/iterator.go58
-rw-r--r--bridge/gitlab/import.go156
-rw-r--r--bridge/gitlab/import_notes.go39
-rw-r--r--bridge/gitlab/import_test.go9
-rw-r--r--bridge/gitlab/iterator.go33
-rw-r--r--bridge/launchpad/import.go185
-rw-r--r--bridge/launchpad/launchpad_api.go54
-rw-r--r--cache/repo_cache.go16
-rw-r--r--commands/bridge_pull.go53
-rw-r--r--commands/bridge_push.go50
-rw-r--r--entity/merge.go8
19 files changed, 802 insertions, 390 deletions
diff --git a/bridge/core/bridge.go b/bridge/core/bridge.go
index 645dac3d..9161b418 100644
--- a/bridge/core/bridge.go
+++ b/bridge/core/bridge.go
@@ -2,6 +2,7 @@
package core
import (
+ "context"
"fmt"
"reflect"
"regexp"
@@ -289,26 +290,26 @@ func (b *Bridge) ensureInit() error {
return nil
}
-func (b *Bridge) ImportAll(since time.Time) error {
+func (b *Bridge) ImportAll(ctx context.Context, since time.Time) (<-chan ImportResult, error) {
importer := b.getImporter()
if importer == nil {
- return ErrImportNotSupported
+ return nil, ErrImportNotSupported
}
err := b.ensureConfig()
if err != nil {
- return err
+ return nil, err
}
err = b.ensureInit()
if err != nil {
- return err
+ return nil, err
}
- return importer.ImportAll(b.repo, since)
+ return importer.ImportAll(ctx, b.repo, since)
}
-func (b *Bridge) ExportAll(since time.Time) (<-chan ExportResult, error) {
+func (b *Bridge) ExportAll(ctx context.Context, since time.Time) (<-chan ExportResult, error) {
exporter := b.getExporter()
if exporter == nil {
return nil, ErrExportNotSupported
@@ -324,5 +325,5 @@ func (b *Bridge) ExportAll(since time.Time) (<-chan ExportResult, error) {
return nil, err
}
- return exporter.ExportAll(b.repo, since)
+ return exporter.ExportAll(ctx, b.repo, since)
}
diff --git a/bridge/core/export.go b/bridge/core/export.go
index 09566b62..55cf5a60 100644
--- a/bridge/core/export.go
+++ b/bridge/core/export.go
@@ -17,6 +17,7 @@ const (
ExportEventTitleEdition
ExportEventLabelChange
ExportEventNothing
+ ExportEventError
)
// ExportResult is an event that is emitted during the export process, to
@@ -32,19 +33,28 @@ type ExportResult struct {
func (er ExportResult) String() string {
switch er.Event {
case ExportEventBug:
- return "new issue"
+ return fmt.Sprintf("new issue: %s", er.ID)
case ExportEventComment:
- return "new comment"
+ return fmt.Sprintf("new comment: %s", er.ID)
case ExportEventCommentEdition:
- return "updated comment"
+ return fmt.Sprintf("updated comment: %s", er.ID)
case ExportEventStatusChange:
- return "changed status"
+ return fmt.Sprintf("changed status: %s", er.ID)
case ExportEventTitleEdition:
- return "changed title"
+ return fmt.Sprintf("changed title: %s", er.ID)
case ExportEventLabelChange:
- return "changed label"
+ return fmt.Sprintf("changed label: %s", er.ID)
case ExportEventNothing:
- return fmt.Sprintf("no event: %v", er.Reason)
+ if er.ID != "" {
+ return fmt.Sprintf("ignoring export event %s: %s", er.ID, er.Reason)
+ }
+ return fmt.Sprintf("ignoring export event: %s", er.Reason)
+ case ExportEventError:
+ if er.ID != "" {
+ return fmt.Sprintf("export error at %s: %s", er.ID, er.Err.Error())
+ }
+ return fmt.Sprintf("export error: %s", er.Err.Error())
+
default:
panic("unknown export result")
}
diff --git a/bridge/core/import.go b/bridge/core/import.go
new file mode 100644
index 00000000..0961e00b
--- /dev/null
+++ b/bridge/core/import.go
@@ -0,0 +1,128 @@
+package core
+
+import (
+ "fmt"
+
+ "github.com/MichaelMure/git-bug/entity"
+)
+
+type ImportEvent int
+
+const (
+ _ ImportEvent = iota
+ ImportEventBug
+ ImportEventComment
+ ImportEventCommentEdition
+ ImportEventStatusChange
+ ImportEventTitleEdition
+ ImportEventLabelChange
+ ImportEventIdentity
+ ImportEventNothing
+ ImportEventError
+)
+
+// ImportResult is an event that is emitted during the import process, to
+// allow calling code to report on what is happening, collect metrics or
+// display meaningful errors if something went wrong.
+type ImportResult struct {
+ Err error
+ Event ImportEvent
+ ID entity.Id
+ Reason string
+}
+
+func (er ImportResult) String() string {
+ switch er.Event {
+ case ImportEventBug:
+ return fmt.Sprintf("new issue: %s", er.ID)
+ case ImportEventComment:
+ return fmt.Sprintf("new comment: %s", er.ID)
+ case ImportEventCommentEdition:
+ return fmt.Sprintf("updated comment: %s", er.ID)
+ case ImportEventStatusChange:
+ return fmt.Sprintf("changed status: %s", er.ID)
+ case ImportEventTitleEdition:
+ return fmt.Sprintf("changed title: %s", er.ID)
+ case ImportEventLabelChange:
+ return fmt.Sprintf("changed label: %s", er.ID)
+ case ImportEventIdentity:
+ return fmt.Sprintf("new identity: %s", er.ID)
+ case ImportEventNothing:
+ if er.ID != "" {
+ return fmt.Sprintf("ignoring import event %s: %s", er.ID, er.Reason)
+ }
+ return fmt.Sprintf("ignoring event: %s", er.Reason)
+ case ImportEventError:
+ if er.ID != "" {
+ return fmt.Sprintf("import error at id %s: %s", er.ID, er.Err.Error())
+ }
+ return fmt.Sprintf("import error: %s", er.Err.Error())
+ default:
+ panic("unknown import result")
+ }
+}
+
+func NewImportError(err error, id entity.Id) ImportResult {
+ return ImportResult{
+ Err: err,
+ ID: id,
+ Event: ImportEventError,
+ }
+}
+
+func NewImportNothing(id entity.Id, reason string) ImportResult {
+ return ImportResult{
+ ID: id,
+ Reason: reason,
+ Event: ImportEventNothing,
+ }
+}
+
+func NewImportBug(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventBug,
+ }
+}
+
+func NewImportComment(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventComment,
+ }
+}
+
+func NewImportCommentEdition(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventCommentEdition,
+ }
+}
+
+func NewImportStatusChange(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventStatusChange,
+ }
+}
+
+func NewImportLabelChange(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventLabelChange,
+ }
+}
+
+func NewImportTitleEdition(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventTitleEdition,
+ }
+}
+
+func NewImportIdentity(id entity.Id) ImportResult {
+ return ImportResult{
+ ID: id,
+ Event: ImportEventIdentity,
+ }
+}
diff --git a/bridge/core/interfaces.go b/bridge/core/interfaces.go
index 76d66fb4..047f3880 100644
--- a/bridge/core/interfaces.go
+++ b/bridge/core/interfaces.go
@@ -1,6 +1,7 @@
package core
import (
+ "context"
"time"
"github.com/MichaelMure/git-bug/cache"
@@ -29,10 +30,10 @@ type BridgeImpl interface {
type Importer interface {
Init(conf Configuration) error
- ImportAll(repo *cache.RepoCache, since time.Time) error
+ ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan ImportResult, error)
}
type Exporter interface {
Init(conf Configuration) error
- ExportAll(repo *cache.RepoCache, since time.Time) (<-chan ExportResult, error)
+ ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan ExportResult, error)
}
diff --git a/bridge/github/export.go b/bridge/github/export.go
index 34c88310..3aecdce0 100644
--- a/bridge/github/export.go
+++ b/bridge/github/export.go
@@ -79,7 +79,7 @@ func (ge *githubExporter) getIdentityClient(id entity.Id) (*githubv4.Client, err
}
// ExportAll export all event made by the current user to Github
-func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
+func (ge *githubExporter) ExportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ExportResult, error) {
out := make(chan core.ExportResult)
user, err := repo.GetUserIdentity()
@@ -91,6 +91,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
// get repository node id
ge.repositoryID, err = getRepositoryNodeID(
+ ctx,
ge.conf[keyOwner],
ge.conf[keyProject],
ge.conf[keyToken],
@@ -117,20 +118,28 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
return
}
- snapshot := b.Snapshot()
+ select {
- // ignore issues created before since date
- // TODO: compare the Lamport time instead of using the unix time
- if snapshot.CreatedAt.Before(since) {
- out <- core.NewExportNothing(b.Id(), "bug created before the since date")
- continue
- }
+ case <-ctx.Done():
+ // stop iterating if context cancel function is called
+ return
- if snapshot.HasAnyActor(allIdentitiesIds...) {
- // try to export the bug and it associated events
- ge.exportBug(b, since, out)
- } else {
- out <- core.NewExportNothing(id, "not an actor")
+ default:
+ snapshot := b.Snapshot()
+
+ // ignore issues created before since date
+ // TODO: compare the Lamport time instead of using the unix time
+ if snapshot.CreatedAt.Before(since) {
+ out <- core.NewExportNothing(b.Id(), "bug created before the since date")
+ continue
+ }
+
+ if snapshot.HasAnyActor(allIdentitiesIds...) {
+ // try to export the bug and it associated events
+ ge.exportBug(ctx, b, since, out)
+ } else {
+ out <- core.NewExportNothing(id, "not an actor")
+ }
}
}
}()
@@ -139,7 +148,7 @@ func (ge *githubExporter) ExportAll(repo *cache.RepoCache, since time.Time) (<-c
}
// exportBug publish bugs and related events
-func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
+func (ge *githubExporter) exportBug(ctx context.Context, b *cache.BugCache, since time.Time, out chan<- core.ExportResult) {
snapshot := b.Snapshot()
var bugGithubID string
@@ -199,7 +208,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
}
// create bug
- id, url, err := createGithubIssue(client, ge.repositoryID, createOp.Title, createOp.Message)
+ id, url, err := createGithubIssue(ctx, client, ge.repositoryID, createOp.Title, createOp.Message)
if err != nil {
err := errors.Wrap(err, "exporting github issue")
out <- core.NewExportError(err, b.Id())
@@ -257,7 +266,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
opr := op.(*bug.AddCommentOperation)
// send operation to github
- id, url, err = addCommentGithubIssue(client, bugGithubID, opr.Message)
+ id, url, err = addCommentGithubIssue(ctx, client, bugGithubID, opr.Message)
if err != nil {
err := errors.Wrap(err, "adding comment")
out <- core.NewExportError(err, b.Id())
@@ -277,7 +286,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
if opr.Target == createOp.Id() {
// case bug creation operation: we need to edit the Github issue
- if err := updateGithubIssueBody(client, bugGithubID, opr.Message); err != nil {
+ if err := updateGithubIssueBody(ctx, client, bugGithubID, opr.Message); err != nil {
err := errors.Wrap(err, "editing issue")
out <- core.NewExportError(err, b.Id())
return
@@ -296,7 +305,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
panic("unexpected error: comment id not found")
}
- eid, eurl, err := editCommentGithubIssue(client, commentID, opr.Message)
+ eid, eurl, err := editCommentGithubIssue(ctx, client, commentID, opr.Message)
if err != nil {
err := errors.Wrap(err, "editing comment")
out <- core.NewExportError(err, b.Id())
@@ -312,7 +321,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
case *bug.SetStatusOperation:
opr := op.(*bug.SetStatusOperation)
- if err := updateGithubIssueStatus(client, bugGithubID, opr.Status); err != nil {
+ if err := updateGithubIssueStatus(ctx, client, bugGithubID, opr.Status); err != nil {
err := errors.Wrap(err, "editing status")
out <- core.NewExportError(err, b.Id())
return
@@ -325,7 +334,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
case *bug.SetTitleOperation:
opr := op.(*bug.SetTitleOperation)
- if err := updateGithubIssueTitle(client, bugGithubID, opr.Title); err != nil {
+ if err := updateGithubIssueTitle(ctx, client, bugGithubID, opr.Title); err != nil {
err := errors.Wrap(err, "editing title")
out <- core.NewExportError(err, b.Id())
return
@@ -338,7 +347,7 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
case *bug.LabelChangeOperation:
opr := op.(*bug.LabelChangeOperation)
- if err := ge.updateGithubIssueLabels(client, bugGithubID, opr.Added, opr.Removed); err != nil {
+ if err := ge.updateGithubIssueLabels(ctx, client, bugGithubID, opr.Added, opr.Removed); err != nil {
err := errors.Wrap(err, "updating labels")
out <- core.NewExportError(err, b.Id())
return
@@ -370,12 +379,9 @@ func (ge *githubExporter) exportBug(b *cache.BugCache, since time.Time, out chan
}
// getRepositoryNodeID request github api v3 to get repository node id
-func getRepositoryNodeID(owner, project, token string) (string, error) {
+func getRepositoryNodeID(ctx context.Context, owner, project, token string) (string, error) {
url := fmt.Sprintf("%s/repos/%s/%s", githubV3Url, owner, project)
-
- client := &http.Client{
- Timeout: defaultTimeout,
- }
+ client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@@ -385,6 +391,10 @@ func getRepositoryNodeID(owner, project, token string) (string, error) {
// need the token for private repositories
req.Header.Set("Authorization", fmt.Sprintf("token %s", token))
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+
resp, err := client.Do(req)
if err != nil {
return "", err
@@ -425,7 +435,7 @@ func markOperationAsExported(b *cache.BugCache, target entity.Id, githubID, gith
}
// get label from github
-func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (string, error) {
+func (ge *githubExporter) getGithubLabelID(ctx context.Context, gc *githubv4.Client, label string) (string, error) {
q := &labelQuery{}
variables := map[string]interface{}{
"label": githubv4.String(label),
@@ -433,8 +443,7 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s
"name": githubv4.String(ge.conf[keyProject]),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Query(ctx, q, variables); err != nil {
@@ -452,12 +461,9 @@ func (ge *githubExporter) getGithubLabelID(gc *githubv4.Client, label string) (s
// create a new label and return it github id
// NOTE: since createLabel mutation is still in preview mode we use github api v3 to create labels
// see https://developer.github.com/v4/mutation/createlabel/ and https://developer.github.com/v4/previews/#labels-preview
-func (ge *githubExporter) createGithubLabel(label, color string) (string, error) {
+func (ge *githubExporter) createGithubLabel(ctx context.Context, label, color string) (string, error) {
url := fmt.Sprintf("%s/repos/%s/%s/labels", githubV3Url, ge.conf[keyOwner], ge.conf[keyProject])
-
- client := &http.Client{
- Timeout: defaultTimeout,
- }
+ client := &http.Client{}
params := struct {
Name string `json:"name"`
@@ -478,6 +484,10 @@ func (ge *githubExporter) createGithubLabel(label, color string) (string, error)
return "", err
}
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+
// need the token for private repositories
req.Header.Set("Authorization", fmt.Sprintf("token %s", ge.conf[keyToken]))
@@ -529,9 +539,9 @@ func (ge *githubExporter) createGithubLabelV4(gc *githubv4.Client, label, labelC
}
*/
-func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
+func (ge *githubExporter) getOrCreateGithubLabelID(ctx context.Context, gc *githubv4.Client, repositoryID string, label bug.Label) (string, error) {
// try to get label id
- labelID, err := ge.getGithubLabelID(gc, string(label))
+ labelID, err := ge.getGithubLabelID(ctx, gc, string(label))
if err == nil {
return labelID, nil
}
@@ -540,7 +550,10 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito
rgba := label.RGBA()
hexColor := fmt.Sprintf("%.2x%.2x%.2x", rgba.R, rgba.G, rgba.B)
- labelID, err = ge.createGithubLabel(string(label), hexColor)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
+
+ labelID, err = ge.createGithubLabel(ctx, string(label), hexColor)
if err != nil {
return "", err
}
@@ -548,7 +561,7 @@ func (ge *githubExporter) getOrCreateGithubLabelID(gc *githubv4.Client, reposito
return labelID, nil
}
-func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
+func (ge *githubExporter) getLabelsIDs(ctx context.Context, gc *githubv4.Client, repositoryID string, labels []bug.Label) ([]githubv4.ID, error) {
ids := make([]githubv4.ID, 0, len(labels))
var err error
@@ -557,7 +570,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string,
id, ok := ge.cachedLabels[string(label)]
if !ok {
// try to query label id
- id, err = ge.getOrCreateGithubLabelID(gc, repositoryID, label)
+ id, err = ge.getOrCreateGithubLabelID(ctx, gc, repositoryID, label)
if err != nil {
return nil, errors.Wrap(err, "get or create github label")
}
@@ -573,7 +586,7 @@ func (ge *githubExporter) getLabelsIDs(gc *githubv4.Client, repositoryID string,
}
// create a github issue and return it ID
-func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
+func createGithubIssue(ctx context.Context, gc *githubv4.Client, repositoryID, title, body string) (string, string, error) {
m := &createIssueMutation{}
input := githubv4.CreateIssueInput{
RepositoryID: repositoryID,
@@ -581,8 +594,7 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s
Body: (*githubv4.String)(&body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -594,15 +606,14 @@ func createGithubIssue(gc *githubv4.Client, repositoryID, title, body string) (s
}
// add a comment to an issue and return it ID
-func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (string, string, error) {
+func addCommentGithubIssue(ctx context.Context, gc *githubv4.Client, subjectID string, body string) (string, string, error) {
m := &addCommentToIssueMutation{}
input := githubv4.AddCommentInput{
SubjectID: subjectID,
Body: githubv4.String(body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -613,15 +624,14 @@ func addCommentGithubIssue(gc *githubv4.Client, subjectID string, body string) (
return node.ID, node.URL, nil
}
-func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string, string, error) {
+func editCommentGithubIssue(ctx context.Context, gc *githubv4.Client, commentID, body string) (string, string, error) {
m := &updateIssueCommentMutation{}
input := githubv4.UpdateIssueCommentInput{
ID: commentID,
Body: githubv4.String(body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -631,7 +641,7 @@ func editCommentGithubIssue(gc *githubv4.Client, commentID, body string) (string
return commentID, m.UpdateIssueComment.IssueComment.URL, nil
}
-func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status) error {
+func updateGithubIssueStatus(ctx context.Context, gc *githubv4.Client, id string, status bug.Status) error {
m := &updateIssueMutation{}
// set state
@@ -651,8 +661,7 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status)
State: &state,
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -662,15 +671,14 @@ func updateGithubIssueStatus(gc *githubv4.Client, id string, status bug.Status)
return nil
}
-func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error {
+func updateGithubIssueBody(ctx context.Context, gc *githubv4.Client, id string, body string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Body: (*githubv4.String)(&body),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -680,15 +688,14 @@ func updateGithubIssueBody(gc *githubv4.Client, id string, body string) error {
return nil
}
-func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error {
+func updateGithubIssueTitle(ctx context.Context, gc *githubv4.Client, id, title string) error {
m := &updateIssueMutation{}
input := githubv4.UpdateIssueInput{
ID: id,
Title: (*githubv4.String)(&title),
}
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
if err := gc.Mutate(ctx, m, input, nil); err != nil {
@@ -699,18 +706,19 @@ func updateGithubIssueTitle(gc *githubv4.Client, id, title string) error {
}
// update github issue labels
-func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
+func (ge *githubExporter) updateGithubIssueLabels(ctx context.Context, gc *githubv4.Client, labelableID string, added, removed []bug.Label) error {
var errs []string
var wg sync.WaitGroup
- parentCtx := context.Background()
+ reqCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
+ defer cancel()
if len(added) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
- addedIDs, err := ge.getLabelsIDs(gc, labelableID, added)
+ addedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, added)
if err != nil {
errs = append(errs, errors.Wrap(err, "getting added labels ids").Error())
return
@@ -722,11 +730,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
LabelIDs: addedIDs,
}
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
- defer cancel()
-
// add labels
- if err := gc.Mutate(ctx, m, inputAdd, nil); err != nil {
+ if err := gc.Mutate(reqCtx, m, inputAdd, nil); err != nil {
errs = append(errs, err.Error())
}
}()
@@ -737,7 +742,7 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
go func() {
defer wg.Done()
- removedIDs, err := ge.getLabelsIDs(gc, labelableID, removed)
+ removedIDs, err := ge.getLabelsIDs(ctx, gc, labelableID, removed)
if err != nil {
errs = append(errs, errors.Wrap(err, "getting added labels ids").Error())
return
@@ -749,11 +754,8 @@ func (ge *githubExporter) updateGithubIssueLabels(gc *githubv4.Client, labelable
LabelIDs: removedIDs,
}
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
- defer cancel()
-
// remove label labels
- if err := gc.Mutate(ctx, m2, inputRemove, nil); err != nil {
+ if err := gc.Mutate(reqCtx, m2, inputRemove, nil); err != nil {
errs = append(errs, err.Error())
}
}()
diff --git a/bridge/github/export_test.go b/bridge/github/export_test.go
index 107fe63b..a0be7cff 100644
--- a/bridge/github/export_test.go
+++ b/bridge/github/export_test.go
@@ -2,6 +2,7 @@ package github
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"math/rand"
@@ -177,13 +178,14 @@ func TestPushPull(t *testing.T) {
})
require.NoError(t, err)
+ ctx := context.Background()
start := time.Now()
// export all bugs
- events, err := exporter.ExportAll(backend, time.Time{})
+ exportEvents, err := exporter.ExportAll(ctx, backend, time.Time{})
require.NoError(t, err)
- for result := range events {
+ for result := range exportEvents {
require.NoError(t, result.Err)
}
require.NoError(t, err)
@@ -206,9 +208,13 @@ func TestPushPull(t *testing.T) {
require.NoError(t, err)
// import all exported bugs to the second backend
- err = importer.ImportAll(backendTwo, time.Time{})
+ importEvents, err := importer.ImportAll(ctx, backendTwo, time.Time{})
require.NoError(t, err)
+ for result := range importEvents {
+ require.NoError(t, result.Err)
+ }
+
require.Len(t, backendTwo.AllBugsIds(), len(tests))
for _, tt := range tests {
diff --git a/bridge/github/import.go b/bridge/github/import.go
index dcaf2d05..9fef9cb5 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -28,11 +28,8 @@ type githubImporter struct {
// iterator
iterator *iterator
- // number of imported issues
- importedIssues int
-
- // number of imported identities
- importedIdentities int
+ // send only channel
+ out chan<- core.ImportResult
}
func (gi *githubImporter) Init(conf core.Configuration) error {
@@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error {
// ImportAll iterate over all the configured repository issues and ensure the creation of the
// missing issues / timeline items / edits / label events ...
-func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
- gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
-
- // Loop over all matching issues
- for gi.iterator.NextIssue() {
- issue := gi.iterator.IssueValue()
- fmt.Printf("importing issue: %v\n", issue.Title)
+func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+ gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
+ out := make(chan core.ImportResult)
+ gi.out = out
+
+ go func() {
+ defer close(gi.out)
+
+ // Loop over all matching issues
+ for gi.iterator.NextIssue() {
+ issue := gi.iterator.IssueValue()
+ // create issue
+ b, err := gi.ensureIssue(repo, issue)
+ if err != nil {
+ err := fmt.Errorf("issue creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
- // create issue
- b, err := gi.ensureIssue(repo, issue)
- if err != nil {
- return fmt.Errorf("issue creation: %v", err)
- }
+ // loop over timeline items
+ for gi.iterator.NextTimelineItem() {
+ item := gi.iterator.TimelineItemValue()
+ if err := gi.ensureTimelineItem(repo, b, item); err != nil {
+ err := fmt.Errorf("timeline item creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
+ }
- // loop over timeline items
- for gi.iterator.NextTimelineItem() {
- if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil {
- return fmt.Errorf("timeline item creation: %v", err)
+ // commit bug state
+ if err := b.CommitAsNeeded(); err != nil {
+ err = fmt.Errorf("bug commit: %v", err)
+ out <- core.NewImportError(err, "")
+ return
}
}
- // commit bug state
- if err := b.CommitAsNeeded(); err != nil {
- return fmt.Errorf("bug commit: %v", err)
+ if err := gi.iterator.Error(); err != nil && err != context.Canceled {
+ gi.out <- core.NewImportError(err, "")
}
- }
+ }()
- if err := gi.iterator.Error(); err != nil {
- fmt.Printf("import error: %v\n", err)
- return err
- }
-
- fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities)
- return nil
+ return out, nil
}
func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) {
@@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
// importing a new bug
- gi.importedIssues++
+ gi.out <- core.NewImportBug(b.Id())
+ } else {
+ gi.out <- core.NewImportNothing("", "bug already imported")
}
} else {
@@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
for i, edit := range issueEdits {
if i == 0 && b != nil {
// The first edit in the github result is the issue creation itself, we already have that
+ gi.out <- core.NewImportNothing("", "bug already imported")
continue
}
@@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
// importing a new bug
- gi.importedIssues++
-
+ gi.out <- core.NewImportBug(b.Id())
continue
}
// other edits will be added as CommentEdit operations
- target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String())
+ target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id))
if err != nil {
return nil, err
}
@@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
- fmt.Printf("import event item: %s\n", item.Typename)
switch item.Typename {
case "IssueComment":
// collect all comment edits
- commentEdits := []userContentEdit{}
+ var commentEdits []userContentEdit
for gi.iterator.NextCommentEdit() {
commentEdits = append(commentEdits, gi.iterator.CommentEditValue())
}
+ // ensureTimelineComment send import events over out chanel
err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
@@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
case "LabeledEvent":
id := parseId(item.LabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
+
if err != cache.ErrNoMatchingOp {
return err
}
@@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != nil {
return err
}
- _, err = b.ForceChangeLabelsRaw(
+ op, err := b.ForceChangeLabelsRaw(
author,
item.LabeledEvent.CreatedAt.Unix(),
[]string{
@@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
nil,
map[string]string{keyGithubId: id},
)
+ if err != nil {
+ return err
+ }
- return err
+ gi.out <- core.NewImportLabelChange(op.Id())
+ return nil
case "UnlabeledEvent":
id := parseId(item.UnlabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
if err != cache.ErrNoMatchingOp {
return err
}
@@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return err
}
- _, err = b.ForceChangeLabelsRaw(
+ op, err := b.ForceChangeLabelsRaw(
author,
item.UnlabeledEvent.CreatedAt.Unix(),
nil,
@@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
},
map[string]string{keyGithubId: id},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportLabelChange(op.Id())
+ return nil
case "ClosedEvent":
id := parseId(item.ClosedEvent.Id)
@@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor)
if err != nil {
return err
}
- _, err = b.CloseRaw(
+ op, err := b.CloseRaw(
author,
item.ClosedEvent.CreatedAt.Unix(),
map[string]string{keyGithubId: id},
)
- return err
+
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
+ return nil
case "ReopenedEvent":
id := parseId(item.ReopenedEvent.Id)
@@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor)
if err != nil {
return err
}
- _, err = b.OpenRaw(
+ op, err := b.OpenRaw(
author,
item.ReopenedEvent.CreatedAt.Unix(),
map[string]string{keyGithubId: id},
)
- return err
+
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
+ return nil
case "RenamedTitleEvent":
id := parseId(item.RenamedTitleEvent.Id)
@@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor)
if err != nil {
return err
}
- _, err = b.SetTitleRaw(
+ op, err := b.SetTitleRaw(
author,
item.RenamedTitleEvent.CreatedAt.Unix(),
string(item.RenamedTitleEvent.CurrentTitle),
map[string]string{keyGithubId: id},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportTitleEdition(op.Id())
+ return nil
default:
- fmt.Printf("ignore event: %v\n", item.Typename)
+ reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
}
return nil
@@ -307,14 +368,16 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
}
targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id))
- if err != nil && err != cache.ErrNoMatchingOp {
+ if err == nil {
+ reason := fmt.Sprintf("comment already imported")
+ gi.out <- core.NewImportNothing("", reason)
+ } else if err != cache.ErrNoMatchingOp {
// real error
return err
}
// if no edits are given we create the comment
if len(edits) == 0 {
- // if comment doesn't exist
if err == cache.ErrNoMatchingOp {
cleanText, err := text.Cleanup(string(item.Body))
if err != nil {
@@ -322,7 +385,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
}
// add comment operation
- _, err = b.AddCommentRaw(
+ op, err := b.AddCommentRaw(
author,
item.CreatedAt.Unix(),
cleanText,
@@ -332,12 +395,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
keyGithubUrl: parseId(item.Url.String()),
},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportComment(op.Id())
}
+
} else {
for i, edit := range edits {
if i == 0 && targetOpID != "" {
// The first edit in the github result is the comment creation itself, we already have that
+ gi.out <- core.NewImportNothing("", "comment already imported")
continue
}
@@ -370,7 +439,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
// set target for the nexr edit now that the comment is created
targetOpID = op.Id()
-
continue
}
@@ -386,7 +454,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id))
if err == nil {
- // already imported
+ gi.out <- core.NewImportNothing(b.Id(), "edition already imported")
return nil
}
if err != cache.ErrNoMatchingOp {
@@ -394,8 +462,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
return err
}
- fmt.Println("import edition")
-
editor, err := gi.ensurePerson(repo, edit.Editor)
if err != nil {
return err
@@ -404,7 +470,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
switch {
case edit.DeletedAt != nil:
// comment deletion, not supported yet
- fmt.Println("comment deletion is not supported yet")
+ gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet")
case edit.DeletedAt == nil:
@@ -414,7 +480,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
}
// comment edition
- _, err = b.EditCommentRaw(
+ op, err := b.EditCommentRaw(
editor,
edit.CreatedAt.Unix(),
target,
@@ -427,6 +493,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
if err != nil {
return err
}
+
+ gi.out <- core.NewImportCommentEdition(op.Id())
}
return nil
@@ -450,7 +518,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
}
// importing a new identity
- gi.importedIdentities++
var name string
var email string
@@ -471,7 +538,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
case "Bot":
}
- return repo.NewIdentityRaw(
+ i, err = repo.NewIdentityRaw(
name,
email,
string(actor.Login),
@@ -480,6 +547,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
keyGithubLogin: string(actor.Login),
},
)
+
+ if err != nil {
+ return nil, err
+ }
+
+ gi.out <- core.NewImportIdentity(i.Id())
+ return i, nil
}
func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) {
@@ -500,8 +574,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache,
gc := buildClient(gi.conf[keyToken])
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout)
defer cancel()
err = gc.Query(ctx, &q, variables)
diff --git a/bridge/github/import_test.go b/bridge/github/import_test.go
index 5ba87993..41bcb58d 100644
--- a/bridge/github/import_test.go
+++ b/bridge/github/import_test.go
@@ -1,6 +1,7 @@
package github
import (
+ "context"
"fmt"
"os"
"testing"
@@ -146,11 +147,16 @@ func Test_Importer(t *testing.T) {
})
require.NoError(t, err)
+ ctx := context.Background()
start := time.Now()
- err = importer.ImportAll(backend, time.Time{})
+ events, err := importer.ImportAll(ctx, backend, time.Time{})
require.NoError(t, err)
+ for result := range events {
+ require.NoError(t, result.Err)
+ }
+
fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds())
require.Len(t, backend.AllBugsIds(), len(tests))
diff --git a/bridge/github/iterator.go b/bridge/github/iterator.go
index 6d63cf42..a97ed036 100644
--- a/bridge/github/iterator.go
+++ b/bridge/github/iterator.go
@@ -46,6 +46,9 @@ type iterator struct {
// to make
capacity int
+ // shared context used for all graphql queries
+ ctx context.Context
+
// sticky error
err error
@@ -60,11 +63,12 @@ type iterator struct {
}
// NewIterator create and initialize a new iterator
-func NewIterator(owner, project, token string, since time.Time) *iterator {
+func NewIterator(ctx context.Context, capacity int, owner, project, token string, since time.Time) *iterator {
i := &iterator{
gc: buildClient(token),
since: since,
- capacity: 10,
+ capacity: capacity,
+ ctx: ctx,
timeline: timelineIterator{
index: -1,
issueEdit: indexer{-1},
@@ -147,8 +151,7 @@ func (i *iterator) Error() error {
}
func (i *iterator) queryIssue() bool {
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil {
@@ -167,6 +170,10 @@ func (i *iterator) queryIssue() bool {
// NextIssue try to query the next issue and return true. Only one issue is
// queried at each call.
func (i *iterator) NextIssue() bool {
+ if i.err != nil {
+ return false
+ }
+
// if $issueAfter variable is nil we can directly make the first query
if i.timeline.variables["issueAfter"] == (*githubv4.String)(nil) {
nextIssue := i.queryIssue()
@@ -175,10 +182,6 @@ func (i *iterator) NextIssue() bool {
return nextIssue
}
- if i.err != nil {
- return false
- }
-
if !i.timeline.query.Repository.Issues.PageInfo.HasNextPage {
return false
}
@@ -207,11 +210,15 @@ func (i *iterator) NextTimelineItem() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
if len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges) == 0 {
return false
}
- if i.timeline.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges))-1 {
+ if i.timeline.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges)-1 {
i.timeline.index++
return true
}
@@ -225,8 +232,7 @@ func (i *iterator) NextTimelineItem() bool {
// more timelines, query them
i.timeline.variables["timelineAfter"] = i.timeline.query.Repository.Issues.Nodes[0].Timeline.PageInfo.EndCursor
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.timeline.query, i.timeline.variables); err != nil {
@@ -245,8 +251,7 @@ func (i *iterator) TimelineItemValue() timelineItem {
}
func (i *iterator) queryIssueEdit() bool {
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.issueEdit.query, i.issueEdit.variables); err != nil {
@@ -285,10 +290,14 @@ func (i *iterator) NextIssueEdit() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
// this mean we looped over all available issue edits in the timeline.
// now we have to use i.issueEditQuery
if i.timeline.issueEdit.index == -2 {
- if i.issueEdit.index < min(i.capacity, len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 {
+ if i.issueEdit.index < len(i.issueEdit.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 {
i.issueEdit.index++
return i.nextValidIssueEdit()
}
@@ -319,7 +328,7 @@ func (i *iterator) NextIssueEdit() bool {
}
// loop over them timeline comment edits
- if i.timeline.issueEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes))-1 {
+ if i.timeline.issueEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].UserContentEdits.Nodes)-1 {
i.timeline.issueEdit.index++
return i.nextValidIssueEdit()
}
@@ -347,8 +356,7 @@ func (i *iterator) IssueEditValue() userContentEdit {
}
func (i *iterator) queryCommentEdit() bool {
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
defer cancel()
if err := i.gc.Query(ctx, &i.commentEdit.query, i.commentEdit.variables); err != nil {
@@ -384,10 +392,14 @@ func (i *iterator) NextCommentEdit() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
// same as NextIssueEdit
if i.timeline.commentEdit.index == -2 {
- if i.commentEdit.index < min(i.capacity, len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes))-1 {
+ if i.commentEdit.index < len(i.commentEdit.query.Repository.Issues.Nodes[0].Timeline.Nodes[0].IssueComment.UserContentEdits.Nodes)-1 {
i.commentEdit.index++
return i.nextValidCommentEdit()
}
@@ -409,7 +421,7 @@ func (i *iterator) NextCommentEdit() bool {
}
// loop over them timeline comment edits
- if i.timeline.commentEdit.index < min(i.capacity, len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes))-1 {
+ if i.timeline.commentEdit.index < len(i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes)-1 {
i.timeline.commentEdit.index++
return i.nextValidCommentEdit()
}
@@ -440,14 +452,6 @@ func (i *iterator) CommentEditValue() userContentEdit {
return i.timeline.query.Repository.Issues.Nodes[0].Timeline.Edges[i.timeline.index].Node.IssueComment.UserContentEdits.Nodes[i.timeline.commentEdit.index]
}
-func min(a, b int) int {
- if a > b {
- return b
- }
-
- return a
-}
-
func reverseEdits(edits []userContentEdit) {
for i, j := 0, len(edits)-1; i < j; i, j = i+1, j-1 {
edits[i], edits[j] = edits[j], edits[i]
diff --git a/bridge/gitlab/import.go b/bridge/gitlab/import.go
index e135b8bc..40ac06d3 100644
--- a/bridge/gitlab/import.go
+++ b/bridge/gitlab/import.go
@@ -1,6 +1,7 @@
package gitlab
import (
+ "context"
"fmt"
"strconv"
"time"
@@ -21,11 +22,8 @@ type gitlabImporter struct {
// iterator
iterator *iterator
- // number of imported issues
- importedIssues int
-
- // number of imported identities
- importedIdentities int
+ // send only channel
+ out chan<- core.ImportResult
}
func (gi *gitlabImporter) Init(conf core.Configuration) error {
@@ -35,49 +33,60 @@ func (gi *gitlabImporter) Init(conf core.Configuration) error {
// ImportAll iterate over all the configured repository issues (notes) and ensure the creation
// of the missing issues / comments / label events / title changes ...
-func (gi *gitlabImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
- gi.iterator = NewIterator(gi.conf[keyProjectID], gi.conf[keyToken], since)
-
- // Loop over all matching issues
- for gi.iterator.NextIssue() {
- issue := gi.iterator.IssueValue()
- fmt.Printf("importing issue: %v\n", issue.Title)
+func (gi *gitlabImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+ gi.iterator = NewIterator(ctx, 10, gi.conf[keyProjectID], gi.conf[keyToken], since)
+ out := make(chan core.ImportResult)
+ gi.out = out
+
+ go func() {
+ defer close(gi.out)
+
+ // Loop over all matching issues
+ for gi.iterator.NextIssue() {
+ issue := gi.iterator.IssueValue()
+
+ // create issue
+ b, err := gi.ensureIssue(repo, issue)
+ if err != nil {
+ err := fmt.Errorf("issue creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
- // create issue
- b, err := gi.ensureIssue(repo, issue)
- if err != nil {
- return fmt.Errorf("issue creation: %v", err)
- }
+ // Loop over all notes
+ for gi.iterator.NextNote() {
+ note := gi.iterator.NoteValue()
+ if err := gi.ensureNote(repo, b, note); err != nil {
+ err := fmt.Errorf("note creation: %v", err)
+ out <- core.NewImportError(err, entity.Id(strconv.Itoa(note.ID)))
+ return
+ }
+ }
- // Loop over all notes
- for gi.iterator.NextNote() {
- note := gi.iterator.NoteValue()
- if err := gi.ensureNote(repo, b, note); err != nil {
- return fmt.Errorf("note creation: %v", err)
+ // Loop over all label events
+ for gi.iterator.NextLabelEvent() {
+ labelEvent := gi.iterator.LabelEventValue()
+ if err := gi.ensureLabelEvent(repo, b, labelEvent); err != nil {
+ err := fmt.Errorf("label event creation: %v", err)
+ out <- core.NewImportError(err, entity.Id(strconv.Itoa(labelEvent.ID)))
+ return
+ }
}
- }
- // Loop over all label events
- for gi.iterator.NextLabelEvent() {
- labelEvent := gi.iterator.LabelEventValue()
- if err := gi.ensureLabelEvent(repo, b, labelEvent); err != nil {
- return fmt.Errorf("label event creation: %v", err)
+ // commit bug state
+ if err := b.CommitAsNeeded(); err != nil {
+ err := fmt.Errorf("bug commit: %v", err)
+ out <- core.NewImportError(err, "")
+ return
}
}
if err := gi.iterator.Error(); err != nil {
- fmt.Printf("import error: %v\n", err)
- return err
+ out <- core.NewImportError(err, "")
}
+ }()
- // commit bug state
- if err := b.CommitAsNeeded(); err != nil {
- return fmt.Errorf("bug commit: %v", err)
- }
- }
-
- fmt.Printf("Successfully imported %d issues and %d identities from Gitlab\n", gi.importedIssues, gi.importedIdentities)
- return nil
+ return out, nil
}
func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue) (*cache.BugCache, error) {
@@ -89,13 +98,14 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue
// resolve bug
b, err := repo.ResolveBugCreateMetadata(keyGitlabUrl, issue.WebURL)
- if err != nil && err != bug.ErrBugNotExist {
- return nil, err
- }
-
if err == nil {
+ reason := fmt.Sprintf("bug already imported")
+ gi.out <- core.NewImportNothing("", reason)
return b, nil
}
+ if err != bug.ErrBugNotExist {
+ return nil, err
+ }
// if bug was never imported
cleanText, err := text.Cleanup(issue.Description)
@@ -123,7 +133,7 @@ func (gi *gitlabImporter) ensureIssue(repo *cache.RepoCache, issue *gitlab.Issue
}
// importing a new bug
- gi.importedIssues++
+ gi.out <- core.NewImportBug(b.Id())
return b, nil
}
@@ -149,28 +159,36 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
return nil
}
- _, err = b.CloseRaw(
+ op, err := b.CloseRaw(
author,
note.CreatedAt.Unix(),
map[string]string{
keyGitlabId: gitlabID,
},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
case NOTE_REOPENED:
if errResolve == nil {
return nil
}
- _, err = b.OpenRaw(
+ op, err := b.OpenRaw(
author,
note.CreatedAt.Unix(),
map[string]string{
keyGitlabId: gitlabID,
},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
case NOTE_DESCRIPTION_CHANGED:
issue := gi.iterator.IssueValue()
@@ -181,7 +199,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
// TODO: Check only one time and ignore next 'description change' within one issue
if errResolve == cache.ErrNoMatchingOp && issue.Description != firstComment.Message {
// comment edition
- _, err = b.EditCommentRaw(
+ op, err := b.EditCommentRaw(
author,
note.UpdatedAt.Unix(),
firstComment.Id(),
@@ -190,8 +208,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
keyGitlabId: gitlabID,
},
)
+ if err != nil {
+ return err
+ }
- return err
+ gi.out <- core.NewImportTitleEdition(op.Id())
}
case NOTE_COMMENT:
@@ -204,7 +225,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
if errResolve == cache.ErrNoMatchingOp {
// add comment operation
- _, err = b.AddCommentRaw(
+ op, err := b.AddCommentRaw(
author,
note.CreatedAt.Unix(),
cleanText,
@@ -213,8 +234,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
keyGitlabId: gitlabID,
},
)
-
- return err
+ if err != nil {
+ return err
+ }
+ gi.out <- core.NewImportComment(op.Id())
+ return nil
}
// if comment was already exported
@@ -228,7 +252,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
// compare local bug comment with the new note body
if comment.Message != cleanText {
// comment edition
- _, err = b.EditCommentRaw(
+ op, err := b.EditCommentRaw(
author,
note.UpdatedAt.Unix(),
comment.Id(),
@@ -236,7 +260,10 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
nil,
)
- return err
+ if err != nil {
+ return err
+ }
+ gi.out <- core.NewImportCommentEdition(op.Id())
}
return nil
@@ -247,7 +274,7 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
return nil
}
- _, err = b.SetTitleRaw(
+ op, err := b.SetTitleRaw(
author,
note.CreatedAt.Unix(),
body,
@@ -255,8 +282,11 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
keyGitlabId: gitlabID,
},
)
+ if err != nil {
+ return err
+ }
- return err
+ gi.out <- core.NewImportTitleEdition(op.Id())
case NOTE_UNKNOWN,
NOTE_ASSIGNED,
@@ -269,6 +299,9 @@ func (gi *gitlabImporter) ensureNote(repo *cache.RepoCache, b *cache.BugCache, n
NOTE_UNLOCKED,
NOTE_MENTIONED_IN_ISSUE,
NOTE_MENTIONED_IN_MERGE_REQUEST:
+
+ reason := fmt.Sprintf("unsupported note type: %s", noteType.String())
+ gi.out <- core.NewImportNothing("", reason)
return nil
default:
@@ -337,10 +370,7 @@ func (gi *gitlabImporter) ensurePerson(repo *cache.RepoCache, id int) (*cache.Id
return nil, err
}
- // importing a new identity
- gi.importedIdentities++
-
- return repo.NewIdentityRaw(
+ i, err = repo.NewIdentityRaw(
user.Name,
user.PublicEmail,
user.Username,
@@ -351,6 +381,12 @@ func (gi *gitlabImporter) ensurePerson(repo *cache.RepoCache, id int) (*cache.Id
keyGitlabLogin: user.Username,
},
)
+ if err != nil {
+ return nil, err
+ }
+
+ gi.out <- core.NewImportIdentity(i.Id())
+ return i, nil
}
func parseID(id int) string {
diff --git a/bridge/gitlab/import_notes.go b/bridge/gitlab/import_notes.go
index c0796037..b38cb371 100644
--- a/bridge/gitlab/import_notes.go
+++ b/bridge/gitlab/import_notes.go
@@ -28,6 +28,45 @@ const (
NOTE_UNKNOWN
)
+func (nt NoteType) String() string {
+ switch nt {
+ case NOTE_COMMENT:
+ return "note comment"
+ case NOTE_TITLE_CHANGED:
+ return "note title changed"
+ case NOTE_DESCRIPTION_CHANGED:
+ return "note description changed"
+ case NOTE_CLOSED:
+ return "note closed"
+ case NOTE_REOPENED:
+ return "note reopened"
+ case NOTE_LOCKED:
+ return "note locked"
+ case NOTE_UNLOCKED:
+ return "note unlocked"
+ case NOTE_CHANGED_DUEDATE:
+ return "note changed duedate"
+ case NOTE_REMOVED_DUEDATE:
+ return "note remove duedate"
+ case NOTE_ASSIGNED:
+ return "note assigned"
+ case NOTE_UNASSIGNED:
+ return "note unassigned"
+ case NOTE_CHANGED_MILESTONE:
+ return "note changed milestone"
+ case NOTE_REMOVED_MILESTONE:
+ return "note removed in milestone"
+ case NOTE_MENTIONED_IN_ISSUE:
+ return "note mentioned in issue"
+ case NOTE_MENTIONED_IN_MERGE_REQUEST:
+ return "note mentioned in merge request"
+ case NOTE_UNKNOWN:
+ return "note unknown"
+ default:
+ panic("unknown note type")
+ }
+}
+
// GetNoteType parse a note system and body and return the note type and it content
func GetNoteType(n *gitlab.Note) (NoteType, string) {
// when a note is a comment system is set to false
diff --git a/bridge/gitlab/import_test.go b/bridge/gitlab/import_test.go
index c38d3ce3..20fc67c7 100644
--- a/bridge/gitlab/import_test.go
+++ b/bridge/gitlab/import_test.go
@@ -1,6 +1,7 @@
package gitlab
import (
+ "context"
"fmt"
"os"
"testing"
@@ -99,10 +100,16 @@ func TestImport(t *testing.T) {
})
require.NoError(t, err)
+ ctx := context.Background()
start := time.Now()
- err = importer.ImportAll(backend, time.Time{})
+
+ events, err := importer.ImportAll(ctx, backend, time.Time{})
require.NoError(t, err)
+ for result := range events {
+ require.NoError(t, result.Err)
+ }
+
fmt.Printf("test repository imported in %f seconds\n", time.Since(start).Seconds())
require.Len(t, backend.AllBugsIds(), len(tests))
diff --git a/bridge/gitlab/iterator.go b/bridge/gitlab/iterator.go
index 883fea9c..198af79b 100644
--- a/bridge/gitlab/iterator.go
+++ b/bridge/gitlab/iterator.go
@@ -1,6 +1,7 @@
package gitlab
import (
+ "context"
"time"
"github.com/xanzy/go-gitlab"
@@ -38,6 +39,9 @@ type iterator struct {
// number of issues and notes to query at once
capacity int
+ // shared context
+ ctx context.Context
+
// sticky error
err error
@@ -52,12 +56,13 @@ type iterator struct {
}
// NewIterator create a new iterator
-func NewIterator(projectID, token string, since time.Time) *iterator {
+func NewIterator(ctx context.Context, capacity int, projectID, token string, since time.Time) *iterator {
return &iterator{
gc: buildClient(token),
project: projectID,
since: since,
- capacity: 20,
+ capacity: capacity,
+ ctx: ctx,
issue: &issueIterator{
index: -1,
page: 1,
@@ -79,6 +84,9 @@ func (i *iterator) Error() error {
}
func (i *iterator) getNextIssues() bool {
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
+ defer cancel()
+
issues, _, err := i.gc.Issues.ListProjectIssues(
i.project,
&gitlab.ListProjectIssuesOptions{
@@ -90,6 +98,7 @@ func (i *iterator) getNextIssues() bool {
UpdatedAfter: &i.since,
Sort: gitlab.String("asc"),
},
+ gitlab.WithContext(ctx),
)
if err != nil {
@@ -116,6 +125,10 @@ func (i *iterator) NextIssue() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
// first query
if i.issue.cache == nil {
return i.getNextIssues()
@@ -135,6 +148,9 @@ func (i *iterator) IssueValue() *gitlab.Issue {
}
func (i *iterator) getNextNotes() bool {
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
+ defer cancel()
+
notes, _, err := i.gc.Notes.ListIssueNotes(
i.project,
i.IssueValue().IID,
@@ -146,6 +162,7 @@ func (i *iterator) getNextNotes() bool {
Sort: gitlab.String("asc"),
OrderBy: gitlab.String("created_at"),
},
+ gitlab.WithContext(ctx),
)
if err != nil {
@@ -171,6 +188,10 @@ func (i *iterator) NextNote() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
if len(i.note.cache) == 0 {
return i.getNextNotes()
}
@@ -189,6 +210,9 @@ func (i *iterator) NoteValue() *gitlab.Note {
}
func (i *iterator) getNextLabelEvents() bool {
+ ctx, cancel := context.WithTimeout(i.ctx, defaultTimeout)
+ defer cancel()
+
labelEvents, _, err := i.gc.ResourceLabelEvents.ListIssueLabelEvents(
i.project,
i.IssueValue().IID,
@@ -198,6 +222,7 @@ func (i *iterator) getNextLabelEvents() bool {
PerPage: i.capacity,
},
},
+ gitlab.WithContext(ctx),
)
if err != nil {
@@ -224,6 +249,10 @@ func (i *iterator) NextLabelEvent() bool {
return false
}
+ if i.ctx.Err() != nil {
+ return false
+ }
+
if len(i.labelEvent.cache) == 0 {
return i.getNextLabelEvents()
}
diff --git a/bridge/launchpad/import.go b/bridge/launchpad/import.go
index 7ef11416..7f50d898 100644
--- a/bridge/launchpad/import.go
+++ b/bridge/launchpad/import.go
@@ -1,11 +1,10 @@
package launchpad
import (
+ "context"
"fmt"
"time"
- "github.com/pkg/errors"
-
"github.com/MichaelMure/git-bug/bridge/core"
"github.com/MichaelMure/git-bug/bug"
"github.com/MichaelMure/git-bug/cache"
@@ -45,98 +44,116 @@ func (li *launchpadImporter) ensurePerson(repo *cache.RepoCache, owner LPPerson)
)
}
-func (li *launchpadImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
+func (li *launchpadImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+ out := make(chan core.ImportResult)
lpAPI := new(launchpadAPI)
err := lpAPI.Init()
if err != nil {
- return err
+ return nil, err
}
- lpBugs, err := lpAPI.SearchTasks(li.conf["project"])
+ lpBugs, err := lpAPI.SearchTasks(ctx, li.conf["project"])
if err != nil {
- return err
+ return nil, err
}
- for _, lpBug := range lpBugs {
- var b *cache.BugCache
- var err error
-
- lpBugID := fmt.Sprintf("%d", lpBug.ID)
- b, err = repo.ResolveBugCreateMetadata(keyLaunchpadID, lpBugID)
- if err != nil && err != bug.ErrBugNotExist {
- return err
- }
-
- owner, err := li.ensurePerson(repo, lpBug.Owner)
- if err != nil {
- return err
- }
-
- if err == bug.ErrBugNotExist {
- createdAt, _ := time.Parse(time.RFC3339, lpBug.CreatedAt)
- b, _, err = repo.NewBugRaw(
- owner,
- createdAt.Unix(),
- lpBug.Title,
- lpBug.Description,
- nil,
- map[string]string{
- keyLaunchpadID: lpBugID,
- },
- )
- if err != nil {
- return errors.Wrapf(err, "failed to add bug id #%s", lpBugID)
+ go func() {
+ for _, lpBug := range lpBugs {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ lpBugID := fmt.Sprintf("%d", lpBug.ID)
+ b, err := repo.ResolveBugCreateMetadata(keyLaunchpadID, lpBugID)
+ if err != nil && err != bug.ErrBugNotExist {
+ out <- core.NewImportError(err, entity.Id(lpBugID))
+ return
+ }
+
+ owner, err := li.ensurePerson(repo, lpBug.Owner)
+ if err != nil {
+ out <- core.NewImportError(err, entity.Id(lpBugID))
+ return
+ }
+
+ if err == bug.ErrBugNotExist {
+ createdAt, _ := time.Parse(time.RFC3339, lpBug.CreatedAt)
+ b, _, err = repo.NewBugRaw(
+ owner,
+ createdAt.Unix(),
+ lpBug.Title,
+ lpBug.Description,
+ nil,
+ map[string]string{
+ keyLaunchpadID: lpBugID,
+ },
+ )
+ if err != nil {
+ out <- core.NewImportError(err, entity.Id(lpBugID))
+ return
+ }
+
+ out <- core.NewImportBug(b.Id())
+
+ }
+
+ /* Handle messages */
+ if len(lpBug.Messages) == 0 {
+ err := fmt.Sprintf("bug doesn't have any comments")
+ out <- core.NewImportNothing(entity.Id(lpBugID), err)
+ return
+ }
+
+ // The Launchpad API returns the bug description as the first
+ // comment, so skip it.
+ for _, lpMessage := range lpBug.Messages[1:] {
+ _, err := b.ResolveOperationWithMetadata(keyLaunchpadID, lpMessage.ID)
+ if err != nil && err != cache.ErrNoMatchingOp {
+ out <- core.NewImportError(err, entity.Id(lpMessage.ID))
+ return
+ }
+
+ // If this comment already exists, we are probably
+ // updating an existing bug. We do not want to duplicate
+ // the comments, so let us just skip this one.
+ // TODO: Can Launchpad comments be edited?
+ if err == nil {
+ continue
+ }
+
+ owner, err := li.ensurePerson(repo, lpMessage.Owner)
+ if err != nil {
+ out <- core.NewImportError(err, "")
+ return
+ }
+
+ // This is a new comment, we can add it.
+ createdAt, _ := time.Parse(time.RFC3339, lpMessage.CreatedAt)
+ op, err := b.AddCommentRaw(
+ owner,
+ createdAt.Unix(),
+ lpMessage.Content,
+ nil,
+ map[string]string{
+ keyLaunchpadID: lpMessage.ID,
+ })
+ if err != nil {
+ out <- core.NewImportError(err, op.Id())
+ return
+ }
+
+ out <- core.NewImportComment(op.Id())
+ }
+
+ err = b.CommitAsNeeded()
+ if err != nil {
+ out <- core.NewImportError(err, "")
+ return
+ }
}
- } else {
- /* TODO: Update bug */
- fmt.Println("TODO: Update bug")
}
+ }()
- /* Handle messages */
- if len(lpBug.Messages) == 0 {
- return errors.Wrapf(err, "failed to fetch comments for bug #%s", lpBugID)
- }
-
- // The Launchpad API returns the bug description as the first
- // comment, so skip it.
- for _, lpMessage := range lpBug.Messages[1:] {
- _, err := b.ResolveOperationWithMetadata(keyLaunchpadID, lpMessage.ID)
- if err != nil && err != cache.ErrNoMatchingOp {
- return errors.Wrapf(err, "failed to fetch comments for bug #%s", lpBugID)
- }
-
- // If this comment already exists, we are probably
- // updating an existing bug. We do not want to duplicate
- // the comments, so let us just skip this one.
- // TODO: Can Launchpad comments be edited?
- if err == nil {
- continue
- }
-
- owner, err := li.ensurePerson(repo, lpMessage.Owner)
- if err != nil {
- return err
- }
-
- // This is a new comment, we can add it.
- createdAt, _ := time.Parse(time.RFC3339, lpMessage.CreatedAt)
- _, err = b.AddCommentRaw(
- owner,
- createdAt.Unix(),
- lpMessage.Content,
- nil,
- map[string]string{
- keyLaunchpadID: lpMessage.ID,
- })
- if err != nil {
- return errors.Wrapf(err, "failed to add comment to bug #%s", lpBugID)
- }
- }
- err = b.CommitAsNeeded()
- if err != nil {
- return err
- }
- }
- return nil
+ return out, nil
}
diff --git a/bridge/launchpad/launchpad_api.go b/bridge/launchpad/launchpad_api.go
index 8cafa241..763e774e 100644
--- a/bridge/launchpad/launchpad_api.go
+++ b/bridge/launchpad/launchpad_api.go
@@ -14,6 +14,7 @@ package launchpad
*/
import (
+ "context"
"encoding/json"
"fmt"
"net/http"
@@ -33,43 +34,6 @@ type LPPerson struct {
// https://api.launchpad.net/devel/~login
var personCache = make(map[string]LPPerson)
-func (owner *LPPerson) UnmarshalJSON(data []byte) error {
- type LPPersonX LPPerson // Avoid infinite recursion
- var ownerLink string
- if err := json.Unmarshal(data, &ownerLink); err != nil {
- return err
- }
-
- // First, try to gather info about the bug owner using our cache.
- if cachedPerson, hasKey := personCache[ownerLink]; hasKey {
- *owner = cachedPerson
- return nil
- }
-
- // If the bug owner is not already known, we have to send a request.
- req, err := http.NewRequest("GET", ownerLink, nil)
- if err != nil {
- return nil
- }
-
- client := &http.Client{}
- resp, err := client.Do(req)
- if err != nil {
- return nil
- }
-
- defer resp.Body.Close()
-
- var p LPPersonX
- if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
- return nil
- }
- *owner = LPPerson(p)
- // Do not forget to update the cache.
- personCache[ownerLink] = *owner
- return nil
-}
-
// LPBug describes a Launchpad bug.
type LPBug struct {
Title string `json:"title"`
@@ -109,11 +73,13 @@ type launchpadAPI struct {
}
func (lapi *launchpadAPI) Init() error {
- lapi.client = &http.Client{}
+ lapi.client = &http.Client{
+ Timeout: defaultTimeout,
+ }
return nil
}
-func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) {
+func (lapi *launchpadAPI) SearchTasks(ctx context.Context, project string) ([]LPBug, error) {
var bugs []LPBug
// First, let us build the URL. Not all statuses are included by
@@ -153,7 +119,7 @@ func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) {
}
for _, bugEntry := range result.Entries {
- bug, err := lapi.queryBug(bugEntry.BugLink)
+ bug, err := lapi.queryBug(ctx, bugEntry.BugLink)
if err == nil {
bugs = append(bugs, bug)
}
@@ -170,13 +136,14 @@ func (lapi *launchpadAPI) SearchTasks(project string) ([]LPBug, error) {
return bugs, nil
}
-func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) {
+func (lapi *launchpadAPI) queryBug(ctx context.Context, url string) (LPBug, error) {
var bug LPBug
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return bug, err
}
+ req = req.WithContext(ctx)
resp, err := lapi.client.Do(req)
if err != nil {
@@ -191,7 +158,7 @@ func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) {
/* Fetch messages */
messagesCollectionLink := fmt.Sprintf("%s/bugs/%d/messages", apiRoot, bug.ID)
- messages, err := lapi.queryMessages(messagesCollectionLink)
+ messages, err := lapi.queryMessages(ctx, messagesCollectionLink)
if err != nil {
return bug, err
}
@@ -200,7 +167,7 @@ func (lapi *launchpadAPI) queryBug(url string) (LPBug, error) {
return bug, nil
}
-func (lapi *launchpadAPI) queryMessages(messagesURL string) ([]LPMessage, error) {
+func (lapi *launchpadAPI) queryMessages(ctx context.Context, messagesURL string) ([]LPMessage, error) {
var messages []LPMessage
for {
@@ -208,6 +175,7 @@ func (lapi *launchpadAPI) queryMessages(messagesURL string) ([]LPMessage, error)
if err != nil {
return nil, err
}
+ req = req.WithContext(ctx)
resp, err := lapi.client.Do(req)
if err != nil {
diff --git a/cache/repo_cache.go b/cache/repo_cache.go
index d6e8857d..107a4876 100644
--- a/cache/repo_cache.go
+++ b/cache/repo_cache.go
@@ -172,18 +172,10 @@ func (c *RepoCache) lock() error {
}
func (c *RepoCache) Close() error {
- for id := range c.identities {
- delete(c.identities, id)
- }
- for id := range c.identitiesExcerpts {
- delete(c.identitiesExcerpts, id)
- }
- for id := range c.bugs {
- delete(c.bugs, id)
- }
- for id := range c.bugExcerpts {
- delete(c.bugExcerpts, id)
- }
+ c.identities = make(map[entity.Id]*IdentityCache)
+ c.identitiesExcerpts = nil
+ c.bugs = make(map[entity.Id]*BugCache)
+ c.bugExcerpts = nil
lockPath := repoLockFilePath(c.repo)
return os.Remove(lockPath)
diff --git a/commands/bridge_pull.go b/commands/bridge_pull.go
index 2edabfaf..0f3b8413 100644
--- a/commands/bridge_pull.go
+++ b/commands/bridge_pull.go
@@ -1,6 +1,10 @@
package commands
import (
+ "context"
+ "fmt"
+ "os"
+ "sync"
"time"
"github.com/spf13/cobra"
@@ -31,12 +35,59 @@ func runBridgePull(cmd *cobra.Command, args []string) error {
return err
}
+ parentCtx := context.Background()
+ ctx, cancel := context.WithCancel(parentCtx)
+ defer cancel()
+
+ // buffered channel to avoid send block at the end
+ done := make(chan struct{}, 1)
+
+ var mu sync.Mutex
+ interruptCount := 0
+ interrupt.RegisterCleaner(func() error {
+ mu.Lock()
+ if interruptCount > 0 {
+ fmt.Println("Received another interrupt before graceful stop, terminating...")
+ os.Exit(0)
+ }
+
+ interruptCount++
+ mu.Unlock()
+
+ fmt.Println("Received interrupt signal, stopping the import...\n(Hit ctrl-c again to kill the process.)")
+
+ // send signal to stop the importer
+ cancel()
+
+ // block until importer gracefully shutdown
+ <-done
+ return nil
+ })
+
// TODO: by default import only new events
- err = b.ImportAll(time.Time{})
+ events, err := b.ImportAll(ctx, time.Time{})
if err != nil {
return err
}
+ importedIssues := 0
+ importedIdentities := 0
+ for result := range events {
+ fmt.Println(result.String())
+
+ switch result.Event {
+ case core.ImportEventBug:
+ importedIssues++
+ case core.ImportEventIdentity:
+ importedIdentities++
+ }
+ }
+
+ // send done signal
+ close(done)
+
+ fmt.Printf("Successfully imported %d issues and %d identities with %s bridge\n", importedIssues, importedIdentities, b.Name)
+
return nil
}
diff --git a/commands/bridge_push.go b/commands/bridge_push.go
index 11f5ca82..77fe8b29 100644
--- a/commands/bridge_push.go
+++ b/commands/bridge_push.go
@@ -1,7 +1,10 @@
package commands
import (
+ "context"
"fmt"
+ "os"
+ "sync"
"time"
"github.com/spf13/cobra"
@@ -32,20 +35,55 @@ func runBridgePush(cmd *cobra.Command, args []string) error {
return err
}
+ parentCtx := context.Background()
+ ctx, cancel := context.WithCancel(parentCtx)
+ defer cancel()
+
+ done := make(chan struct{}, 1)
+
+ var mu sync.Mutex
+ interruptCount := 0
+ interrupt.RegisterCleaner(func() error {
+ mu.Lock()
+ if interruptCount > 0 {
+ fmt.Println("Received another interrupt before graceful stop, terminating...")
+ os.Exit(0)
+ }
+
+ interruptCount++
+ mu.Unlock()
+
+ fmt.Println("Received interrupt signal, stopping the import...\n(Hit ctrl-c again to kill the process.)")
+
+ // send signal to stop the importer
+ cancel()
+
+ // block until importer gracefully shutdown
+ <-done
+ return nil
+ })
+
// TODO: by default export only new events
- out, err := b.ExportAll(time.Time{})
+ events, err := b.ExportAll(ctx, time.Time{})
if err != nil {
return err
}
- for result := range out {
- if result.Err != nil {
- fmt.Println(result.Err, result.Reason)
- } else {
- fmt.Printf("%s: %s\n", result.String(), result.ID)
+ exportedIssues := 0
+ for result := range events {
+ fmt.Println(result.String())
+
+ switch result.Event {
+ case core.ExportEventBug:
+ exportedIssues++
}
}
+ // send done signal
+ close(done)
+
+ fmt.Printf("Successfully exported %d issues with %s bridge\n", exportedIssues, b.Name)
+
return nil
}
diff --git a/entity/merge.go b/entity/merge.go
index 7c3e71c8..3ce8edac 100644
--- a/entity/merge.go
+++ b/entity/merge.go
@@ -13,6 +13,7 @@ const (
MergeStatusInvalid
MergeStatusUpdated
MergeStatusNothing
+ MergeStatusError
)
type MergeResult struct {
@@ -39,6 +40,8 @@ func (mr MergeResult) String() string {
return "updated"
case MergeStatusNothing:
return "nothing to do"
+ case MergeStatusError:
+ return fmt.Sprintf("merge error on %s: %s", mr.Id, mr.Err.Error())
default:
panic("unknown merge status")
}
@@ -46,8 +49,9 @@ func (mr MergeResult) String() string {
func NewMergeError(err error, id Id) MergeResult {
return MergeResult{
- Err: err,
- Id: id,
+ Err: err,
+ Id: id,
+ Status: MergeStatusError,
}
}