#!/usr/bin/env python3
# Copyright 2024 Bryan Gardiner <bog@khumba.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Imports issues from an exported Gitlab project into a Sourcehut tracker.
#
# Reads *.ndjson files from an exported Gitlab project, and recreates tickets
# and their histories in a new, empty Sourcehut tracker. Preserves notes
# attached to each issue; generally this includes comments, status changes,
# labels, milestones, and anything else that is included as a plain text
# note, but definitely doesn't include all available metadata.
#
# Tickets are created in sr.ht via SMTP, so a working mail setup is required.
# Surely using Sourcehut's API would be better.
#
# There are a few caveats:
#
# 1. If all issue IDs from 1 to the max ID are available in your export, and the
# tracker you import into is a new tracker, then your Gitlab and Sourcehut issue
# IDs will match up one-to-one, and mentions of one ticket from another will
# work. If not, you need to decide how you want to handle this. You can choose
# to create empty Sourcehut tickets for the missing Gitlab issues so that IDs
# still match, by passing --create-missing-issues. Blank issues will be created
# then closed. Alternatively, you can pass --skip-missing-issues to not create
# any extra Sourcehut tickets, but IDs will not line up. If one of these issues
# is needed, this program will tell you.
#
# 2. Because emails are used to create tickets, we also assume that emails are
# processed in the order that they are sent, so that tickets don't get created
# out of order. This program has no way of knowing if that happens; however,
# there is a configurable delay between sending each email, for this reason.
#
# 3. Gitlab project exports are missing some crucial information, in particular
# they don't include ticket author names or label IDs. For best results,
# appropriate mappings for your project can be provided manually in CSV files to
# --labels-file and --users-file. These CSV files should be headerless, and
# each row should contain a label or user ID, followed by the name for that
# entity. If you want to skip these, then --skip-labels and --skip-users must
# be passed. Some label and user info will still be included, but label
# references in comments and issue creator names will be missing. You can run
# with incomplete files by passing --skip-unknown-labels or
# --skip-unknown-users.
#
# 4. If your project has confidential issues or comments in it, then you will
# need to decide to exclude them with --skip-confidential, or include them all
# with --include-confidential. If there are confidential items and you don't
# pass either of these options, then an exception will be thrown. If you need
# more fine-grained control over confidential items, edit issues.ndjson by hand.
#
# 5. The projects I have tested this on are small, and don't make use of many of
# Gitlab's features. This may bork on more complex projects.
# Still here? Here's how to use this:
#
# First take an export of your Gitlab project from its settings area, then
# extract the archive. The important files are tree/project/*.ndjson.
#
# Let's generate a report of all the emails that would be sent. Preview the
# output to make sure things look right, and ensure that the command completes
# without error:
#
# touch labels.csv users.csv # First create these empty files.
#
# ./import_issues.py \
# --srht-owner=MY_SRHT_USER \
# --srht-tracker=MY_SRHT_TRACKER \
# --gitlab-project-url=https://gitlab.com/ME/PROJECT/ \
# --from='Moi <me@email.com>' \
# --labels-file=labels.csv \
# --users-file=users.csv \
# .../gitlab-export/tree/project \
# >issue-emails.txt
#
# You may get errors if you are missing label or user mappings, and you haven't
# disabled these; add them to the labels.csv or users.csv until you get no more
# errors:
#
# labels.csv:
# 123456,Bug
# 232323,Feature
# ...
#
# users.csv:
# 1234000,John Joe (@jdoe)
# ...
#
# If the issue-emails.txt file looks correct, then you can proceed with sending
# emails. Double-check that your tracker is empty to start with, then rerun the
# command with "--mode=send" and with your SMTP parameters. SMTP options can be
# specified either via parameters --smtp-{host,port,user,password} or the
# equivalent SMTP_{HOST,PORT,USER,PASSWORD} environment variables. Pass
# --smtp-ssl to enable SSL. Also by default there is a five-second delay
# between sending emails, that you may wish to change with --smtp-delay.
#
# ./import_issues.py \
# --srht-owner=MY_SRHT_USER \
# --srht-tracker=MY_SRHT_TRACKER \
# --gitlab-project-url=https://gitlab.com/ME/PROJECT/ \
# --from='Moi <me@email.com>' \
# --labels-file=labels.csv \
# --users-file=users.csv \
# --smtp-host=SMTP_HOSTNAME \
# --smtp-ssl \
# --smtp-user=SMTP_USERNAME \
# --smtp-password=SMTP_PASSWORD \
# .../gitlab-export/tree/project
import argparse
import csv
import logging
import json
import os
import re
import smtplib
import time
from email.message import EmailMessage
from email.utils import format_datetime, make_msgid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
# Pattern for an ID written as a run of ASCII digits; used when validating the
# ID column of the label/user mapping CSV files.
ID_RE = re.compile(r'^[0-9]+$')

