aboutsummaryrefslogtreecommitdiffstats
path: root/bridge/github/import.go
diff options
context:
space:
mode:
authorAmine Hilaly <hilalyamine@gmail.com>2019-08-13 19:51:14 +0200
committerAmine Hilaly <hilalyamine@gmail.com>2019-08-18 00:14:22 +0200
commit5ca326af83b90531d4d0c502bb1beabbe1b48c55 (patch)
tree6b7a32f2db9ab7321e9965c0ef4c715c6c517178 /bridge/github/import.go
parent6428352bd14828f670206b60862de7f71c52d235 (diff)
downloadgit-bug-5ca326af83b90531d4d0c502bb1beabbe1b48c55.tar.gz
bridge/core: add context.Context to ImportAll and ExportAll signatures
bridge/core: add ImportResult objects to stream import events bridge/core: launchpad support asynchronous import bridge/github: cancellable export and import functions bridge/gitlab: cancellable export and import functions commands: bridge pull/push gracefull kill bridge/github: fix github import bridge/github: use simple context for imports bridge/core: name parameters in interfaces github/core: Add EventError to export and import events types bridge/gitlab: add context support in gitlab requests functions bridge/gitlab: remove imported events count from importer logic bridge/github: remove imported events count from importer logic bridge/github: add context support in query and muration requets bridge/github: fix bug duplicate editions after multiple calls bridge/core: import import and export events String methods bridge/gitlab: fix error handling in note import events commands/bridge: Add statistics about imports and exports bridge/gitlab: properly handle context cancellation bridge/github: improve error handling bridge: break iterators on context cancel or timeout bridge: add context timeout support bridge: improve event formating and error handling commands: handle interrupt and switch cases bridge/github: add export mutation timeouts bridge: fix race condition bug in the github and gitlab importers bridge/github: improve context error handling
Diffstat (limited to 'bridge/github/import.go')
-rw-r--r--bridge/github/import.go199
1 files changed, 136 insertions, 63 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go
index dcaf2d05..9fef9cb5 100644
--- a/bridge/github/import.go
+++ b/bridge/github/import.go
@@ -28,11 +28,8 @@ type githubImporter struct {
// iterator
iterator *iterator
- // number of imported issues
- importedIssues int
-
- // number of imported identities
- importedIdentities int
+ // send only channel
+ out chan<- core.ImportResult
}
func (gi *githubImporter) Init(conf core.Configuration) error {
@@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error {
// ImportAll iterate over all the configured repository issues and ensure the creation of the
// missing issues / timeline items / edits / label events ...
-func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error {
- gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
-
- // Loop over all matching issues
- for gi.iterator.NextIssue() {
- issue := gi.iterator.IssueValue()
- fmt.Printf("importing issue: %v\n", issue.Title)
+func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) {
+ gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since)
+ out := make(chan core.ImportResult)
+ gi.out = out
+
+ go func() {
+ defer close(gi.out)
+
+ // Loop over all matching issues
+ for gi.iterator.NextIssue() {
+ issue := gi.iterator.IssueValue()
+ // create issue
+ b, err := gi.ensureIssue(repo, issue)
+ if err != nil {
+ err := fmt.Errorf("issue creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
- // create issue
- b, err := gi.ensureIssue(repo, issue)
- if err != nil {
- return fmt.Errorf("issue creation: %v", err)
- }
+ // loop over timeline items
+ for gi.iterator.NextTimelineItem() {
+ item := gi.iterator.TimelineItemValue()
+ if err := gi.ensureTimelineItem(repo, b, item); err != nil {
+ err := fmt.Errorf("timeline item creation: %v", err)
+ out <- core.NewImportError(err, "")
+ return
+ }
+ }
- // loop over timeline items
- for gi.iterator.NextTimelineItem() {
- if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil {
- return fmt.Errorf("timeline item creation: %v", err)
+ // commit bug state
+ if err := b.CommitAsNeeded(); err != nil {
+ err = fmt.Errorf("bug commit: %v", err)
+ out <- core.NewImportError(err, "")
+ return
}
}
- // commit bug state
- if err := b.CommitAsNeeded(); err != nil {
- return fmt.Errorf("bug commit: %v", err)
+ if err := gi.iterator.Error(); err != nil && err != context.Canceled {
+ gi.out <- core.NewImportError(err, "")
}
- }
+ }()
- if err := gi.iterator.Error(); err != nil {
- fmt.Printf("import error: %v\n", err)
- return err
- }
-
- fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities)
- return nil
+ return out, nil
}
func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) {
@@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
// importing a new bug
- gi.importedIssues++
+ gi.out <- core.NewImportBug(b.Id())
+ } else {
+ gi.out <- core.NewImportNothing("", "bug already imported")
}
} else {
@@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
for i, edit := range issueEdits {
if i == 0 && b != nil {
// The first edit in the github result is the issue creation itself, we already have that
+ gi.out <- core.NewImportNothing("", "bug already imported")
continue
}
@@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
// importing a new bug
- gi.importedIssues++
-
+ gi.out <- core.NewImportBug(b.Id())
continue
}
// other edits will be added as CommentEdit operations
- target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String())
+ target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id))
if err != nil {
return nil, err
}
@@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline
}
func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error {
- fmt.Printf("import event item: %s\n", item.Typename)
switch item.Typename {
case "IssueComment":
// collect all comment edits
- commentEdits := []userContentEdit{}
+ var commentEdits []userContentEdit
for gi.iterator.NextCommentEdit() {
commentEdits = append(commentEdits, gi.iterator.CommentEditValue())
}
+ // ensureTimelineComment send import events over out chanel
err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits)
if err != nil {
return fmt.Errorf("timeline comment creation: %v", err)
@@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
case "LabeledEvent":
id := parseId(item.LabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
+
if err != cache.ErrNoMatchingOp {
return err
}
@@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != nil {
return err
}
- _, err = b.ForceChangeLabelsRaw(
+ op, err := b.ForceChangeLabelsRaw(
author,
item.LabeledEvent.CreatedAt.Unix(),
[]string{
@@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
nil,
map[string]string{keyGithubId: id},
)
+ if err != nil {
+ return err
+ }
- return err
+ gi.out <- core.NewImportLabelChange(op.Id())
+ return nil
case "UnlabeledEvent":
id := parseId(item.UnlabeledEvent.Id)
_, err := b.ResolveOperationWithMetadata(keyGithubId, id)
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
if err != cache.ErrNoMatchingOp {
return err
}
@@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
return err
}
- _, err = b.ForceChangeLabelsRaw(
+ op, err := b.ForceChangeLabelsRaw(
author,
item.UnlabeledEvent.CreatedAt.Unix(),
nil,
@@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
},
map[string]string{keyGithubId: id},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportLabelChange(op.Id())
+ return nil
case "ClosedEvent":
id := parseId(item.ClosedEvent.Id)
@@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor)
if err != nil {
return err
}
- _, err = b.CloseRaw(
+ op, err := b.CloseRaw(
author,
item.ClosedEvent.CreatedAt.Unix(),
map[string]string{keyGithubId: id},
)
- return err
+
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
+ return nil
case "ReopenedEvent":
id := parseId(item.ReopenedEvent.Id)
@@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor)
if err != nil {
return err
}
- _, err = b.OpenRaw(
+ op, err := b.OpenRaw(
author,
item.ReopenedEvent.CreatedAt.Unix(),
map[string]string{keyGithubId: id},
)
- return err
+
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportStatusChange(op.Id())
+ return nil
case "RenamedTitleEvent":
id := parseId(item.RenamedTitleEvent.Id)
@@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug
if err != cache.ErrNoMatchingOp {
return err
}
+ if err == nil {
+ reason := fmt.Sprintf("operation already imported: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
+ return nil
+ }
author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor)
if err != nil {
return err
}
- _, err = b.SetTitleRaw(
+ op, err := b.SetTitleRaw(
author,
item.RenamedTitleEvent.CreatedAt.Unix(),
string(item.RenamedTitleEvent.CurrentTitle),
map[string]string{keyGithubId: id},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportTitleEdition(op.Id())
+ return nil
default:
- fmt.Printf("ignore event: %v\n", item.Typename)
+ reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename)
+ gi.out <- core.NewImportNothing("", reason)
}
return nil
@@ -307,14 +368,16 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
}
targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id))
- if err != nil && err != cache.ErrNoMatchingOp {
+ if err == nil {
+ reason := fmt.Sprintf("comment already imported")
+ gi.out <- core.NewImportNothing("", reason)
+ } else if err != cache.ErrNoMatchingOp {
// real error
return err
}
// if no edits are given we create the comment
if len(edits) == 0 {
- // if comment doesn't exist
if err == cache.ErrNoMatchingOp {
cleanText, err := text.Cleanup(string(item.Body))
if err != nil {
@@ -322,7 +385,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
}
// add comment operation
- _, err = b.AddCommentRaw(
+ op, err := b.AddCommentRaw(
author,
item.CreatedAt.Unix(),
cleanText,
@@ -332,12 +395,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
keyGithubUrl: parseId(item.Url.String()),
},
)
- return err
+ if err != nil {
+ return err
+ }
+
+ gi.out <- core.NewImportComment(op.Id())
}
+
} else {
for i, edit := range edits {
if i == 0 && targetOpID != "" {
// The first edit in the github result is the comment creation itself, we already have that
+ gi.out <- core.NewImportNothing("", "comment already imported")
continue
}
@@ -370,7 +439,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
// set target for the nexr edit now that the comment is created
targetOpID = op.Id()
-
continue
}
@@ -386,7 +454,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache.
func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error {
_, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id))
if err == nil {
- // already imported
+ gi.out <- core.NewImportNothing(b.Id(), "edition already imported")
return nil
}
if err != cache.ErrNoMatchingOp {
@@ -394,8 +462,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
return err
}
- fmt.Println("import edition")
-
editor, err := gi.ensurePerson(repo, edit.Editor)
if err != nil {
return err
@@ -404,7 +470,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
switch {
case edit.DeletedAt != nil:
// comment deletion, not supported yet
- fmt.Println("comment deletion is not supported yet")
+ gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet")
case edit.DeletedAt == nil:
@@ -414,7 +480,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
}
// comment edition
- _, err = b.EditCommentRaw(
+ op, err := b.EditCommentRaw(
editor,
edit.CreatedAt.Unix(),
target,
@@ -427,6 +493,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC
if err != nil {
return err
}
+
+ gi.out <- core.NewImportCommentEdition(op.Id())
}
return nil
@@ -450,7 +518,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
}
// importing a new identity
- gi.importedIdentities++
var name string
var email string
@@ -471,7 +538,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
case "Bot":
}
- return repo.NewIdentityRaw(
+ i, err = repo.NewIdentityRaw(
name,
email,
string(actor.Login),
@@ -480,6 +547,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca
keyGithubLogin: string(actor.Login),
},
)
+
+ if err != nil {
+ return nil, err
+ }
+
+ gi.out <- core.NewImportIdentity(i.Id())
+ return i, nil
}
func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) {
@@ -500,8 +574,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache,
gc := buildClient(gi.conf[keyToken])
- parentCtx := context.Background()
- ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
+ ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout)
defer cancel()
err = gc.Query(ctx, &q, variables)