#!/usr/bin/env python3

# Copyright 2024 Bryan Gardiner
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# Imports issues from an exported Gitlab project into a Sourcehut tracker.
#
# Reads *.ndjson files from an exported Gitlab project, and recreates tickets
# and their histories in a new, empty Sourcehut tracker.  Preserves notes
# attached to each issue; generally this includes comments, status changes,
# labels, milestones, and anything else that is included as a plain text
# note, but definitely doesn't include all available metadata.
#
# Tickets are created in sr.ht via SMTP, so a working mail setup is required.
# Surely using Sourcehut's API would be better.  (Alternatively, --mode=hut
# drives the "hut" command-line tool instead of sending emails.)
#
# There are a few caveats:
#
# 1. If all issue IDs from 1 to the max ID are available in your export, and the
# tracker you import into is a new tracker, then your Gitlab and Sourcehut issue
# IDs will match up one-to-one, and mentions of one ticket from another will
# work.  If not, you need to decide how you want to handle this.  You can choose
# to create empty Sourcehut tickets for the missing Gitlab issues so that IDs
# still match, by passing --create-missing-issues.  Blank issues will be created
# then closed.  Alternatively, you can pass --skip-missing-issues to not create
# any extra Sourcehut tickets, but IDs will not line up.  If one of these issues
# is needed, this program will tell you.
#
# 2. Because emails are used to create tickets, we also assume that emails are
# processed in the order that they are sent, so that tickets don't get created
# out of order.  This program has no way of knowing if that happens, however,
# there is a configurable delay between sending each email, for this reason.
#
# 3. Gitlab project exports are missing some crucial information, in particular
# they don't include ticket author names or label IDs.  For best results,
# appropriate mappings for your project can be provided manually in CSV files to
# --labels-file and --users-file.  These CSV files should be headerless, and
# each row should contain a label or user ID, followed by the name for that
# entity.  If you want to skip these, then --skip-labels and --skip-users must
# be passed.  Some label and user info will still be included, but label
# references in comments and issue creator names will be missing.  You can run
# with incomplete files by passing --skip-unknown-labels or
# --skip-unknown-users.
#
# 4. If your project has confidential issues or comments in it, then you will
# need to decide to exclude them with --skip-confidential, or include them all
# with --include-confidential.  If there are confidential items and you don't
# pass either of these options, then an exception will be thrown.  If you need
# more fine-grained control over confidential items, edit issues.ndjson by hand.
#
# 5. The projects I have tested this on are small, and don't make use of many of
# Gitlab's features.  This may bork on more complex projects.

# Still here?  Here's how to use this:
#
# First take an export of your Gitlab project from its settings area, then
# extract the archive.  The important files are tree/project/*.ndjson.
#
# Let's generate a report of all the emails that would be sent.  Preview the
# output to make sure things look right, and ensure that the command completes
# without error:
#
#     touch labels.csv users.csv  # First create these empty files.
#
#     ./import_issues.py \
#         --srht-owner=MY_SRHT_USER \
#         --srht-tracker=MY_SRHT_TRACKER \
#         --gitlab-project-url=https://gitlab.com/ME/PROJECT/ \
#         --from='Moi <moi@example.com>' \
#         --labels-file=labels.csv \
#         --users-file=users.csv \
#         .../gitlab-export/tree/project \
#         >issue-emails.txt
#
# You may get errors if you are missing label or user mappings, and you haven't
# disabled these; add them to the labels.csv or users.csv until you get no more
# errors:
#
#     labels.csv:
#     123456,Bug
#     232323,Feature
#     ...
#
#     users.csv:
#     1234000,John Doe (@jdoe)
#     ...
#
# If the issue-emails.txt file looks correct, then you can proceed with sending
# emails.  Double-check that your tracker is empty to start with, then rerun the
# command with "--mode=send" and with your SMTP parameters.  SMTP options can be
# specified either via parameters --smtp-{host,port,user,password} or the
# equivalent SMTP_{HOST,PORT,USER,PASSWORD} environment variables.  Pass
# --smtp-ssl to enable SSL.  Also by default there is a five-second delay
# between sending emails, that you may wish to change with --delay.
#
#     ./import_issues.py \
#         --srht-owner=MY_SRHT_USER \
#         --srht-tracker=MY_SRHT_TRACKER \
#         --gitlab-project-url=https://gitlab.com/ME/PROJECT/ \
#         --from='Moi <moi@example.com>' \
#         --labels-file=labels.csv \
#         --users-file=users.csv \
#         --smtp-host=SMTP_HOSTNAME \
#         --smtp-ssl \
#         --smtp-user=SMTP_USERNAME \
#         --smtp-password=SMTP_PASSWORD \
#         .../gitlab-export/tree/project