# Configure the root logger: emit everything from DEBUG up, tagging each
# record with its level and the name of the function that logged it.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(levelname)s:%(funcName)s:%(message)s',
)
log = logging.getLogger()

# Module-wide running totals: the number of emails produced so far, and the
# number of tickets filed so far.
email_count = 0
issue_count = 0
def read_id_map_file(file_path: Path) -> Dict[int, str]:
    """Reads a CSV file with ID,NAME mappings and returns the resulting dict.

    The file must be headerless, and every row must have exactly two fields:
    an all-digit ID followed by a non-empty name.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A dict mapping each integer ID to its name.

    Raises:
        ValueError: If a row is not in the form <ID>,<NAME>, or if an ID
            appears on more than one row.
    """
    result: Dict[int, str] = {}
    # Decode explicitly rather than relying on the locale's default encoding,
    # since names may contain non-ASCII characters.  "utf-8-sig" reads plain
    # UTF-8 and also tolerates a leading byte-order mark.  newline='' is what
    # the csv module asks for so that it can do its own newline handling.
    with open(file_path, newline='', encoding='utf-8-sig') as fh:
        for line_num, row in enumerate(csv.reader(fh), start=1):
            # Raise explicitly instead of asserting: this validates user
            # input, and assert statements are stripped under "python -O".
            if not (len(row) == 2 and ID_RE.search(row[0]) and row[1]):
                raise ValueError(
                    f"Row {line_num} of {file_path} is not in the form "
                    f"<ID>,<NAME>: {row!r}"
                )
            new_id = int(row[0])
            if new_id in result:
                raise ValueError(
                    f"ID {new_id} appears multiple times in {file_path}."
                )
            result[new_id] = row[1]
    return result
def do_mail(
    *,
    smtp,
    smtp_delay: float,
    mode: str,
    frm: str,
    to: str,
    body: str,
    subject: Optional[str] = None,
):
    """Emits a single email, either to standard output or over SMTP.

    Every call bumps the global email counter and logs it.  In "print" mode
    the headers and body are written to standard output; in "send" mode the
    message is handed to `smtp`, and then the function sleeps for `smtp_delay`
    seconds.  Any other mode raises RuntimeError.  The Subject header is left
    out when `subject` is empty or None.
    """
    global email_count
    email_count += 1
    log.info(f"---- #{email_count}")

    date = format_datetime(datetime.now(timezone.utc))
    msg_id = make_msgid()

    # Collect the headers once, in the order they are to be emitted, so that
    # both modes share them.
    headers = [("From", frm), ("To", to), ("Date", date)]
    if subject:
        headers.append(("Subject", subject))
    # Message-ID is required, unless you want this error message from the
    # sr.ht mail server:
    #
    #   500 Error: (AttributeError) 'NoneType' object has no attribute
    #   'removeprefix' (in reply to end of DATA command)
    headers.append(("Message-ID", msg_id))

    if mode == "print":
        for name, value in headers:
            print(f"{name}: {value}")
        print()
        print(body)
        return

    if mode == "send":
        msg = EmailMessage()
        msg.set_content(body)
        for name, value in headers:
            msg[name] = value
        smtp.send_message(msg)
        time.sleep(smtp_delay)
        return

    raise RuntimeError(f"Unknown mode: {mode!r}")
