#!/usr/bin/env python3
# Copyright 2024 Bryan Gardiner <bog@khumba.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Imports issues from an exported Gitlab project into a Sourcehut tracker.
#
# Reads *.ndjson files from an exported Gitlab project, and recreates tickets
# and their histories in a new, empty Sourcehut tracker. Preserves notes
# attached to each issue; generally this includes comments, status changes,
# labels, milestones, and anything else that is included as a plain-text
# note, but definitely doesn't include all available metadata.
#
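# With --mode=send, tickets are created in sr.ht via SMTP, so a working mail
# setup is required. With --mode=hut, they are created by invoking the `hut`
# command-line tool instead. Surely using Sourcehut's API would be better.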
#
# There are a few caveats:
#
# 1. If all issue IDs from 1 to the max ID are available in your export, and the
# tracker you import into is a new tracker, then your Gitlab and Sourcehut issue
# IDs will match up one-to-one, and mentions of one ticket from another will
# work. If not, you need to decide how you want to handle this. You can choose
# to create empty Sourcehut tickets for the missing Gitlab issues so that IDs
# still match, by passing --create-missing-issues. Blank issues will be created
# then closed. Alternatively, you can pass --skip-missing-issues to not create
# any extra Sourcehut tickets, but IDs will not line up. If one of these
# options is needed, this program will tell you.
#
# 2. Because emails are used to create tickets, we also assume that emails are
# processed in the order that they are sent, so that tickets don't get created
# out of order. This program has no way of knowing if that happens, however,
# there is a configurable delay between sending each email, for this reason.
#
# 3. Gitlab project exports are missing some crucial information, in particular
# they don't include ticket author names or label IDs. For best results,
# appropriate mappings for your project can be provided manually in CSV files to
# --labels-file and --users-file. These CSV files should be headerless, and
# each row should contain a label or user ID, followed by the name for that
# entity. If you want to skip these, then --skip-labels and --skip-users must
# be passed. Some label and user info will still be included, but label
# references in comments and issue creator names will be missing. You can run
# with incomplete files by passing --skip-unknown-labels or
# --skip-unknown-users.
#
# 4. If your project has confidential issues or comments in it, then you will
# need to decide to exclude them with --skip-confidential, or include them all
# with --include-confidential. If there are confidential items and you don't
# pass either of these options, then an exception will be thrown. If you need
# more fine-grained control over confidential items, edit issues.ndjson by hand.
#
# 5. The projects I have tested this on are small, and don't make use of many of
# Gitlab's features. This may bork on more complex projects.
# Still here? Here's how to use this:
#
# First take an export of your Gitlab project from its settings area, then
# extract the archive. The important files are tree/project/*.ndjson.
#
# Let's generate a report of all the emails that would be sent. Preview the
# output to make sure things look right, and ensure that the command completes
# without error:
#
# touch labels.csv users.csv # First create these empty files.
#
# ./import_issues.py \
# --srht-owner=MY_SRHT_USER \
# --srht-tracker=MY_SRHT_TRACKER \
# --gitlab-project-url=https://gitlab.com/ME/PROJECT/ \
# --from='Moi <me@email.com>' \
# --labels-file=labels.csv \
# --users-file=users.csv \
# .../gitlab-export/tree/project \
# >issue-emails.txt
#
# You may get errors if you are missing label or user mappings, and you haven't
# disabled these; add them to the labels.csv or users.csv until you get no more
# errors:
#
# labels.csv:
# 123456,Bug
# 232323,Feature
# ...
#
# users.csv:
# 1234000,John Doe (@jdoe)
# ...
#
# If the issue-emails.txt file looks correct, then you can proceed with sending
# emails. Double-check that your tracker is empty to start with, then rerun the
# command with "--mode=send" and with your SMTP parameters. SMTP options can be
# specified either via parameters --smtp-{host,port,user,password} or the
# equivalent SMTP_{HOST,PORT,USER,PASSWORD} environment variables. Pass
# --smtp-ssl to enable SSL. Also by default there is a five-second delay
# between sending emails, that you may wish to change with --delay.
#
# ./import_issues.py \
# --srht-owner=MY_SRHT_USER \
# --srht-tracker=MY_SRHT_TRACKER \
# --gitlab-project-url=https://gitlab.com/ME/PROJECT/ \
# --from='Moi <me@email.com>' \
# --labels-file=labels.csv \
# --users-file=users.csv \
# --smtp-host=SMTP_HOSTNAME \
# --smtp-ssl \
# --smtp-user=SMTP_USERNAME \
# --smtp-password=SMTP_PASSWORD \
# .../gitlab-export/tree/project
import argparse
import csv
import json
import logging
import os
import re
import smtplib
import subprocess
import sys
import time
from datetime import datetime, timezone
from email.message import EmailMessage
from email.utils import format_datetime, make_msgid
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
# Send every log record (DEBUG and above) to stdout, prefixed with its level
# and the name of the function that emitted it.  Note that do_mail() in
# "print" mode also writes to stdout, so log lines and previewed emails end up
# interleaved in the same stream.
logging.basicConfig(
    format="%(levelname)s:%(funcName)s:%(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)
# Matches a non-empty run of ASCII digits; used to validate the ID column of
# the CSV mapping files.
ID_RE = re.compile(r"^[0-9]+$")

# todo.sr.ht seems to be limited to less than 16k comments
# to be sure, we will do just 12k
MAX_SIZE_COMMENT = 12 * 1024

# Argument tuples (smtp, delay, mode, tracker, frm, ticket ID) for the
# placeholder tickets created by file_missing_ticket(); run() closes them once
# everything else has been imported.
tickets_to_be_closed = []


def split_long_str(in_str: str, max_len: int = MAX_SIZE_COMMENT) -> list[str]:
    """Split *in_str* into chunks that are each shorter than *max_len*.

    The text is split on line boundaries wherever possible, so a chunk normally
    consists of whole lines (line endings are preserved).  A single line that
    is itself too long is cut into fixed-size pieces, because otherwise the
    resulting chunk would still exceed the limit this function exists to
    enforce.

    Args:
        in_str: The text to split.
        max_len: Exclusive upper bound on the length (in characters) of each
            chunk.  For values below 2 every chunk is a single character.

    Returns:
        The chunks in order; concatenating them yields *in_str* again.  An
        empty input yields an empty list.  No chunk is ever empty.
    """
    # Largest piece an over-long line may be cut into while staying below
    # max_len; clamped to 1 so that the cutting loop always makes progress.
    piece_len = max(1, max_len - 1)

    out: list[str] = []
    current = ""
    for line in in_str.splitlines(keepends=True):
        if len(current) + len(line) < max_len:
            current += line
            continue

        # The line does not fit any more: flush what has been collected so
        # far.  Nothing is collected yet when the very first line is already
        # too long; do not emit an empty chunk in that case.
        if current:
            out.append(current)

        # Cut a line that is too long on its own into pieces that fit.
        while len(line) > piece_len:
            out.append(line[:piece_len])
            line = line[piece_len:]
        current = line

    if current:
        out.append(current)
    return out
def get_labels(tracker: str) -> list[dict[str, str]]:
    """Fetch the labels defined on one of the current user's trackers.

    The labels are obtained by piping a GraphQL query to
    ``hut graphql todo --stdin``.

    Args:
        tracker: Name of the tracker (without the ``~owner/`` prefix).

    Returns:
        One dict per label with the fields requested in the query (``id``,
        ``name``, ``foregroundColor``, ``backgroundColor``, ``created``).
        NOTE(review): only the ``results`` of a single response are returned;
        if the API paginates labels, later pages are not fetched -- confirm.

    Raises:
        RuntimeError: If ``hut`` exits with a non-zero status, or if the
            response does not contain the tracker's labels.
    """
    # json.dumps() produces a double-quoted literal with quotes, backslashes
    # and control characters escaped, so a tracker name can neither break nor
    # alter the query.
    query = (
        "query { me { tracker(name: "
        + json.dumps(tracker)
        + ") { labels { results { id, name, foregroundColor, backgroundColor, created } } } }}"
    )
    try:
        ret = subprocess.run(
            ["hut", "graphql", "todo", "--stdin"],
            input=query,
            text=True,
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as ex:
        raise RuntimeError(
            f"hut failed with excitcode {ex.returncode}\n\nstdout:\n{ex.stdout}\n\nand stderr:\n{ex.stderr}"
        ) from ex

    data = json.loads(ret.stdout)
    try:
        return data["me"]["tracker"]["labels"]["results"]
    except (KeyError, TypeError) as ex:
        # E.g. the tracker does not exist and the API answered with a null.
        raise RuntimeError(
            f"Could not find the labels of tracker {tracker!r} in the hut response: {ret.stdout!r}"
        ) from ex
# Root logger, used by all functions in this script.
log = logging.getLogger()
# Number of emails printed or sent so far; incremented by do_mail().
email_count = 0
# Number of tickets created so far; incremented by open_ticket() and
# file_missing_ticket(), which also use it as the ID of the ticket they just
# created.
issue_count = 0
def read_id_map_file(file_path: Path) -> dict[int, str]:
    """Read a headerless CSV file of ``ID,NAME`` rows into a dict.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A mapping from each numeric ID to its name.  An empty file yields an
        empty mapping.

    Raises:
        ValueError: If a row does not consist of exactly a numeric ID and a
            non-empty name, or if an ID occurs more than once.  (These are
            explicit exceptions rather than asserts so that the checks still
            run under ``python -O``.)
    """
    result: dict[int, str] = {}
    with open(file_path, newline="") as fh:
        for row_num, row in enumerate(csv.reader(fh), start=1):
            if not row:
                # csv.reader yields an empty list for a blank line; tolerate
                # those (e.g. a trailing empty line in a hand-edited file).
                continue
            if not (len(row) == 2 and ID_RE.search(row[0]) and row[1]):
                raise ValueError(
                    f"Row {row_num} of {file_path} is not in the form <ID>,<NAME>: {row!r}"
                )
            new_id = int(row[0])
            if new_id in result:
                raise ValueError(f"ID {new_id} appears multiple times in {file_path}.")
            result[new_id] = row[1]
    return result
def do_mail(
    *,
    smtp,
    delay: float,
    mode: str,
    frm: str,
    to: str,
    body: str,
    subject: Optional[str] = None,
):
    """Print or send a single email and bump the global email counter.

    Args:
        smtp: Connection used to send the message; only touched when *mode*
            is ``"send"``.
        delay: Seconds to sleep after a message has been sent (``"send"``
            mode only).
        mode: ``"print"`` to write the message to stdout, ``"send"`` to
            deliver it through *smtp*.
        frm: Value of the From header.
        to: Value of the To header.
        body: Plain-text message body.
        subject: Optional Subject header; omitted when empty.

    Raises:
        ValueError: If *mode* is neither ``"print"`` nor ``"send"``.  The
            counter has already been incremented at that point.
    """
    global email_count
    email_count += 1

    printing = mode == "print"
    if not printing and mode != "send":
        raise ValueError(f"Unhandled mode {mode!r}.")
    log.info("%s email #%d.", "Printing" if printing else "Sending", email_count)

    date = format_datetime(datetime.now(timezone.utc))
    msg_id = make_msgid()

    if printing:
        indent = " "
        shown_headers = [f"From: {frm}", f"To: {to}", f"Date: {date}"]
        if subject:
            shown_headers.append(f"Subject: {subject}")
        shown_headers.append(f"Message-ID: {msg_id}")
        for header in shown_headers:
            print(f"{indent}{header}")
        print()
        # Indent every line of the body like the headers above.
        print(indent + body.replace("\n", f"\n{indent}"))
        print()
        return

    # mode == "send"
    msg = EmailMessage()
    msg.set_content(body)
    msg["From"] = frm
    msg["To"] = to
    msg["Date"] = date
    if subject:
        msg["Subject"] = subject
    # Message-ID is required, unless you want this error message from the
    # sr.ht mail server:
    #
    #     500 Error: (AttributeError) 'NoneType' object has no attribute
    #     'removeprefix' (in reply to end of DATA command)
    msg["Message-ID"] = msg_id
    smtp.send_message(msg)
    time.sleep(delay)
def run_hut(cmds, tracker, msg, args=None, delay=None):
    """Run ``hut todo <cmds> -t <tracker> [--stdin] <args>`` and return its result.

    Args:
        cmds: List of sub-command words following ``hut todo``, e.g.
            ``["ticket", "create"]``.
        tracker: Tracker passed to ``-t``.
        msg: Text piped to the command's stdin (``--stdin`` is added to the
            command line), or ``None`` to run the command without input.
        args: Optional list of extra arguments appended to the command line.
        delay: Seconds to sleep after the command has finished; ``None``
            (the default) means no pause.

    Returns:
        The ``subprocess.CompletedProcess`` with stdout and stderr captured
        as text.

    Raises:
        RuntimeError: If ``hut`` exits with a non-zero status.
    """
    log.debug(
        "cmds = %s, tracker = %s, args = %s\n\nmsg:\n%s", cmds, tracker, args, msg
    )
    if args is None:
        args = []

    command = ["hut", "todo"] + cmds + ["-t", tracker]
    extra_kwargs = {}
    if msg is not None:
        command.append("--stdin")
        extra_kwargs["input"] = msg
    command.extend(args)

    try:
        res = subprocess.run(
            command,
            check=True,
            encoding="utf-8",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **extra_kwargs,
        )
    except subprocess.CalledProcessError as ex:
        raise RuntimeError(
            f"hut failed with excitcode {ex.returncode}\n\nstdout:\n{ex.stdout}\n\nand stderr:\n{ex.stderr}"
        ) from ex

    # time.sleep(None) raises TypeError, so only pause when a delay was given.
    if delay is not None:
        time.sleep(delay)
    return res
def open_ticket_by_email(
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    title: str,
    body: str,
    created_by: Optional[str],
    created_at: str,
    closed_at: Optional[str],
    is_closed: bool,
    is_confidential: bool,
    labels: list[dict[str, Any]],
    milestone_name: Optional[str],
    gitlab_ticket_url: str,
):
    """Create a ticket by emailing the tracker (or print the emails).

    The ticket text starts with a block of pseudo-headers (origin URL,
    author, dates, milestone, labels, confidentiality) followed by the
    original description.  Text that is too long for one message is split
    with split_long_str(); the first part creates the ticket and the
    remaining parts are added to it as comments.

    Args:
        smtp, delay, mode, frm: Passed through to do_mail().
        tracker: Tracker in ``~owner/name`` form; mail goes to
            ``<tracker>@todo.sr.ht``.
        title: Ticket title, used as the email subject.
        body: Issue description; ``None`` is treated as empty.
        created_by: Author name, or ``None`` if unknown.
        created_at: Creation timestamp as found in the export.
        closed_at: Closing timestamp, or ``None``.
        is_closed: Whether the issue is closed (only reported when
            *closed_at* is ``None``).
        is_confidential: Whether to add a "Confidential" pseudo-header.
        labels: Label links of the issue; each item carries its title under
            ``["label"]["title"]``.
        milestone_name: Milestone title, or ``None``.
        gitlab_ticket_url: URL of the original Gitlab issue.
    """
    pheaders = [f"Migrated from: {gitlab_ticket_url}"]
    if created_by:
        pheaders.append(f"Created by: {created_by}")
    pheaders.append(f"Created at: {created_at}")
    if closed_at is not None:
        pheaders.append(f"Closed at: {closed_at}")
    elif is_closed:
        pheaders.append("State: closed")
    if milestone_name:
        pheaders.append(f"Milestone: {milestone_name}")
    if labels:
        pheaders.append(
            "Labels: " + ", ".join(sorted(x["label"]["title"] for x in labels))
        )
    if is_confidential:
        pheaders.append("Confidential: true")

    # A missing (null) description would otherwise make the join fail.
    lines = [" \\\n".join(pheaders), "", body or ""]
    body_split = split_long_str("\n".join(lines))

    # The ticket created below gets the next free sr.ht ID.  open_ticket()
    # increments issue_count only after this function returns, so that ID is
    # issue_count + 1.  The Gitlab issue number from the URL must not be used
    # here: with --skip-missing-issues the two numberings diverge and the
    # follow-up parts would be attached to the wrong ticket.
    srht_issue_id = issue_count + 1

    # First create the ticket
    do_mail(
        smtp=smtp,
        delay=delay,
        mode=mode,
        frm=frm,
        to=f"{tracker}@todo.sr.ht",
        subject=title,
        body=body_split[0],
    )

    # then add remaining parts of the description as comments
    for part in body_split[1:]:
        do_mail(
            smtp=smtp,
            delay=delay,
            mode=mode,
            frm=frm,
            to=f"{tracker}/{srht_issue_id}@todo.sr.ht",
            body=part,
        )
def open_ticket_by_hut(
    delay: float,
    tracker: str,
    frm: str,
    title: str,
    body: str,
    created_by: Optional[str],
    created_at: str,
    closed_at: Optional[str],
    is_closed: bool,
    is_confidential: bool,
    labels: list[dict[str, Any]],
    milestone_name: Optional[str],
    gitlab_ticket_url: str,
) -> subprocess.CompletedProcess:
    """Create a ticket through the ``hut`` command-line tool.

    The text piped to ``hut todo ticket create`` consists of the title, a
    block of pseudo-headers (origin URL, author, dates, milestone,
    confidentiality) and the original description.  Text that is too long is
    split with split_long_str(); the first part creates the ticket, the
    rest is added as comments.  Labels are created on the tracker when
    necessary and then attached to the new ticket.

    Args:
        delay: Pause after each ``hut`` invocation, in seconds.
        tracker: Tracker in ``~owner/name`` form.
        frm: Unused; kept for symmetry with open_ticket_by_email().
        title: Ticket title.
        body: Issue description; ``None`` is treated as empty.
        created_by: Author name, or ``None`` if unknown.
        created_at: Creation timestamp as found in the export.
        closed_at: Closing timestamp, or ``None``.
        is_closed: Whether the issue is closed (only reported when
            *closed_at* is ``None``).
        is_confidential: Whether to add a "Confidential" pseudo-header.
        labels: Label links of the issue; each item carries the label's
            ``title`` and ``color`` under ``["label"]``.
        milestone_name: Milestone title, or ``None``.
        gitlab_ticket_url: URL of the original Gitlab issue.

    Returns:
        The result of the ``ticket create`` call, with the ID of the new
        ticket stored in its ``issue_id`` attribute.

    Raises:
        RuntimeError: If a ``hut`` call fails, or if the new ticket's ID
            cannot be found in the output of ``ticket create``.
    """
    pheaders = [f"Migrated from: {gitlab_ticket_url}"]
    if created_by:
        pheaders.append(f"Created by: {created_by}")
    pheaders.append(f"Created at: {created_at}")
    if closed_at is not None:
        pheaders.append(f"Closed at: {closed_at}")
    elif is_closed:
        pheaders.append("State: closed")
    if milestone_name:
        pheaders.append(f"Milestone: {milestone_name}")
    if is_confidential:
        pheaders.append("Confidential: true")

    # A missing (null) description would otherwise make the join fail.
    lines = [" \\\n".join(pheaders), "", body or ""]
    msg = title + "\n" + "\n".join(lines)
    msg_split = split_long_str(msg)

    out = run_hut(["ticket", "create"], tracker, msg_split[0], delay=delay)

    # hut reports "Created new ticket #<id>".  Search for that phrase instead
    # of slicing at a fixed offset, so that additional output (warnings etc.)
    # does not break the import.  stderr is where the message has been seen;
    # stdout is consulted as a fallback.
    created = None
    for stream in (out.stderr, out.stdout):
        created = re.search(r"Created new ticket #([0-9]+)", stream or "")
        if created:
            break
    if created is None:
        raise RuntimeError(
            f"Could not determine the ID of the new ticket from the hut output; "
            f"stdout: {out.stdout!r}, stderr: {out.stderr!r}"
        )
    out.issue_id = int(created.group(1))

    for label in sorted(labels, key=lambda x: x["label"]["title"]):
        # {"target_type":"Issue",
        #  "created_at":"2023-11-03T22:10:29.419Z",
        #  "updated_at":"2023-11-03T22:10:29.419Z",
        #  "label":{"title":"smime",
        #           "color":"#00b140",
        #           "project_id":346279,
        #           "created_at":"2023-02-02T10:49:17.779Z",
        #           "updated_at":"2023-02-02T10:49:17.779Z",
        #           "template":false,
        #           "description":null,
        #           "group_id":null,
        #           "type":"ProjectLabel",
        #           "priorities":[]}}
        label_name = label["label"]["title"]
        label_color = label["label"]["color"]
        ensure_label(tracker, label_name, label_color, delay=delay)
        run_hut(
            ["ticket", "label"],
            tracker,
            None,
            args=[str(out.issue_id), "-l", label_name],
            delay=delay,
        )

    # then add remaining parts of the description as comments
    for part in msg_split[1:]:
        run_hut(["ticket", "comment"], tracker, part, [str(out.issue_id)], delay=delay)
    return out
def open_ticket(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    title: str,
    body: str,
    created_by: Optional[str],
    created_at: str,
    closed_at: Optional[str],
    is_closed: bool,
    is_confidential: bool,
    labels: list[dict[str, Any]],
    milestone_name: Optional[str],
    gitlab_ticket_url: str,
) -> int:
    """Create one ticket using the backend selected by *mode*.

    ``"send"`` and ``"print"`` go through open_ticket_by_email(), ``"hut"``
    through open_ticket_by_hut(); any other mode creates nothing.  In every
    case the global ticket counter is incremented afterwards.

    Returns:
        The updated value of the global ticket counter, which the caller
        treats as the sr.ht ID of the new ticket.
    """
    global issue_count

    # Arguments that both backends take under the same names.
    ticket_fields = {
        "tracker": tracker,
        "frm": frm,
        "title": title,
        "body": body,
        "created_by": created_by,
        "created_at": created_at,
        "closed_at": closed_at,
        "is_closed": is_closed,
        "is_confidential": is_confidential,
        "labels": labels,
        "milestone_name": milestone_name,
        "gitlab_ticket_url": gitlab_ticket_url,
    }

    if mode == "hut":
        open_ticket_by_hut(delay=delay, **ticket_fields)
    elif mode in ("send", "print"):
        open_ticket_by_email(smtp=smtp, delay=delay, mode=mode, **ticket_fields)

    issue_count += 1
    return issue_count
def file_missing_ticket(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    issue_id: int,
):
    """Create a placeholder ticket for a Gitlab issue missing from the export.

    The placeholder is created by email (``"send"``/``"print"``) or via
    ``hut`` (``"hut"``).  Afterwards the global ticket counter is bumped and
    the arguments needed to close the placeholder are queued in
    ``tickets_to_be_closed``.

    Args:
        smtp, delay, mode, frm: Passed on to do_mail() / run_hut() and
            remembered for the later close_ticket() call.
        tracker: Tracker in ``~owner/name`` form.
        issue_id: Number of the missing Gitlab issue, mentioned in the
            ticket text.
    """
    global issue_count

    subject = "Missing issue"
    explanation = f"Issue {issue_id} is not known."

    if mode == "hut":
        run_hut(
            ["ticket", "create"],
            tracker,
            f"{subject}\n\n{explanation}",
            delay=delay,
        )
    elif mode in ("send", "print"):
        do_mail(
            smtp=smtp,
            delay=delay,
            mode=mode,
            frm=frm,
            to=f"{tracker}@todo.sr.ht",
            subject=subject,
            body=explanation,
        )

    issue_count += 1
    # The counter value doubles as the ID of the ticket just created.
    placeholder = (smtp, delay, mode, tracker, frm, issue_count)
    tickets_to_be_closed.append(placeholder)
def send_comment(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    issue_id: int,
    body: str,
    author_name: str,
    created_at: str,
    last_edited_at: str,
    is_system: bool,
    is_confidential: bool,
):
    """Add one Gitlab note to a ticket as a comment.

    The comment consists of an optional "Confidential" pseudo-header, an
    attribution line, the note text and, when the note was edited after its
    creation, a trailing remark.  Text that is too long for one comment is
    split with split_long_str() and posted as several comments.

    Args:
        smtp, delay, mode, frm: Passed on to do_mail() / run_hut().
        tracker: Tracker in ``~owner/name`` form.
        issue_id: ID of the ticket to comment on.
        body: Text of the note.
        author_name: Name shown in the attribution line.
        created_at: Creation timestamp of the note.
        last_edited_at: Timestamp of the last edit; may be empty.
        is_system: Whether this is a system note (changes the wording of the
            attribution line).
        is_confidential: Whether to add the "Confidential" pseudo-header.
    """
    text_lines = []

    # Pseudo-headers, if any.
    if is_confidential:
        text_lines += ["Confidential: true", ""]

    # Attribution line; worded differently for system notes.
    attribution = (
        f"Changed on {created_at} by {author_name}:"
        if is_system
        else f"On {created_at}, {author_name} wrote:"
    )
    text_lines += [attribution, "", body]

    was_edited = bool(last_edited_at) and last_edited_at != created_at
    if was_edited:
        text_lines += ["", f"(Last edited at {last_edited_at}.)"]

    for chunk in split_long_str("\n".join(text_lines)):
        if mode == "hut":
            run_hut(["ticket", "comment"], tracker, chunk, [str(issue_id)], delay=delay)
        elif mode in ("send", "print"):
            do_mail(
                smtp=smtp,
                delay=delay,
                mode=mode,
                frm=frm,
                to=f"{tracker}/{issue_id}@todo.sr.ht",
                body=chunk,
            )
def close_ticket(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    issue_id: int,
    closed_at: Optional[str],
    is_closed: bool,
):
    """Resolve a ticket as "fixed".

    In ``"send"``/``"print"`` mode an email containing the ``!resolve fixed``
    command is addressed to the ticket, preceded by a note about when it was
    closed (if known).  In ``"hut"`` mode the status is updated through
    ``hut todo ticket update-status``; the note is not used there.

    Args:
        smtp, delay, mode, frm: Passed on to do_mail() / run_hut().
        tracker: Tracker in ``~owner/name`` form.
        issue_id: ID of the ticket to close.
        closed_at: Closing timestamp, or ``None`` if unknown.
        is_closed: Whether the original issue was closed; only affects the
            note when *closed_at* is ``None``.
    """
    if mode == "hut":
        run_hut(
            ["ticket", "update-status"],
            tracker,
            None,
            [str(issue_id), "--resolution", "fixed", "--status", "resolved"],
            delay=delay,
        )
        return

    if mode not in ("send", "print"):
        return

    note = []
    if closed_at is not None:
        # (Skipping pseudoheaders array here, only have one.)
        note.append(f"Closed at: {closed_at}")
    elif is_closed:
        note.append("Ticket closed.")
    note += ["", "!resolve fixed"]

    do_mail(
        smtp=smtp,
        delay=delay,
        mode=mode,
        frm=frm,
        to=f"{tracker}/{issue_id}@todo.sr.ht",
        body="\n".join(note),
    )
def ensure_label(
    tracker: str, name: str, bg_color: str, fg_color: str = "#FFFFFF", delay=None
):
    """Create the label *name* on *tracker* unless it already exists.

    Args:
        tracker: Tracker in ``~owner/name`` form; the part after the first
            slash is used to look up the existing labels.
        name: Name of the label.
        bg_color: Background colour for a newly created label.
        fg_color: Foreground colour for a newly created label.
        delay: Passed on to run_hut().
    """
    tracker_name = tracker.split("/", 1)[1]
    known_names = {entry["name"] for entry in get_labels(tracker_name)}
    if name in known_names:
        return

    run_hut(
        ["label", "create"],
        tracker,
        None,
        ["--background", bg_color, "--foreground", fg_color, name],
        delay=delay,
    )
def _read_ndjson(path: Path) -> list[Any]:
    """Parse a newline-delimited JSON file into a list, skipping blank lines.

    The file is decoded as UTF-8 (the encoding of JSON text) rather than with
    the platform's default encoding.
    """
    with open(path, encoding="utf-8") as ndjson_file:
        return [json.loads(line) for line in ndjson_file if line.strip()]


def run(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    export_dir_path: Path,
    gitlab_project_url: str,
    labels_file_path: Optional[Path],
    skip_unknown_labels: bool,
    users_file_path: Optional[Path],
    skip_unknown_users: bool,
    skip_missing_issues: bool,
    create_missing_issues: bool,
    include_confidential: bool,
    skip_confidential: bool,
):
    """Import all issues of an exported Gitlab project into a tracker.

    The import runs in four passes: create one ticket per Gitlab issue (in ID
    order, optionally with placeholders for missing IDs), add the notes of
    every issue as comments, close the tickets of closed issues, and finally
    close the placeholder tickets.

    Args:
        smtp, delay, mode, frm: Passed on to the ticket/comment helpers.
        tracker: Tracker in ``~owner/name`` form.
        export_dir_path: Directory containing ``issues.ndjson`` and,
            optionally, ``milestones.ndjson``.
        gitlab_project_url: Base URL of the Gitlab project, without a
            trailing slash; used to link each ticket to its origin.
        labels_file_path: CSV file mapping label IDs to names, or ``None`` to
            leave label references in notes untouched.
        skip_unknown_labels: Leave references to labels that are missing from
            the mapping untouched instead of failing.
        users_file_path: CSV file mapping user IDs to names, or ``None`` to
            omit ticket author names.
        skip_unknown_users: Omit the author name of tickets whose author is
            missing from the mapping instead of failing.
        skip_missing_issues: Do not create tickets for missing issue IDs.
        create_missing_issues: Create (and later close) placeholder tickets
            for missing issue IDs.
        include_confidential: Import confidential issues and notes.
        skip_confidential: Drop confidential issues and notes.

    Raises:
        RuntimeError: If the export contains confidential items and neither
            confidentiality option was given; if issue IDs are missing and
            neither missing-issue option was given; or if an author, label
            or milestone cannot be resolved.
    """
    label_ids_to_names: Optional[dict[int, str]] = (
        read_id_map_file(labels_file_path) if labels_file_path else None
    )
    user_ids_to_names: Optional[dict[int, str]] = (
        read_id_map_file(users_file_path) if users_file_path else None
    )

    # TODO Might be able to automatically map note.events.author_id to
    # note.author.name for a subset of relevant users.

    # This file may not exist if the project has no milestones.
    milestones_file_path = export_dir_path / "milestones.ndjson"
    milestone_jsons = (
        _read_ndjson(milestones_file_path) if milestones_file_path.exists() else []
    )
    milestone_ids_to_titles = {m["iid"]: m["title"] for m in milestone_jsons}

    issue_jsons = _read_ndjson(export_dir_path / "issues.ndjson")

    if skip_confidential:
        issue_jsons = [x for x in issue_jsons if not x.get("confidential")]
        for issue_json in issue_jsons:
            issue_json["notes"] = [
                n for n in issue_json["notes"] if not n.get("confidential")
            ]
    elif not include_confidential:
        confidential_types = []
        if any(x.get("confidential") for x in issue_jsons):
            confidential_types.append("issues")
        if any(n.get("confidential") for x in issue_jsons for n in x["notes"]):
            confidential_types.append("notes")
        # This is a safety check that must not disappear under "python -O",
        # hence an explicit exception rather than an assert.
        if confidential_types:
            raise RuntimeError(
                f"Found confidential {' and '.join(confidential_types)}; please "
                f"decide whether these should all be included, then pass either "
                f"--include-confidential or --skip-confidential, or edit "
                f"issues.ndjson for more fine-grained control."
            )

    issue_jsons.sort(key=lambda x: x["iid"])

    # default=0 keeps an export without (remaining) issues from crashing; all
    # loops below then simply have nothing to do.
    max_issue_id = max((x["iid"] for x in issue_jsons), default=0)
    issues_by_id = {x["iid"]: x for x in issue_jsons}
    missing_issue_ids = set(range(1, max_issue_id + 1)) - set(issues_by_id)

    if missing_issue_ids and not (skip_missing_issues or create_missing_issues):
        if skip_confidential:
            because_confidential_msg = (
                " (possibly because some confidential issues were excluded)"
            )
        else:
            because_confidential_msg = ""
        raise RuntimeError(
            f"Don't have all issues from 1 to {max_issue_id}{because_confidential_msg}, "
            f"please pass --create-missing-issues or --skip-missing-issues to proceed."
        )

    # Need to sort notes by date, they seem to come unsorted.
    for issue_json in issue_jsons:
        issue_json["notes"].sort(key=lambda x: x["created_at"])

    log.info("Creating tickets.")

    issue_id_map: dict[int, int] = {}

    # While we're creating tickets, we can't just loop over the sorted
    # issue_jsons.  We have to loop over potential issue IDs and handle any that
    # are missing as well.
    for gitlab_issue_id in range(1, max_issue_id + 1):
        if gitlab_issue_id not in issues_by_id:
            if create_missing_issues:
                file_missing_ticket(
                    smtp=smtp,
                    delay=delay,
                    mode=mode,
                    tracker=tracker,
                    frm=frm,
                    issue_id=gitlab_issue_id,
                )
            elif not skip_missing_issues:
                raise RuntimeError(
                    f"Internal error, don't know what to do with missing "
                    f"issue ID {gitlab_issue_id}."
                )
            continue

        issue_json = issues_by_id[gitlab_issue_id]

        author_id = issue_json["author_id"]
        created_by: Optional[str]
        if user_ids_to_names is None:
            created_by = None
        elif author_id in user_ids_to_names:
            created_by = user_ids_to_names[author_id]
        elif skip_unknown_users:
            created_by = None
        else:
            raise RuntimeError(
                f"Unknown author #{author_id} of ticket #{gitlab_issue_id}, "
                f"please add to the users file."
            )

        # "milestone" may be absent or null for issues without a milestone.
        milestone_json = issue_json.get("milestone") or {}

        srht_issue_id = open_ticket(
            smtp=smtp,
            delay=delay,
            mode=mode,
            tracker=tracker,
            frm=frm,
            title=issue_json["title"],
            body=issue_json["description"],
            created_by=created_by,
            created_at=issue_json["created_at"],
            closed_at=issue_json["closed_at"],
            is_closed=(issue_json["state"] == "closed"),
            is_confidential=(issue_json.get("confidential") is True),
            labels=list(issue_json["label_links"]),
            milestone_name=milestone_json.get("title") or None,
            gitlab_ticket_url=f"{gitlab_project_url}/-/issues/{gitlab_issue_id}",
        )

        if not skip_missing_issues:
            # A genuine internal invariant, so an assert is appropriate here.
            assert srht_issue_id == gitlab_issue_id, (
                f"Internal error, srht_issue_id {srht_issue_id} != "
                f"gitlab_issue_id {gitlab_issue_id} "
                f"(skip_missing_issues={skip_missing_issues}, "
                f"create_missing_issues={create_missing_issues})."
            )

        issue_id_map[gitlab_issue_id] = srht_issue_id

    def expand_label(ref: "re.Match[str]") -> str:
        """Replace a "~<id>" label reference with the label's name."""
        ref_num = int(ref.group(1))
        if ref_num in label_ids_to_names:
            return label_ids_to_names[ref_num]
        if not skip_unknown_labels:
            raise RuntimeError(
                f"Unknown label #{ref_num}, please add to the labels file."
            )
        return ref.group(0)  # Return the original "~id" string.

    def expand_milestone(ref: "re.Match[str]") -> str:
        """Replace a "%<id>" milestone reference with the milestone's title."""
        ref_num = int(ref.group(1))
        if ref_num not in milestone_ids_to_titles:
            raise RuntimeError(f"Unknown milestone #{ref_num}.")
        return milestone_ids_to_titles[ref_num]

    log.info("Creating comments.")

    for issue_json in issue_jsons:
        for note_json in issue_json["notes"]:
            # "system_note_metadata" may be absent or null for regular notes.
            system_action = (note_json.get("system_note_metadata") or {}).get(
                "action", None
            )
            body = note_json["note"]

            # The "Removed" part is a guess here, don't know if that actually shows up.
            if label_ids_to_names is not None and (
                system_action == "label"
                or re.search(r"^(Added|Removed) ~[0-9]+ label", body)
            ):
                body = re.sub(r"~([0-9]+)", expand_label, body)

            if system_action == "milestone" or re.search(
                r"^Milestone changed to %[0-9]+$", body
            ):
                body = re.sub(r"%([0-9]+)", expand_milestone, body)

            send_comment(
                smtp=smtp,
                delay=delay,
                mode=mode,
                tracker=tracker,
                frm=frm,
                issue_id=issue_id_map[issue_json["iid"]],
                body=body,
                author_name=note_json["author"]["name"],
                created_at=note_json["created_at"],
                last_edited_at=note_json["last_edited_at"],
                is_system=note_json["system"],
                # .get() for consistency with the confidentiality filtering
                # above, which tolerates notes without this key.
                is_confidential=(note_json.get("confidential") is True),
            )

    log.info("Closing closed issues.")

    for issue_json in issue_jsons:
        if issue_json["state"] == "closed":
            close_ticket(
                smtp=smtp,
                delay=delay,
                mode=mode,
                tracker=tracker,
                frm=frm,
                issue_id=issue_id_map[issue_json["iid"]],
                closed_at=issue_json["closed_at"],
                is_closed=True,
            )

    log.info("Delayed closing issues.")
    # Placeholder tickets queued by file_missing_ticket().
    for t_smtp, t_delay, t_mode, t_tracker, t_frm, t_issue_id in tickets_to_be_closed:
        close_ticket(
            smtp=t_smtp,
            delay=t_delay,
            mode=t_mode,
            tracker=t_tracker,
            frm=t_frm,
            issue_id=t_issue_id,
            closed_at=None,
            is_closed=False,
        )
def parse_args(args_in: list[str]) -> dict[str, Any]:
    """Parse the command line and return the options as a plain dict.

    Args:
        args_in: The arguments to parse, without the program name.

    Returns:
        A dict keyed by option name with dashes replaced by underscores
        (e.g. ``"srht_owner"``), plus ``"export_dir"`` for the positional
        argument.
    """
    parser = argparse.ArgumentParser(
        prog="import_issues.py",
        description="Import Gitlab issues into Sourcehut via SMTP.",
    )

    def required(help_text: str) -> dict[str, Any]:
        return {"required": True, "help": help_text}

    def value(help_text: str) -> dict[str, Any]:
        return {"help": help_text}

    def flag(help_text: str) -> dict[str, Any]:
        return {"action": "store_true", "help": help_text}

    # (argument name, keyword arguments for add_argument), in the order in
    # which they appear in the --help output.
    argument_specs: list[tuple[str, dict[str, Any]]] = [
        ("--srht-owner", required("Owner of the Sorucehut tracker.")),
        ("--srht-tracker", required("Name of Sourcehut tracker to submit to.")),
        ("--gitlab-project-url", required("The base URL the project on Gitlab.")),
        (
            "--mode",
            {
                "choices": ["print", "send", "hut"],
                "default": "print",
                "help": "Action to take.",
            },
        ),
        ("--from", value("From address if mode is 'send'.")),
        ("--smtp-host", value("SMTP host to use.")),
        (
            "--smtp-port",
            {"default": None, "type": int, "help": "SMTP port to use."},
        ),
        ("--smtp-ssl", flag("Use SMTP over SSL.")),
        ("--smtp-starttls", flag("Use STARTTLS.")),
        ("--smtp-user", value("SMTP username.")),
        ("--smtp-password", value("SMTP password.")),
        (
            "--delay",
            {
                "default": None,
                "help": "Decimal number of seconds to wait between accessing the server.",
            },
        ),
        ("--labels-file", value("CSV file mapping label IDs to names.")),
        ("--skip-labels", flag("Skip mapping label IDs to names.")),
        (
            "--skip-unknown-labels",
            flag("Skip mapping labels that aren't in the labels file."),
        ),
        ("--users-file", value("CSV file mapping user IDs to names.")),
        ("--skip-users", flag("Skip mapping user IDs to names.")),
        (
            "--skip-unknown-users",
            flag("Skip mapping users that aren't in the users file."),
        ),
        (
            "--skip-missing-issues",
            flag("Skip missing Gitlab issue IDs; GL and sr.ht IDs will not match."),
        ),
        (
            "--create-missing-issues",
            flag("Create missing GL issues in sr.ht to make issue IDs match."),
        ),
        ("--include-confidential", flag("Include confidential tickets and notes.")),
        ("--skip-confidential", flag("Skip confidential tickets and notes.")),
        (
            "export_dir",
            value("Exported Gitlab tree/project/ directory containing ndjson files."),
        ),
    ]
    for arg_name, add_kwargs in argument_specs:
        parser.add_argument(arg_name, **add_kwargs)

    namespace = parser.parse_args(args_in)
    return vars(namespace)
def process_smtp(args: dict[str, Any]) -> Optional[smtplib.SMTP]:
    """Open and authenticate the SMTP connection, if the mode needs one.

    Host, port, user and password are taken from the parsed command-line
    options, falling back to the ``SMTP_HOST``, ``SMTP_PORT``, ``SMTP_USER``
    and ``SMTP_PASSWORD`` environment variables.  The host defaults to
    ``localhost``, the port to 465 with ``--smtp-ssl`` and 25 without.

    Args:
        args: Parsed options as returned by parse_args().

    Returns:
        A logged-in connection when the mode is ``"send"``, otherwise
        ``None``.

    Raises:
        ValueError: If no SMTP user or password is available.  (Explicit
            exceptions rather than asserts, so that the checks still run
            under ``python -O``.)
    """
    if args["mode"] != "send":
        return None

    smtp_ssl = args["smtp_ssl"]
    smtp_starttls = args["smtp_starttls"]
    smtp_host = args["smtp_host"] or os.environ.get("SMTP_HOST", "localhost")
    smtp_port = int(
        args["smtp_port"] or os.environ.get("SMTP_PORT", 465 if smtp_ssl else 25)
    )
    smtp_user = args["smtp_user"] or os.environ.get("SMTP_USER", None)
    smtp_password = args["smtp_password"] or os.environ.get("SMTP_PASSWORD", None)

    if not smtp_user:
        raise ValueError("No SMTP user given.")
    if not smtp_password:
        raise ValueError("No SMTP password given.")

    log.info("Connecting to %s:%d, user %r.", smtp_host, smtp_port, smtp_user)

    smtp_class = smtplib.SMTP_SSL if smtp_ssl else smtplib.SMTP
    smtp: smtplib.SMTP = smtp_class(host=smtp_host, port=smtp_port)

    # If SMTP isn't working:
    # smtp.set_debuglevel(2)

    try:
        if smtp_starttls:
            smtp.starttls()
        smtp.login(smtp_user, smtp_password)
    except Exception:
        # Don't leak the open connection when the handshake or login fails.
        smtp.close()
        raise

    return smtp
def main():
    """Entry point: validate the command line, then run the import.

    Invalid option combinations terminate the program with an error message
    on stderr and exit status 1.  These are explicit checks rather than
    asserts, so they are still enforced under ``python -O``.
    """
    args = parse_args(sys.argv[1:])

    export_dir = args["export_dir"]
    if not export_dir:
        sys.exit("Must have a exported project directory.")
    export_dir_path = Path(export_dir)
    if not export_dir_path.is_dir():
        sys.exit(f"Project directory is not a directory: {export_dir_path}")

    mode = args["mode"]
    frm = args["from"]
    if mode == "send" and not frm:
        # Without this, the From header of every sent email would be empty.
        sys.exit("--from must be provided when --mode=send.")

    labels_file = args["labels_file"]
    skip_labels = args["skip_labels"]
    skip_unknown_labels = args["skip_unknown_labels"]
    if not (labels_file or skip_labels):
        sys.exit("One of --labels-file or --skip-labels must be provided.")

    users_file = args["users_file"]
    skip_users = args["skip_users"]
    skip_unknown_users = args["skip_unknown_users"]
    if not (skip_users or users_file):
        sys.exit("One of --users-file or --skip-users must be provided.")

    skip_missing_issues = args["skip_missing_issues"]
    create_missing_issues = args["create_missing_issues"]
    if skip_missing_issues and create_missing_issues:
        sys.exit(
            "Can accept at most one of --skip-missing-issues and --create-missing-issues."
        )

    include_confidential = args["include_confidential"]
    skip_confidential = args["skip_confidential"]
    if include_confidential and skip_confidential:
        sys.exit(
            "Can accept at most one of --include-confidential and --skip-confidential."
        )

    srht_owner = args["srht_owner"]
    srht_tracker = args["srht_tracker"]
    tracker = f"~{srht_owner}/{srht_tracker}"

    # Default pause between server accesses: short for hut, longer for email.
    delay = args["delay"]
    if delay is None:
        delay = 0.5 if mode == "hut" else 5
    else:
        delay = float(delay)

    smtp = process_smtp(args)

    try:
        run(
            smtp=smtp,
            delay=delay,
            mode=mode,
            tracker=tracker,
            frm=frm,
            export_dir_path=export_dir_path,
            gitlab_project_url=args["gitlab_project_url"].rstrip("/"),
            labels_file_path=None if skip_labels else Path(labels_file),
            skip_unknown_labels=skip_unknown_labels,
            users_file_path=None if skip_users else Path(users_file),
            skip_unknown_users=skip_unknown_users,
            skip_missing_issues=skip_missing_issues,
            create_missing_issues=create_missing_issues,
            include_confidential=include_confidential,
            skip_confidential=skip_confidential,
        )
    finally:
        if smtp is not None:
            try:
                smtp.quit()
            except smtplib.SMTPServerDisconnected:
                # The connection is already gone; don't let that hide the
                # exception (if any) that ended the import.
                log.debug("SMTP connection was already closed.")


if __name__ == "__main__":
    main()