diff options
author | Amine Hilaly <hilalyamine@gmail.com> | 2019-08-13 19:51:14 +0200 |
---|---|---|
committer | Amine Hilaly <hilalyamine@gmail.com> | 2019-08-18 00:14:22 +0200 |
commit | 5ca326af83b90531d4d0c502bb1beabbe1b48c55 (patch) | |
tree | 6b7a32f2db9ab7321e9965c0ef4c715c6c517178 /bridge/github/import.go | |
parent | 6428352bd14828f670206b60862de7f71c52d235 (diff) | |
download | git-bug-5ca326af83b90531d4d0c502bb1beabbe1b48c55.tar.gz |
bridge/core: add context.Context to ImportAll and ExportAll signatures
bridge/core: add ImportResult objects to stream import events
bridge/core: launchpad support asynchronous import
bridge/github: cancellable export and import functions
bridge/gitlab: cancellable export and import functions
commands: bridge pull/push gracefull kill
bridge/github: fix github import
bridge/github: use simple context for imports
bridge/core: name parameters in interfaces
github/core: Add EventError to export and import events types
bridge/gitlab: add context support in gitlab requests functions
bridge/gitlab: remove imported events count from importer logic
bridge/github: remove imported events count from importer logic
bridge/github: add context support in query and muration requets
bridge/github: fix bug duplicate editions after multiple calls
bridge/core: import import and export events String methods
bridge/gitlab: fix error handling in note import events
commands/bridge: Add statistics about imports and exports
bridge/gitlab: properly handle context cancellation
bridge/github: improve error handling
bridge: break iterators on context cancel or timeout
bridge: add context timeout support
bridge: improve event formating and error handling
commands: handle interrupt and switch cases
bridge/github: add export mutation timeouts
bridge: fix race condition bug in the github and gitlab importers
bridge/github: improve context error handling
Diffstat (limited to 'bridge/github/import.go')
-rw-r--r-- | bridge/github/import.go | 199 |
1 files changed, 136 insertions, 63 deletions
diff --git a/bridge/github/import.go b/bridge/github/import.go index dcaf2d05..9fef9cb5 100644 --- a/bridge/github/import.go +++ b/bridge/github/import.go @@ -28,11 +28,8 @@ type githubImporter struct { // iterator iterator *iterator - // number of imported issues - importedIssues int - - // number of imported identities - importedIdentities int + // send only channel + out chan<- core.ImportResult } func (gi *githubImporter) Init(conf core.Configuration) error { @@ -42,40 +39,49 @@ func (gi *githubImporter) Init(conf core.Configuration) error { // ImportAll iterate over all the configured repository issues and ensure the creation of the // missing issues / timeline items / edits / label events ... -func (gi *githubImporter) ImportAll(repo *cache.RepoCache, since time.Time) error { - gi.iterator = NewIterator(gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since) - - // Loop over all matching issues - for gi.iterator.NextIssue() { - issue := gi.iterator.IssueValue() - fmt.Printf("importing issue: %v\n", issue.Title) +func (gi *githubImporter) ImportAll(ctx context.Context, repo *cache.RepoCache, since time.Time) (<-chan core.ImportResult, error) { + gi.iterator = NewIterator(ctx, 10, gi.conf[keyOwner], gi.conf[keyProject], gi.conf[keyToken], since) + out := make(chan core.ImportResult) + gi.out = out + + go func() { + defer close(gi.out) + + // Loop over all matching issues + for gi.iterator.NextIssue() { + issue := gi.iterator.IssueValue() + // create issue + b, err := gi.ensureIssue(repo, issue) + if err != nil { + err := fmt.Errorf("issue creation: %v", err) + out <- core.NewImportError(err, "") + return + } - // create issue - b, err := gi.ensureIssue(repo, issue) - if err != nil { - return fmt.Errorf("issue creation: %v", err) - } + // loop over timeline items + for gi.iterator.NextTimelineItem() { + item := gi.iterator.TimelineItemValue() + if err := gi.ensureTimelineItem(repo, b, item); err != nil { + err := fmt.Errorf("timeline item creation: %v", err) + out <- core.NewImportError(err, "") + return + } + } - // loop over timeline items - for gi.iterator.NextTimelineItem() { - if err := gi.ensureTimelineItem(repo, b, gi.iterator.TimelineItemValue()); err != nil { - return fmt.Errorf("timeline item creation: %v", err) + // commit bug state + if err := b.CommitAsNeeded(); err != nil { + err = fmt.Errorf("bug commit: %v", err) + out <- core.NewImportError(err, "") + return } } - // commit bug state - if err := b.CommitAsNeeded(); err != nil { - return fmt.Errorf("bug commit: %v", err) + if err := gi.iterator.Error(); err != nil && err != context.Canceled { + gi.out <- core.NewImportError(err, "") } - } + }() - if err := gi.iterator.Error(); err != nil { - fmt.Printf("import error: %v\n", err) - return err - } - - fmt.Printf("Successfully imported %d issues and %d identities from Github\n", gi.importedIssues, gi.importedIdentities) - return nil + return out, nil } func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline) (*cache.BugCache, error) { @@ -122,7 +128,9 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } // importing a new bug - gi.importedIssues++ + gi.out <- core.NewImportBug(b.Id()) + } else { + gi.out <- core.NewImportNothing("", "bug already imported") } } else { @@ -130,6 +138,7 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline for i, edit := range issueEdits { if i == 0 && b != nil { // The first edit in the github result is the issue creation itself, we already have that + gi.out <- core.NewImportNothing("", "bug already imported") continue } @@ -159,13 +168,12 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } // importing a new bug - gi.importedIssues++ - + gi.out <- core.NewImportBug(b.Id()) continue } // other edits will be added as CommentEdit operations - target, err := b.ResolveOperationWithMetadata(keyGithubUrl, issue.Url.String()) + target, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(issue.Id)) if err != nil { return nil, err } @@ -181,16 +189,16 @@ func (gi *githubImporter) ensureIssue(repo *cache.RepoCache, issue issueTimeline } func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.BugCache, item timelineItem) error { - fmt.Printf("import event item: %s\n", item.Typename) switch item.Typename { case "IssueComment": // collect all comment edits - commentEdits := []userContentEdit{} + var commentEdits []userContentEdit for gi.iterator.NextCommentEdit() { commentEdits = append(commentEdits, gi.iterator.CommentEditValue()) } + // ensureTimelineComment send import events over out chanel err := gi.ensureTimelineComment(repo, b, item.IssueComment, commentEdits) if err != nil { return fmt.Errorf("timeline comment creation: %v", err) @@ -199,6 +207,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug case "LabeledEvent": id := parseId(item.LabeledEvent.Id) _, err := b.ResolveOperationWithMetadata(keyGithubId, id) + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } + if err != cache.ErrNoMatchingOp { return err } @@ -206,7 +220,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != nil { return err } - _, err = b.ForceChangeLabelsRaw( + op, err := b.ForceChangeLabelsRaw( author, item.LabeledEvent.CreatedAt.Unix(), []string{ @@ -215,12 +229,21 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug nil, map[string]string{keyGithubId: id}, ) + if err != nil { + return err + } - return err + gi.out <- core.NewImportLabelChange(op.Id()) + return nil case "UnlabeledEvent": id := parseId(item.UnlabeledEvent.Id) _, err := b.ResolveOperationWithMetadata(keyGithubId, id) + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } if err != cache.ErrNoMatchingOp { return err } @@ -229,7 +252,7 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug return err } - _, err = b.ForceChangeLabelsRaw( + op, err := b.ForceChangeLabelsRaw( author, item.UnlabeledEvent.CreatedAt.Unix(), nil, @@ -238,7 +261,12 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug }, map[string]string{keyGithubId: id}, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportLabelChange(op.Id()) + return nil case "ClosedEvent": id := parseId(item.ClosedEvent.Id) @@ -246,16 +274,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.ClosedEvent.Actor) if err != nil { return err } - _, err = b.CloseRaw( + op, err := b.CloseRaw( author, item.ClosedEvent.CreatedAt.Unix(), map[string]string{keyGithubId: id}, ) - return err + + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) + return nil case "ReopenedEvent": id := parseId(item.ReopenedEvent.Id) @@ -263,16 +302,27 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.ReopenedEvent.Actor) if err != nil { return err } - _, err = b.OpenRaw( + op, err := b.OpenRaw( author, item.ReopenedEvent.CreatedAt.Unix(), map[string]string{keyGithubId: id}, ) - return err + + if err != nil { + return err + } + + gi.out <- core.NewImportStatusChange(op.Id()) + return nil case "RenamedTitleEvent": id := parseId(item.RenamedTitleEvent.Id) @@ -280,20 +330,31 @@ func (gi *githubImporter) ensureTimelineItem(repo *cache.RepoCache, b *cache.Bug if err != cache.ErrNoMatchingOp { return err } + if err == nil { + reason := fmt.Sprintf("operation already imported: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) + return nil + } author, err := gi.ensurePerson(repo, item.RenamedTitleEvent.Actor) if err != nil { return err } - _, err = b.SetTitleRaw( + op, err := b.SetTitleRaw( author, item.RenamedTitleEvent.CreatedAt.Unix(), string(item.RenamedTitleEvent.CurrentTitle), map[string]string{keyGithubId: id}, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportTitleEdition(op.Id()) + return nil default: - fmt.Printf("ignore event: %v\n", item.Typename) + reason := fmt.Sprintf("ignoring timeline type: %v", item.Typename) + gi.out <- core.NewImportNothing("", reason) } return nil @@ -307,14 +368,16 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. } targetOpID, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(item.Id)) - if err != nil && err != cache.ErrNoMatchingOp { + if err == nil { + reason := fmt.Sprintf("comment already imported") + gi.out <- core.NewImportNothing("", reason) + } else if err != cache.ErrNoMatchingOp { // real error return err } // if no edits are given we create the comment if len(edits) == 0 { - // if comment doesn't exist if err == cache.ErrNoMatchingOp { cleanText, err := text.Cleanup(string(item.Body)) if err != nil { @@ -322,7 +385,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. } // add comment operation - _, err = b.AddCommentRaw( + op, err := b.AddCommentRaw( author, item.CreatedAt.Unix(), cleanText, @@ -332,12 +395,18 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. keyGithubUrl: parseId(item.Url.String()), }, ) - return err + if err != nil { + return err + } + + gi.out <- core.NewImportComment(op.Id()) } + } else { for i, edit := range edits { if i == 0 && targetOpID != "" { // The first edit in the github result is the comment creation itself, we already have that + gi.out <- core.NewImportNothing("", "comment already imported") continue } @@ -370,7 +439,6 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. // set target for the nexr edit now that the comment is created targetOpID = op.Id() - continue } @@ -386,7 +454,7 @@ func (gi *githubImporter) ensureTimelineComment(repo *cache.RepoCache, b *cache. func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugCache, target entity.Id, edit userContentEdit) error { _, err := b.ResolveOperationWithMetadata(keyGithubId, parseId(edit.Id)) if err == nil { - // already imported + gi.out <- core.NewImportNothing(b.Id(), "edition already imported") return nil } if err != cache.ErrNoMatchingOp { @@ -394,8 +462,6 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC return err } - fmt.Println("import edition") - editor, err := gi.ensurePerson(repo, edit.Editor) if err != nil { return err @@ -404,7 +470,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC switch { case edit.DeletedAt != nil: // comment deletion, not supported yet - fmt.Println("comment deletion is not supported yet") + gi.out <- core.NewImportNothing(b.Id(), "comment deletion is not supported yet") case edit.DeletedAt == nil: @@ -414,7 +480,7 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC } // comment edition - _, err = b.EditCommentRaw( + op, err := b.EditCommentRaw( editor, edit.CreatedAt.Unix(), target, @@ -427,6 +493,8 @@ func (gi *githubImporter) ensureCommentEdit(repo *cache.RepoCache, b *cache.BugC if err != nil { return err } + + gi.out <- core.NewImportCommentEdition(op.Id()) } return nil @@ -450,7 +518,6 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca } // importing a new identity - gi.importedIdentities++ var name string var email string @@ -471,7 +538,7 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca case "Bot": } - return repo.NewIdentityRaw( + i, err = repo.NewIdentityRaw( name, email, string(actor.Login), @@ -480,6 +547,13 @@ func (gi *githubImporter) ensurePerson(repo *cache.RepoCache, actor *actor) (*ca keyGithubLogin: string(actor.Login), }, ) + + if err != nil { + return nil, err + } + + gi.out <- core.NewImportIdentity(i.Id()) + return i, nil } func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, error) { @@ -500,8 +574,7 @@ func (gi *githubImporter) getGhost(repo *cache.RepoCache) (*cache.IdentityCache, gc := buildClient(gi.conf[keyToken]) - parentCtx := context.Background() - ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout) + ctx, cancel := context.WithTimeout(gi.iterator.ctx, defaultTimeout) defer cancel() err = gc.Query(ctx, &q, variables) |