def open_ticket(
    *,
    smtp,
    smtp_delay: float,
    mode: str,
    srht_owner: str,
    srht_tracker: str,
    frm: str,
    title: str,
    body: str,
    created_by: Optional[str],
    created_at: str,
    closed_at: Optional[str],
    is_closed: bool,
    is_confidential: bool,
    label_names: List[str],
    milestone_name: Optional[str],
    gitlab_ticket_url: str,
) -> int:
    """Files a new ticket by mailing the tracker's submission address.

    The email body begins with a block of "pseudo-headers" describing the
    original Gitlab issue (source URL, author, timestamps, milestone, labels,
    confidentiality), followed by a blank line and the issue description.

    Args:
        smtp, smtp_delay, mode, frm: Passed through to do_mail().
        srht_owner, srht_tracker: Identify the tracker to mail.
        title: Used as the email subject.
        body: The issue description.  None is treated as an empty
            description.
        created_by: Author name, or None/empty to omit the line.
        created_at: Creation timestamp text, included verbatim.
        closed_at: Closing timestamp text, or None if not available.
        is_closed: Whether the issue is closed; only consulted when closed_at
            is None, in which case a "State: closed" line is added.
        is_confidential: Whether to add a "Confidential: true" line.
        label_names: Label titles; listed sorted and comma-separated.
        milestone_name: Milestone title, or None/empty to omit the line.
        gitlab_ticket_url: URL of the original issue.

    Returns:
        The updated global ticket count.  The caller uses this as the new
        ticket's Sourcehut ID, which presumably only holds if the tracker was
        empty beforehand and mail is processed in order.
    """
    global issue_count

    pheaders = [f"Migrated from: {gitlab_ticket_url}"]
    if created_by:
        pheaders.append(f"Created by: {created_by}")
    pheaders.append(f"Created at: {created_at}")
    if closed_at is not None:
        pheaders.append(f"Closed at: {closed_at}")
    elif is_closed:
        pheaders.append("State: closed")
    if milestone_name:
        pheaders.append(f"Milestone: {milestone_name}")
    if label_names:
        pheaders.append("Labels: " + ", ".join(sorted(label_names)))
    if is_confidential:
        pheaders.append("Confidential: true")

    lines = [
        # Pseudo-headers are joined with a trailing backslash on each line.
        " \\\n".join(pheaders),
        "",
        # A missing (None) description would make the join below raise a
        # TypeError, so fall back to an empty body.
        body or "",
    ]

    do_mail(
        smtp=smtp,
        smtp_delay=smtp_delay,
        mode=mode,
        frm=frm,
        to=f"~{srht_owner}/{srht_tracker}@todo.sr.ht",
        subject=title,
        body="\n".join(lines),
    )
    issue_count += 1
    return issue_count
def file_missing_ticket(
    *,
    smtp,
    smtp_delay: float,
    mode: str,
    srht_owner: str,
    srht_tracker: str,
    frm: str,
    issue_id: int,
):
    """Files a placeholder ticket for an absent Gitlab issue, then closes it.

    `issue_id` is the Gitlab issue number and is only mentioned in the
    placeholder's body.  The close message is addressed to the ticket number
    given by the global ticket count after it has been incremented.
    """
    global issue_count

    # Mail settings shared by the two messages sent below.
    mail_settings = dict(
        smtp=smtp,
        smtp_delay=smtp_delay,
        mode=mode,
        frm=frm,
    )

    do_mail(
        to=f"~{srht_owner}/{srht_tracker}@todo.sr.ht",
        subject="Missing issue",
        body=f"Issue {issue_id} is not known.",
        **mail_settings,
    )
    issue_count += 1
    placeholder_ticket_id = issue_count

    # TODO Send these emails at the end, so that there isn't such a need for
    # the previous issue to be processed promptly.
    close_ticket(
        srht_owner=srht_owner,
        srht_tracker=srht_tracker,
        issue_id=placeholder_ticket_id,
        closed_at=None,
        is_closed=False,  # Save one line of text.
        **mail_settings,
    )