import argparse
import csv
import json
import logging
import os
import re
import subprocess
import smtplib
import sys
import time
from email.message import EmailMessage
from email.utils import format_datetime, make_msgid
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

logging.basicConfig(
    format="%(levelname)s:%(funcName)s:%(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)

ID_RE = re.compile(r"^[0-9]+$")

log = logging.getLogger()

# Running totals.  issue_count doubles as the ID that the tracker is expected
# to assign to the most recently created ticket (the tracker must start empty).
email_count = 0
issue_count = 0


def get_labels(tracker: str) -> list[dict[str, str]]:
    """Return the labels of one of the current user's trackers.

    Runs ``hut graphql todo`` with a query for ``me.tracker(name).labels``.

    :param tracker: Bare name of the tracker (without the ``~owner/`` prefix).
    :return: The ``results`` list of the labels query; each entry was
        requested with ``id``, ``name``, ``foregroundColor``,
        ``backgroundColor`` and ``created``.
    :raises RuntimeError: If ``hut`` exits with a non-zero status, or if the
        response contains no tracker of that name.
    """
    # json.dumps() yields a correctly quoted and escaped string literal, so a
    # tracker name containing quotes or backslashes cannot break the query.
    query = (
        "query { me { tracker(name: "
        + json.dumps(tracker)
        + ") { labels { results { id, name, foregroundColor, backgroundColor, created } } } }}"
    )
    try:
        ret = subprocess.run(
            ["hut", "graphql", "todo", "--stdin"],
            input=query,
            text=True,
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as ex:
        raise RuntimeError(
            f"hut failed with exit code {ex.returncode} and stderr:\n{ex.stderr}"
        ) from ex
    data = json.loads(ret.stdout)
    tracker_data = data["me"]["tracker"]
    if tracker_data is None:
        raise RuntimeError(f"Tracker {tracker!r} was not found.")
    return tracker_data["labels"]["results"]


def _read_ndjson(file_path: Path) -> list:
    """Parse a newline-delimited JSON file into a list, ignoring blank lines."""
    with open(file_path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


def read_id_map_file(file_path: Path) -> Dict[int, str]:
    """Reads a CSV file with ID,NAME mappings and returns the resulting dict.

    The file must be headerless; every non-empty row must consist of exactly
    a numeric ID followed by a non-empty name.  Fails an assertion on a
    malformed row or on an ID that appears more than once.
    """
    result: Dict[int, str] = {}
    with open(file_path, newline="", encoding="utf-8") as fh:
        for line_num, row in enumerate(csv.reader(fh), start=1):
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing one).
                continue
            assert (
                len(row) == 2 and ID_RE.search(row[0]) and row[1]
            ), f"Row {line_num} of {file_path} is not in the form ID,NAME: {row!r}"
            new_id = int(row[0])
            assert (
                new_id not in result
            ), f"ID {new_id} appears multiple times in {file_path}."
            result[new_id] = row[1]
    return result


def do_mail(
    *,
    smtp,
    delay: float,
    mode: str,
    frm: str,
    to: str,
    body: str,
    subject: Optional[str] = None,
):
    """Print (mode "print") or send (mode "send") a single email.

    In "send" mode the message is submitted through ``smtp`` and the function
    then sleeps for ``delay`` seconds.  In "print" mode the headers and body
    are written to stdout and nothing is sent.

    :raises ValueError: If ``mode`` is neither "print" nor "send".
    """
    global email_count
    email_count += 1

    if mode == "print":
        verb = "Printing"
    elif mode == "send":
        verb = "Sending"
    else:
        raise ValueError(f"Unhandled mode {mode!r}.")
    log.info("%s email #%d.", verb, email_count)

    date = format_datetime(datetime.now(timezone.utc))
    msg_id = make_msgid()

    if mode == "print":
        indent = " "
        print(f"{indent}From: {frm}")
        print(f"{indent}To: {to}")
        print(f"{indent}Date: {date}")
        if subject:
            print(f"{indent}Subject: {subject}")
        print(f"{indent}Message-ID: {msg_id}")
        print()
        print(indent + body.replace("\n", f"\n{indent}"))
        print()
        return

    msg = EmailMessage()
    msg.set_content(body)
    msg["From"] = frm
    msg["To"] = to
    msg["Date"] = date
    if subject:
        msg["Subject"] = subject
    # Message-ID is required, unless you want this error message from the
    # sr.ht mail server:
    #
    # 500 Error: (AttributeError) 'NoneType' object has no attribute
    # 'removeprefix' (in reply to end of DATA command)
    msg["Message-ID"] = msg_id
    smtp.send_message(msg)
    time.sleep(delay)


def run_hut(cmds, tracker, msg, args=None, delay=None):
    """Run ``hut todo <cmds> -t <tracker> [--stdin] <args>`` and return the result.

    :param cmds: List of sub-command words, e.g. ``["ticket", "create"]``.
    :param tracker: Tracker passed to ``-t``.
    :param msg: Text fed to the command on stdin (adds ``--stdin``), or None.
    :param args: Extra arguments appended to the command line.
    :param delay: Seconds to sleep after the command; None or 0 means no wait.
    :return: The ``subprocess.CompletedProcess`` of the command.
    :raises subprocess.CalledProcessError: If ``hut`` exits non-zero.
    """
    log.debug(
        "run_hut: cmds = %s, tracker = %s, args = %s\n\nmsg:\n%s",
        cmds,
        tracker,
        args,
        msg,
    )
    command = ["hut", "todo", *cmds, "-t", tracker]
    if msg is not None:
        command.append("--stdin")
    command.extend(args or [])
    res = subprocess.run(
        command,
        check=True,
        encoding="utf-8",
        input=msg,  # None leaves stdin untouched.
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if delay:
        # delay defaults to None; time.sleep(None) would raise TypeError.
        time.sleep(delay)
    return res


def open_ticket(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    title: str,
    body: Optional[str],
    created_by: Optional[str],
    created_at: str,
    closed_at: Optional[str],
    is_closed: bool,
    is_confidential: bool,
    label_names: List[str],
    milestone_name: Optional[str],
    gitlab_ticket_url: str,
) -> int:
    """Create one ticket and return the ID it is expected to receive.

    The ticket text starts with a block of pseudo-headers (origin URL,
    creator, dates, milestone, labels, confidentiality), followed by a blank
    line and the original description.  A missing description (None) is
    treated as empty.

    :return: The updated global ticket counter.
    """
    global issue_count

    pheaders = [f"Migrated from: {gitlab_ticket_url}"]
    if created_by:
        pheaders.append(f"Created by: {created_by}")
    pheaders.append(f"Created at: {created_at}")
    if closed_at is not None:
        pheaders.append(f"Closed at: {closed_at}")
    elif is_closed:
        pheaders.append("State: closed")
    if milestone_name:
        pheaders.append(f"Milestone: {milestone_name}")
    if label_names:
        pheaders.append("Labels: " + ", ".join(sorted(label_names)))
    if is_confidential:
        pheaders.append("Confidential: true")

    # Trailing backslash = Markdown hard line break between pseudo-headers.
    text = "\n".join([" \\\n".join(pheaders), "", body or ""])

    if mode in ["send", "print"]:
        do_mail(
            smtp=smtp,
            delay=delay,
            mode=mode,
            frm=frm,
            to=f"{tracker}@todo.sr.ht",
            subject=title,
            body=text,
        )
    elif mode == "hut":
        run_hut(["ticket", "create"], tracker, title + "\n" + text, delay=delay)

    issue_count += 1
    return issue_count


def file_missing_ticket(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    issue_id: int,
):
    """Create and immediately close a placeholder for a missing Gitlab issue.

    This keeps the Sourcehut ticket numbering aligned with Gitlab's.
    """
    global issue_count

    if mode in ["send", "print"]:
        do_mail(
            smtp=smtp,
            delay=delay,
            mode=mode,
            frm=frm,
            to=f"{tracker}@todo.sr.ht",
            subject="Missing issue",
            body=f"Issue {issue_id} is not known.",
        )
    elif mode == "hut":
        msg = f"Missing issue\n\nIssue {issue_id} is not known."
        run_hut(["ticket", "create"], tracker, msg, delay=delay)

    issue_count += 1

    # TODO Send these emails at the end, so that there isn't such a need for the
    # previous issue to be processed promptly.
    close_ticket(
        smtp=smtp,
        delay=delay,
        mode=mode,
        tracker=tracker,
        frm=frm,
        issue_id=issue_count,
        closed_at=None,
        is_closed=False,  # Save one line of text.
    )


def send_comment(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    issue_id: int,
    body: str,
    author_name: str,
    created_at: str,
    last_edited_at: str,
    is_system: bool,
    is_confidential: bool,
):
    """Add one Gitlab note to ticket ``issue_id`` as a comment.

    The comment text consists of optional pseudo-headers, an attribution
    line (worded differently for system notes), the note body and, if the
    note was edited after creation, a trailing "last edited" remark.
    """
    lines = []

    pheaders = []  # Pseudo-headers, if any.
    if is_confidential:
        pheaders.append("Confidential: true")
    if pheaders:
        lines.append(" \\\n".join(pheaders))
        lines.append("")

    # Authorship note for a regular comment.
    if is_system:
        lines.append(f"Changed on {created_at} by {author_name}:")
    else:
        lines.append(f"On {created_at}, {author_name} wrote:")
    lines.append("")
    lines.append(body)

    if last_edited_at and last_edited_at != created_at:
        lines.append("")
        lines.append(f"(Last edited at {last_edited_at}.)")

    body = "\n".join(lines)

    if mode in ["send", "print"]:
        do_mail(
            smtp=smtp,
            delay=delay,
            mode=mode,
            frm=frm,
            to=f"{tracker}/{issue_id}@todo.sr.ht",
            body=body,  # Required by do_mail(); omitting it raised TypeError.
        )
    elif mode == "hut":
        run_hut(["ticket", "comment"], tracker, body, [str(issue_id)], delay=delay)


def close_ticket(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    issue_id: int,
    closed_at: Optional[str],
    is_closed: bool,
):
    """Resolve ticket ``issue_id`` as fixed.

    In mail modes an email ending in the ``!resolve fixed`` command is sent,
    preceded by a short closing remark when one is available.  In "hut" mode
    the status is updated directly and no remark is recorded.
    """
    lines = []
    if closed_at is not None:
        # (Skipping pseudoheaders array here, only have one.)
        lines.append(f"Closed at: {closed_at}")
    elif is_closed:
        lines.append("Ticket closed.")

    if mode in ["send", "print"]:
        lines.append("")
        lines.append("!resolve fixed")
        do_mail(
            smtp=smtp,
            delay=delay,
            mode=mode,
            frm=frm,
            to=f"{tracker}/{issue_id}@todo.sr.ht",
            body="\n".join(lines),
        )
    elif mode == "hut":
        run_hut(
            ["ticket", "update-status"],
            tracker,
            None,
            [str(issue_id), "--resolution", "fixed", "--status", "resolved"],
            delay=delay,
        )


def ensure_label(
    tracker: str, name: str, bg_color: str, fg_color: str = "#FFFFFF", delay=None
):
    """Create label ``name`` in ``tracker`` unless one of that name exists.

    :param tracker: Tracker in ``~owner/name`` form; the part after the first
        slash is used to look up the existing labels.
    """
    labels = get_labels(tracker.split("/", 1)[1])
    if not any(label["name"] == name for label in labels):
        run_hut(
            ["label", "create"],
            tracker,
            None,
            ["--background", bg_color, "--foreground", fg_color, name],
            delay=delay,
        )


def run(
    *,
    smtp,
    delay: float,
    mode: str,
    tracker: str,
    frm: str,
    export_dir_path: Path,
    gitlab_project_url: str,
    labels_file_path: Optional[Path],
    skip_unknown_labels: bool,
    users_file_path: Optional[Path],
    skip_unknown_users: bool,
    skip_missing_issues: bool,
    create_missing_issues: bool,
    include_confidential: bool,
    skip_confidential: bool,
):
    """Import all issues of a Gitlab export into the tracker.

    Works in three passes over the contents of ``issues.ndjson``: create all
    tickets (in ID order, optionally filling gaps with placeholders), then
    add all notes as comments, then close the tickets that are closed in
    Gitlab.

    :raises RuntimeError: If issue IDs are missing from the export and neither
        ``skip_missing_issues`` nor ``create_missing_issues`` is set.
    :raises AssertionError: On undecided confidential content, unknown users,
        labels or milestones, or an unexpected ticket ID.
    """
    label_ids_to_names: Optional[Dict[int, str]] = (
        read_id_map_file(labels_file_path) if labels_file_path else None
    )
    user_ids_to_names: Optional[Dict[int, str]] = (
        read_id_map_file(users_file_path) if users_file_path else None
    )
    # TODO Might be able to automatically map note.events.author_id to
    # note.author.name for a subset of relevant users.

    milestones_file_path = export_dir_path / "milestones.ndjson"
    # This file may not exist if the project has no milestones.
    milestone_jsons = (
        _read_ndjson(milestones_file_path) if milestones_file_path.exists() else []
    )
    milestone_ids_to_titles = {m["iid"]: m["title"] for m in milestone_jsons}

    issue_jsons = _read_ndjson(export_dir_path / "issues.ndjson")

    if skip_confidential:
        issue_jsons = [x for x in issue_jsons if not x.get("confidential")]
        for issue_json in issue_jsons:
            issue_json["notes"] = [
                n for n in issue_json["notes"] if not n.get("confidential")
            ]
    elif not include_confidential:
        have_confidential_issues = any(x.get("confidential") for x in issue_jsons)
        have_confidential_notes = any(
            n.get("confidential") for x in issue_jsons for n in x["notes"]
        )
        confidential_types = []
        if have_confidential_issues:
            confidential_types.append("issues")
        if have_confidential_notes:
            confidential_types.append("notes")
        assert not (have_confidential_issues or have_confidential_notes), (
            f"Found confidential {' and '.join(confidential_types)}; please "
            f"decide whether these should all be included, then pass either "
            f"--include-confidential or --skip-confidential, or edit "
            f"issues.ndjson for more fine-grained control."
        )

    issue_jsons.sort(key=lambda x: x["iid"])
    issues_by_id = {x["iid"]: x for x in issue_jsons}

    # default=0 keeps an export without (remaining) issues from crashing.
    max_issue_id = max(issues_by_id, default=0)
    missing_issue_ids = set(range(1, max_issue_id + 1)) - set(issues_by_id)
    if missing_issue_ids and not (skip_missing_issues or create_missing_issues):
        if skip_confidential:
            because_confidential_msg = (
                " (possibly because some confidential issues were excluded)"
            )
        else:
            because_confidential_msg = ""
        raise RuntimeError(
            f"Don't have all issues from 1 to {max_issue_id}{because_confidential_msg}, "
            f"please pass --create-missing-issues or --skip-missing-issues to proceed."
        )

    # Need to sort notes by date, they seem to come unsorted.
    for issue_json in issue_jsons:
        issue_json["notes"].sort(key=lambda x: x["created_at"])

    log.info("Creating tickets.")
    issue_id_map: Dict[int, int] = {}
    # While we're creating tickets, we can't just loop over the sorted
    # issue_jsons.  We have to loop over potential issue IDs and handle any that
    # are missing as well.
    for gitlab_issue_id in range(1, max_issue_id + 1):
        if gitlab_issue_id not in issues_by_id:
            if create_missing_issues:
                file_missing_ticket(
                    smtp=smtp,
                    delay=delay,
                    mode=mode,
                    tracker=tracker,
                    frm=frm,
                    issue_id=gitlab_issue_id,
                )
            elif not skip_missing_issues:
                raise RuntimeError(
                    f"Internal error, don't know what to do with missing "
                    f"issue ID {gitlab_issue_id}."
                )
            continue

        issue_json = issues_by_id[gitlab_issue_id]

        author_id = issue_json["author_id"]
        created_by: Optional[str]
        if user_ids_to_names is None:
            created_by = None
        elif author_id in user_ids_to_names:
            created_by = user_ids_to_names[author_id]
        else:
            assert skip_unknown_users, (
                f"Unknown author #{author_id} of ticket #{gitlab_issue_id}, "
                f"please add to the users file."
            )
            created_by = None

        srht_issue_id = open_ticket(
            smtp=smtp,
            delay=delay,
            mode=mode,
            tracker=tracker,
            frm=frm,
            title=issue_json["title"],
            body=issue_json["description"],
            created_by=created_by,
            created_at=issue_json["created_at"],
            closed_at=issue_json["closed_at"],
            is_closed=(issue_json["state"] == "closed"),
            is_confidential=(issue_json.get("confidential") is True),
            label_names=[x["label"]["title"] for x in issue_json["label_links"]],
            # "milestone" may be absent or explicitly null.
            milestone_name=(issue_json.get("milestone") or {}).get("title") or None,
            gitlab_ticket_url=f"{gitlab_project_url}/-/issues/{gitlab_issue_id}",
        )

        if not skip_missing_issues:
            assert srht_issue_id == gitlab_issue_id, (
                f"Internal error, srht_issue_id {srht_issue_id} != "
                f"gitlab_issue_id {gitlab_issue_id} "
                f"(skip_missing_issues={skip_missing_issues}, "
                f"create_missing_issues={create_missing_issues})."
            )

        issue_id_map[gitlab_issue_id] = srht_issue_id

    def expand_label(ref):
        # Only called when label_ids_to_names is not None (see below).
        ref_num = int(ref.group(1))
        if ref_num in label_ids_to_names:
            return label_ids_to_names[ref_num]
        assert (
            skip_unknown_labels
        ), f"Unknown label #{ref_num}, please add to the labels file."
        return ref.group(0)  # Return the original "~id" string.

    def expand_milestone(ref):
        ref_num = int(ref.group(1))
        assert (
            ref_num in milestone_ids_to_titles
        ), f"Unknown milestone #{ref_num}."
        return milestone_ids_to_titles[ref_num]

    log.info("Creating comments.")
    for issue_json in issue_jsons:
        for note_json in issue_json["notes"]:
            # "system_note_metadata" may be absent or explicitly null.
            system_action = (note_json.get("system_note_metadata") or {}).get(
                "action", None
            )
            body = note_json["note"]

            # The "Removed" part is a guess here, don't know if that actually shows up.
            if label_ids_to_names is not None and (
                system_action == "label"
                or re.search(r"^(Added|Removed) ~[0-9]+ label", body)
            ):
                body = re.sub(r"~([0-9]+)", expand_label, body)

            if system_action == "milestone" or re.search(
                r"^Milestone changed to %[0-9]+$", body
            ):
                body = re.sub(r"%([0-9]+)", expand_milestone, body)

            send_comment(
                smtp=smtp,
                delay=delay,
                mode=mode,
                tracker=tracker,
                frm=frm,
                issue_id=issue_id_map[issue_json["iid"]],
                body=body,
                author_name=note_json["author"]["name"],
                created_at=note_json["created_at"],
                last_edited_at=note_json["last_edited_at"],
                is_system=note_json["system"],
                is_confidential=(note_json.get("confidential") is True),
            )

    log.info("Closing closed issues.")
    for issue_json in issue_jsons:
        if issue_json["state"] == "closed":
            close_ticket(
                smtp=smtp,
                delay=delay,
                mode=mode,
                tracker=tracker,
                frm=frm,
                issue_id=issue_id_map[issue_json["iid"]],
                closed_at=issue_json["closed_at"],
                is_closed=(issue_json["state"] == "closed"),
            )


def main():
    """Parse the command line, set up SMTP if needed, and run the import."""
    parser = argparse.ArgumentParser(
        prog="import_issues.py",
        description="Import Gitlab issues into Sourcehut via SMTP.",
    )
    parser.add_argument(
        "--srht-owner",
        required=True,
        help="Owner of the Sourcehut tracker.",
    )
    parser.add_argument(
        "--srht-tracker",
        required=True,
        help="Name of Sourcehut tracker to submit to.",
    )
    parser.add_argument(
        "--gitlab-project-url",
        required=True,
        help="The base URL the project on Gitlab.",
    )
    parser.add_argument(
        "--mode",
        choices=["print", "send", "hut"],
        default="print",
        help="Action to take.",
    )
    parser.add_argument(
        "--from",
        help="From address if mode is 'send'.",
    )
    parser.add_argument(
        "--smtp-host",
        help="SMTP host to use.",
    )
    parser.add_argument(
        "--smtp-port",
        default=None,
        help="SMTP port to use.",
    )
    parser.add_argument(
        "--smtp-ssl",
        action="store_true",
        help="Use SMTP over SSL.",
    )
    parser.add_argument(
        "--smtp-starttls",
        action="store_true",
        help="Use STARTTLS.",
    )
    parser.add_argument(
        "--smtp-user",
        help="SMTP username.",
    )
    parser.add_argument(
        "--smtp-password",
        help="SMTP password.",
    )
    parser.add_argument(
        "--delay",
        default=None,
        help="Decimal number of seconds to wait between accessing the server.",
    )
    parser.add_argument(
        "--labels-file",
        help="CSV file mapping label IDs to names.",
    )
    parser.add_argument(
        "--skip-labels",
        action="store_true",
        help="Skip mapping label IDs to names.",
    )
    parser.add_argument(
        "--skip-unknown-labels",
        action="store_true",
        help="Skip mapping labels that aren't in the labels file.",
    )
    parser.add_argument(
        "--users-file",
        help="CSV file mapping user IDs to names.",
    )
    parser.add_argument(
        "--skip-users",
        action="store_true",
        help="Skip mapping user IDs to names.",
    )
    parser.add_argument(
        "--skip-unknown-users",
        action="store_true",
        help="Skip mapping users that aren't in the users file.",
    )
    parser.add_argument(
        "--skip-missing-issues",
        action="store_true",
        help="Skip missing Gitlab issue IDs; GL and sr.ht IDs will not match.",
    )
    parser.add_argument(
        "--create-missing-issues",
        action="store_true",
        help="Create missing GL issues in sr.ht to make issue IDs match.",
    )
    parser.add_argument(
        "--include-confidential",
        action="store_true",
        help="Include confidential tickets and notes.",
    )
    parser.add_argument(
        "--skip-confidential",
        action="store_true",
        help="Skip confidential tickets and notes.",
    )
    parser.add_argument(
        "export_dir",
        help="Exported Gitlab tree/project/ directory containing ndjson files.",
    )

    args = vars(parser.parse_args())

    export_dir = args["export_dir"]
    assert export_dir, "Must have a exported project directory."
    export_dir_path = Path(export_dir)
    assert (
        export_dir_path.is_dir()
    ), f"Project directory is not a directory: {export_dir_path}"

    mode = args["mode"]
    frm = args["from"]
    # Fail before connecting rather than on the first message's From header.
    assert frm or mode != "send", "--from must be provided when mode is 'send'."

    labels_file = args["labels_file"]
    skip_labels = args["skip_labels"]
    skip_unknown_labels = args["skip_unknown_labels"]
    assert (
        labels_file or skip_labels
    ), "One of --labels-file or --skip-labels must be provided."

    users_file = args["users_file"]
    skip_users = args["skip_users"]
    skip_unknown_users = args["skip_unknown_users"]
    assert (
        skip_users or users_file
    ), "One of --users-file or --skip-users must be provided."

    skip_missing_issues = args["skip_missing_issues"]
    create_missing_issues = args["create_missing_issues"]
    assert not (
        skip_missing_issues and create_missing_issues
    ), "Can accept at most one of --skip-missing-issues and --create-missing-issues."

    include_confidential = args["include_confidential"]
    skip_confidential = args["skip_confidential"]
    assert not (
        include_confidential and skip_confidential
    ), "Can accept at most one of --include-confidential and --skip-confidential."

    srht_owner = args["srht_owner"]
    srht_tracker = args["srht_tracker"]
    tracker = f"~{srht_owner}/{srht_tracker}"

    delay = args["delay"]
    if delay is None:
        delay = 0.5 if mode == "hut" else 5
    else:
        delay = float(delay)

    run_kwargs = dict(
        delay=delay,
        mode=mode,
        tracker=tracker,
        frm=frm,
        export_dir_path=export_dir_path,
        gitlab_project_url=args["gitlab_project_url"].rstrip("/"),
        labels_file_path=None if skip_labels else Path(labels_file),
        skip_unknown_labels=skip_unknown_labels,
        users_file_path=None if skip_users else Path(users_file),
        skip_unknown_users=skip_unknown_users,
        skip_missing_issues=skip_missing_issues,
        create_missing_issues=create_missing_issues,
        include_confidential=include_confidential,
        skip_confidential=skip_confidential,
    )

    if mode != "send":
        run(smtp=None, **run_kwargs)
        return

    smtp_ssl = args["smtp_ssl"]
    smtp_starttls = args["smtp_starttls"]
    smtp_host = args["smtp_host"] or os.environ.get("SMTP_HOST", "localhost")
    # Command-line and environment values are strings; normalise to int so
    # that the "%d" log format below (and smtplib) get a number.
    smtp_port = int(
        args["smtp_port"]
        or os.environ.get("SMTP_PORT")
        or (465 if smtp_ssl else 25)
    )
    smtp_user = args["smtp_user"] or os.environ.get("SMTP_USER", None)
    smtp_password = args["smtp_password"] or os.environ.get("SMTP_PASSWORD", None)
    assert smtp_user, "No SMTP user given."
    assert smtp_password, "No SMTP password given."

    log.info("Connecting to %s:%d, user %r.", smtp_host, smtp_port, smtp_user)
    if smtp_ssl:
        smtp = smtplib.SMTP_SSL(host=smtp_host, port=smtp_port)
    else:
        smtp = smtplib.SMTP(host=smtp_host, port=smtp_port)

    # The context manager issues QUIT and closes the connection even if the
    # import fails part-way through.
    with smtp:
        # If SMTP isn't working:
        # smtp.set_debuglevel(2)
        if smtp_starttls:
            smtp.starttls()
        smtp.login(smtp_user, smtp_password)
        run(smtp=smtp, **run_kwargs)


if __name__ == "__main__":
    main()