def send_comment(
    *,
    smtp,
    smtp_delay: float,
    mode: str,
    srht_owner: str,
    srht_tracker: str,
    frm: str,
    issue_id: int,
    body: str,
    author_name: str,
    created_at: str,
    last_edited_at: str,
    is_system: bool,
    is_confidential: bool,
):
    """Mails one Gitlab note to an existing ticket's address.

    The message consists of an optional "Confidential: true" pseudo-header, an
    attribution line (worded differently for system notes and for regular
    comments), the note text, and, when `last_edited_at` is set and differs
    from `created_at`, a trailing remark giving the last edit time.
    """
    lines = []

    # Pseudo-headers, if any, come first and are followed by a blank line.
    if is_confidential:
        lines += ["Confidential: true", ""]

    # Attribution: system notes describe a change, other notes a comment.
    attribution = (
        f"Changed on {created_at} by {author_name}:"
        if is_system
        else f"On {created_at}, {author_name} wrote:"
    )
    lines += [attribution, "", body]

    was_edited = bool(last_edited_at) and last_edited_at != created_at
    if was_edited:
        lines += ["", f"(Last edited at {last_edited_at}.)"]

    recipient = f"~{srht_owner}/{srht_tracker}/{issue_id}@todo.sr.ht"
    do_mail(
        smtp=smtp,
        smtp_delay=smtp_delay,
        mode=mode,
        frm=frm,
        to=recipient,
        body="\n".join(lines),
    )
def close_ticket(
    *,
    smtp,
    smtp_delay: float,
    mode: str,
    srht_owner: str,
    srht_tracker: str,
    frm: str,
    issue_id: int,
    closed_at: Optional[str],
    is_closed: bool,
):
    """Mails a "!resolve fixed" command to the given ticket's address.

    The command is preceded by a one-line note: the closing time when
    `closed_at` is known, otherwise "Ticket closed." when `is_closed` is true,
    otherwise nothing.  A blank line always separates that (possibly absent)
    note from the command.
    """
    # (Skipping pseudoheaders array here, only have one.)
    if closed_at is not None:
        preamble = [f"Closed at: {closed_at}"]
    elif is_closed:
        preamble = ["Ticket closed."]
    else:
        preamble = []

    message = "\n".join([*preamble, "", "!resolve fixed"])

    do_mail(
        smtp=smtp,
        smtp_delay=smtp_delay,
        mode=mode,
        frm=frm,
        to=f"~{srht_owner}/{srht_tracker}/{issue_id}@todo.sr.ht",
        body=message,
    )
def _read_ndjson(file_path: Path) -> List[dict]:
    """Parses a newline-delimited JSON file into a list of decoded values."""
    # Decode explicitly as UTF-8 instead of depending on the locale, and skip
    # blank lines, which json.loads() would otherwise reject.
    with open(file_path, encoding='utf-8') as fh:
        return [json.loads(line) for line in fh if line.strip()]


def run(
    *,
    smtp,
    smtp_delay: float,
    mode: str,
    srht_owner: str,
    srht_tracker: str,
    frm: str,
    export_dir_path: Path,
    gitlab_project_url: str,
    labels_file_path: Optional[Path],
    skip_unknown_labels: bool,
    users_file_path: Optional[Path],
    skip_unknown_users: bool,
    skip_missing_issues: bool,
    create_missing_issues: bool,
    include_confidential: bool,
    skip_confidential: bool,
):
    """Recreates the issues of an exported Gitlab project in a tracker.

    Reads milestones.ndjson and issues.ndjson from `export_dir_path`, then
    works in three passes: one ticket is filed per issue (in issue ID order,
    optionally filling gaps with placeholder tickets), then every note of
    every issue is sent as a comment, then closed issues are resolved.

    Args:
        smtp, smtp_delay, mode, frm: Passed through to the mail helpers.
        srht_owner, srht_tracker: Identify the tracker to mail.
        export_dir_path: Directory containing the exported *.ndjson files.
        gitlab_project_url: Base project URL, without a trailing slash; used
            to build the "Migrated from" link of each ticket.
        labels_file_path: CSV of label ID,NAME rows, or None to leave "~ID"
            label references in notes untouched.
        skip_unknown_labels: Leave unmapped "~ID" references as they are
            instead of failing.
        users_file_path: CSV of user ID,NAME rows, or None to omit ticket
            author names.
        skip_unknown_users: Omit the author name of a ticket whose author is
            unmapped instead of failing.
        skip_missing_issues: Accept gaps in the issue ID sequence; ticket IDs
            will then not line up with issue IDs.
        create_missing_issues: Fill gaps in the issue ID sequence with
            placeholder tickets.
        include_confidential: Import confidential issues and notes.
        skip_confidential: Drop confidential issues and notes.

    Raises:
        RuntimeError: If confidential items are present and neither
            confidentiality option was given, if issue IDs are missing and
            neither missing-issue option was given, or if an unmapped user,
            label, or milestone is met and may not be skipped.
    """
    label_ids_to_names: Optional[Dict[int, str]] = (
        read_id_map_file(labels_file_path) if labels_file_path else None
    )
    user_ids_to_names: Optional[Dict[int, str]] = (
        read_id_map_file(users_file_path) if users_file_path else None
    )
    # TODO Might be able to automatically map note.events.author_id to
    # note.author.name for a subset of relevant users.

    milestone_ids_to_titles = {
        milestone_json['iid']: milestone_json['title']
        for milestone_json in _read_ndjson(export_dir_path / 'milestones.ndjson')
    }
    issue_jsons = _read_ndjson(export_dir_path / 'issues.ndjson')

    if skip_confidential:
        issue_jsons = [x for x in issue_jsons if not x.get('confidential')]
        for issue_json in issue_jsons:
            issue_json['notes'] = [
                n for n in issue_json['notes'] if not n.get('confidential')
            ]
    elif not include_confidential:
        confidential_types = []
        if any(x.get('confidential') for x in issue_jsons):
            confidential_types.append('issues')
        if any(
            n.get('confidential') for x in issue_jsons for n in x['notes']
        ):
            confidential_types.append('notes')
        # Raised explicitly (rather than asserted) because this depends on
        # the input data and must not vanish under "python -O".
        if confidential_types:
            raise RuntimeError(
                f"Found confidential {' and '.join(confidential_types)}; please "
                f"decide whether these should all be included, then pass either "
                f"--include-confidential or --skip-confidential, or edit "
                f"issues.ndjson for more fine-grained control."
            )

    issue_jsons.sort(key=lambda x: x['iid'])
    issues_by_id = {x['iid']: x for x in issue_jsons}
    # default=0 keeps an export with no (remaining) issues from crashing
    # max(); every loop below then simply has nothing to do.
    max_issue_id = max(issues_by_id, default=0)
    missing_issue_ids = set(range(1, max_issue_id + 1)).difference(issues_by_id)

    if missing_issue_ids and not (
        skip_missing_issues or create_missing_issues
    ):
        if skip_confidential:
            because_confidential_msg = " (possibly because some confidential issues were excluded)"
        else:
            because_confidential_msg = ""
        raise RuntimeError(
            f"Don't have all issues from 1 to {max_issue_id}{because_confidential_msg}, "
            f"please pass --create-missing-issues or --skip-missing-issues to proceed."
        )

    # Need to sort notes by date, they seem to come unsorted.
    for issue_json in issue_jsons:
        issue_json['notes'].sort(key=lambda x: x['created_at'])

    log.info("-------- CREATING TICKETS")

    issue_id_map: Dict[int, int] = {}

    # While we're creating tickets, we can't just loop over the sorted
    # issue_jsons.  We have to loop over potential issue IDs and handle any
    # that are missing as well.
    for gitlab_issue_id in range(1, max_issue_id + 1):
        if gitlab_issue_id not in issues_by_id:
            if create_missing_issues:
                file_missing_ticket(
                    smtp=smtp,
                    smtp_delay=smtp_delay,
                    mode=mode,
                    srht_owner=srht_owner,
                    srht_tracker=srht_tracker,
                    frm=frm,
                    issue_id=gitlab_issue_id,
                )
            elif not skip_missing_issues:
                raise RuntimeError(
                    f"Internal error, don't know what to do with missing "
                    f"issue ID {gitlab_issue_id}."
                )
            continue

        issue_json = issues_by_id[gitlab_issue_id]

        author_id = issue_json['author_id']
        created_by: Optional[str]
        if user_ids_to_names is None:
            created_by = None
        elif author_id in user_ids_to_names:
            created_by = user_ids_to_names[author_id]
        elif skip_unknown_users:
            created_by = None
        else:
            raise RuntimeError(
                f"Unknown author #{author_id} of ticket #{gitlab_issue_id}, "
                f"please add to the users file."
            )

        srht_issue_id = open_ticket(
            smtp=smtp,
            smtp_delay=smtp_delay,
            mode=mode,
            srht_owner=srht_owner,
            srht_tracker=srht_tracker,
            frm=frm,
            title=issue_json['title'],
            # A null description would otherwise end up in a str.join().
            body=issue_json['description'] or '',
            created_by=created_by,
            created_at=issue_json['created_at'],
            closed_at=issue_json['closed_at'],
            is_closed=(issue_json['state'] == 'closed'),
            is_confidential=(issue_json.get('confidential') is True),
            label_names=[
                x['label']['title'] for x in issue_json['label_links']
            ],
            # "or {}" also covers a "milestone" key that is present but null.
            milestone_name=(issue_json.get('milestone') or {}).get('title')
            or None,
            gitlab_ticket_url=f"{gitlab_project_url}/-/issues/{gitlab_issue_id}",
        )

        if not skip_missing_issues:
            # A genuine internal invariant, so an assert is appropriate here.
            assert srht_issue_id == gitlab_issue_id, (
                f"Internal error, srht_issue_id {srht_issue_id} != "
                f"gitlab_issue_id {gitlab_issue_id} "
                f"(skip_missing_issues={skip_missing_issues}, "
                f"create_missing_issues={create_missing_issues})."
            )

        issue_id_map[gitlab_issue_id] = srht_issue_id

    log.info("-------- CREATING COMMENTS")

    # The two substitution callbacks are defined once here instead of being
    # redefined for every note.

    def expand_label(ref):
        """Replaces a "~ID" label reference with the label's name."""
        ref_num = int(ref.group(1))
        if ref_num in label_ids_to_names:
            return label_ids_to_names[ref_num]
        if not skip_unknown_labels:
            raise RuntimeError(
                f"Unknown label #{ref_num}, please add to the labels file."
            )
        return ref.group(0)  # Return the original "~id" string.

    def expand_milestone(ref):
        """Replaces a "%ID" milestone reference with the milestone's title."""
        ref_num = int(ref.group(1))
        if ref_num not in milestone_ids_to_titles:
            raise RuntimeError(f"Unknown milestone #{ref_num}.")
        return milestone_ids_to_titles[ref_num]

    for issue_json in issue_jsons:
        for note_json in issue_json['notes']:
            # "or {}" also covers metadata that is present but null.
            system_action = (
                note_json.get('system_note_metadata') or {}
            ).get('action')
            body = note_json['note']

            # The "Removed" part is a guess here, don't know if that actually
            # shows up.
            if label_ids_to_names is not None and (
                system_action == 'label'
                or re.search(r'^(Added|Removed) ~[0-9]+ label', body)
            ):
                body = re.sub(r'~([0-9]+)', expand_label, body)

            if system_action == 'milestone' or re.search(
                r'^Milestone changed to %[0-9]+$', body
            ):
                body = re.sub(r'%([0-9]+)', expand_milestone, body)

            send_comment(
                smtp=smtp,
                smtp_delay=smtp_delay,
                mode=mode,
                srht_owner=srht_owner,
                srht_tracker=srht_tracker,
                frm=frm,
                issue_id=issue_id_map[issue_json['iid']],
                body=body,
                author_name=note_json['author']['name'],
                created_at=note_json['created_at'],
                last_edited_at=note_json['last_edited_at'],
                is_system=note_json['system'],
                # .get(), as in the confidentiality filtering above, so that
                # a note without this key counts as not confidential.
                is_confidential=(note_json.get('confidential') is True),
            )

    log.info("-------- CLOSING CLOSED ISSUES")

    for issue_json in issue_jsons:
        if issue_json['state'] == 'closed':
            close_ticket(
                smtp=smtp,
                smtp_delay=smtp_delay,
                mode=mode,
                srht_owner=srht_owner,
                srht_tracker=srht_tracker,
                frm=frm,
                issue_id=issue_id_map[issue_json['iid']],
                closed_at=issue_json['closed_at'],
                is_closed=True,  # Guaranteed by the enclosing condition.
            )
def main():
    """Parses the command line, then runs the import.

    Invalid or inconsistent options are reported through the argument parser,
    which prints a usage message and exits.  In "send" mode an SMTP connection
    is opened (optionally over SSL and/or upgraded with STARTTLS), logged in
    to, and closed again once the import finishes or fails.  SMTP settings
    that are not given as options are taken from the SMTP_HOST, SMTP_PORT,
    SMTP_USER and SMTP_PASSWORD environment variables.
    """
    parser = argparse.ArgumentParser(
        prog='import_issues.py',
        description='Import Gitlab issues into Sourcehut via SMTP.',
    )
    parser.add_argument(
        '--srht-owner',
        required=True,
        help='Owner of the Sourcehut tracker.',
    )
    parser.add_argument(
        '--srht-tracker',
        required=True,
        help='Name of Sourcehut tracker to submit to.',
    )
    parser.add_argument(
        '--gitlab-project-url',
        required=True,
        help="The base URL of the project on Gitlab.",
    )
    parser.add_argument(
        '--mode',
        default='print',
        # Reject anything else up front; previously an unknown mode left the
        # SMTP handle unassigned and crashed with an UnboundLocalError.
        choices=('print', 'send'),
        help="Action to take, 'print' or 'send'.",
    )
    parser.add_argument(
        '--from',
        help="From address if mode is 'send'.",
    )
    parser.add_argument(
        '--smtp-host',
        help="SMTP host to use.",
    )
    parser.add_argument(
        '--smtp-port',
        type=int,
        default=None,
        help="SMTP port to use.",
    )
    parser.add_argument(
        '--smtp-ssl',
        action='store_true',
        help="Use SMTP over SSL.",
    )
    parser.add_argument(
        '--smtp-starttls',
        action='store_true',
        help="Use STARTTLS.",
    )
    parser.add_argument(
        '--smtp-user',
        help="SMTP username.",
    )
    parser.add_argument(
        '--smtp-password',
        help="SMTP password.",
    )
    parser.add_argument(
        '--smtp-delay',
        type=float,
        default=5.0,
        help="Decimal number of seconds to wait after sending each email.",
    )
    parser.add_argument(
        '--labels-file',
        help="CSV file mapping label IDs to names.",
    )
    parser.add_argument(
        '--skip-labels',
        action='store_true',
        help="Skip mapping label IDs to names.",
    )
    parser.add_argument(
        '--skip-unknown-labels',
        action='store_true',
        help="Skip mapping labels that aren't in the labels file.",
    )
    parser.add_argument(
        '--users-file',
        help="CSV file mapping user IDs to names.",
    )
    parser.add_argument(
        '--skip-users',
        action='store_true',
        help="Skip mapping user IDs to names.",
    )
    parser.add_argument(
        '--skip-unknown-users',
        action='store_true',
        help="Skip mapping users that aren't in the users file.",
    )
    parser.add_argument(
        '--skip-missing-issues',
        action='store_true',
        help="Skip missing Gitlab issue IDs; GL and sr.ht IDs will not match.",
    )
    parser.add_argument(
        '--create-missing-issues',
        action='store_true',
        help="Create missing GL issues in sr.ht to make issue IDs match.",
    )
    parser.add_argument(
        '--include-confidential',
        action='store_true',
        help="Include confidential tickets and notes.",
    )
    parser.add_argument(
        '--skip-confidential',
        action='store_true',
        help="Skip confidential tickets and notes.",
    )
    parser.add_argument(
        'export_dir',
        help='Exported Gitlab tree/project/ directory containing ndjson files.',
    )

    # A plain dict, because "from" is a keyword and so can't be read as an
    # attribute of the namespace.
    args = vars(parser.parse_args())

    # Option validation goes through parser.error() rather than assert:
    # asserts are stripped under "python -O", and a usage message is more
    # helpful than a traceback.
    export_dir = args['export_dir']
    if not export_dir:
        parser.error("Must have a exported project directory.")
    export_dir_path = Path(export_dir)
    if not export_dir_path.is_dir():
        parser.error(
            f"Project directory is not a directory: {export_dir_path}"
        )

    mode = args['mode']
    frm = args['from']
    if mode == 'send' and not frm:
        # Without a From address, building the first message would fail.
        parser.error("--from must be provided when --mode=send.")

    labels_file = args['labels_file']
    skip_labels = args['skip_labels']
    if not (labels_file or skip_labels):
        parser.error("One of --labels-file or --skip-labels must be provided.")

    users_file = args['users_file']
    skip_users = args['skip_users']
    if not (skip_users or users_file):
        parser.error("One of --users-file or --skip-users must be provided.")

    skip_missing_issues = args['skip_missing_issues']
    create_missing_issues = args['create_missing_issues']
    if skip_missing_issues and create_missing_issues:
        parser.error(
            "Can accept at most one of --skip-missing-issues and "
            "--create-missing-issues."
        )

    include_confidential = args['include_confidential']
    skip_confidential = args['skip_confidential']
    if include_confidential and skip_confidential:
        parser.error(
            "Can accept at most one of --include-confidential and "
            "--skip-confidential."
        )

    # Everything run() needs apart from the SMTP connection itself.
    run_kwargs = dict(
        smtp_delay=args['smtp_delay'],
        mode=mode,
        srht_owner=args['srht_owner'],
        srht_tracker=args['srht_tracker'],
        frm=frm,
        export_dir_path=export_dir_path,
        gitlab_project_url=args['gitlab_project_url'].rstrip('/'),
        labels_file_path=None if skip_labels else Path(labels_file),
        skip_unknown_labels=args['skip_unknown_labels'],
        users_file_path=None if skip_users else Path(users_file),
        skip_unknown_users=args['skip_unknown_users'],
        skip_missing_issues=skip_missing_issues,
        create_missing_issues=create_missing_issues,
        include_confidential=include_confidential,
        skip_confidential=skip_confidential,
    )

    if mode == 'print':
        run(smtp=None, **run_kwargs)
        return

    # mode == 'send' (guaranteed by the parser's choices).
    smtp_ssl = args['smtp_ssl']
    smtp_starttls = args['smtp_starttls']
    smtp_host = args['smtp_host'] or os.environ.get('SMTP_HOST', 'localhost')
    default_port = 465 if smtp_ssl else 25
    try:
        # The environment only supplies strings, so normalise to an int.
        smtp_port = int(
            args['smtp_port']
            or os.environ.get('SMTP_PORT')
            or default_port
        )
    except ValueError:
        parser.error("SMTP_PORT must be an integer.")
    smtp_user = args['smtp_user'] or os.environ.get('SMTP_USER', None)
    smtp_password = args['smtp_password'] or os.environ.get(
        'SMTP_PASSWORD', None
    )
    if not smtp_user:
        parser.error("No SMTP user given.")
    if not smtp_password:
        parser.error("No SMTP password given.")

    log.info(f"Connecting to {smtp_host}:{smtp_port}, user {smtp_user!r}.")
    smtp_class = smtplib.SMTP_SSL if smtp_ssl else smtplib.SMTP
    # The with-block sends QUIT and closes the connection even when logging
    # in or the import itself raises.
    with smtp_class(host=smtp_host, port=smtp_port) as smtp:
        # If SMTP isn't working:
        # smtp.set_debuglevel(2)
        if smtp_starttls:
            smtp.starttls()
        smtp.login(smtp_user, smtp_password)
        run(smtp=smtp, **run_kwargs)
if __name__ == '__main__':
main()