From 11ccc471bb91e19334fa266f9837f9bb09a1e34d Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Mon, 15 May 2023 16:07:54 +0200 Subject: contrib: add carddav-query script Add a standalone python script to allow querying contacts from a CardDAV compatible server. The script works with python 3.6+ and has no external dependencies. Link: https://sabre.io/dav/building-a-carddav-client/ Link: https://www.rfc-editor.org/rfc/rfc6352 Signed-off-by: Robin Jarry Tested-by: Tim Culverhouse --- contrib/carddav-query | 268 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100755 contrib/carddav-query (limited to 'contrib') diff --git a/contrib/carddav-query b/contrib/carddav-query new file mode 100755 index 00000000..f7eaa793 --- /dev/null +++ b/contrib/carddav-query @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: MIT +# Copyright (c) 2023 Robin Jarry + +""" +Query a CardDAV server for contact names and emails. +""" + +import argparse +import base64 +import configparser +import os +import re +import subprocess +import sys +import xml.etree.ElementTree as xml +from urllib import error, parse, request + + +def main(): + try: + args = parse_args() + + C = "urn:ietf:params:xml:ns:carddav" + D = "DAV:" + xml.register_namespace("C", C) + xml.register_namespace("D", D) + + # perform the actual address book query + query = xml.Element(f"{{{C}}}addressbook-query") + prop = xml.SubElement(query, f"{{{D}}}prop") + xml.SubElement(prop, f"{{{D}}}getetag") + data = xml.SubElement(prop, f"{{{C}}}address-data") + xml.SubElement(data, f"{{{C}}}prop", name="FN") + xml.SubElement(data, f"{{{C}}}prop", name="EMAIL") + limit = xml.SubElement(query, f"{{{C}}}limit") + xml.SubElement(limit, f"{{{C}}}nresults").text = str(args.limit) + filtre = xml.SubElement(query, f"{{{C}}}filter", test="anyof") + for term in args.terms: + for attr in "FN", "EMAIL", "NICKNAME", "ORG", "TITLE": + prop = xml.SubElement(filtre, f"{{{C}}}prop-filter", name=attr) + match = xml.SubElement( + prop, f"{{{C}}}text-match", {"match-type": "contains"} + ) + match.text = term + data = http_request_xml( + "REPORT", + args.server_url, + query, + username=args.username, + password=args.password, + debug=args.verbose, + Depth="1", + ) + for vcard in data.iterfind(f".//{{{C}}}address-data"): + for name, email in parse_vcard(vcard.text.strip()): + print(f"{email}\t{name}") + + except Exception as e: + if isinstance(e, error.HTTPError): + if args.verbose: + debug_response(e.fp) + e = e.fp.read().decode() + print(f"error: {e}", file=sys.stderr) + sys.exit(1) + + +def http_request_xml( + method: str, + url: str, + data: xml.Element, + username: str = None, + password: str = None, + debug: bool = False, + **headers, +) -> xml.Element: + req = request.Request( + url=url, + method=method, + headers={ + "Content-Type": 'text/xml; charset="utf-8"', + **headers, + }, + data=xml.tostring(data, encoding="utf-8", xml_declaration=True), + ) + if username is not None and password is not None: + auth = f"{username}:{password}" + auth = base64.standard_b64encode(auth.encode("utf-8")).decode("ascii") + req.add_header("Authorization", f"Basic {auth}") + + if debug: + uri = parse.urlparse(req.full_url) + print(f"> {req.method} {uri.path} HTTP/1.1", file=sys.stderr) + print(f"> Host: {uri.hostname}", file=sys.stderr) + for name, value in req.headers.items(): + print(f"> {name}: {value}", file=sys.stderr) + print(f"{req.data.decode('utf-8')}\n", file=sys.stderr) + + with request.urlopen(req) as resp: + data = resp.read().decode("utf-8") + if debug: + debug_response(resp) + print(f"{data}", file=sys.stderr) + + return xml.fromstring(data) + + +def debug_response(resp): + print(f"< HTTP/1.1 {resp.code}", file=sys.stderr) + for name, value in resp.headers.items(): + print(f"< {name}: {value}", file=sys.stderr) + + +def parse_vcard(txt): + lines = txt.splitlines() + if len(lines) < 4 or lines[0] != "BEGIN:VCARD" or lines[-1] != "END:VCARD": + return + name = None + emails = [] + for line in lines[1:-1]: + if line.startswith("FN:"): + name = line[len("FN:") :].replace("\\,", ",") + continue + match = re.match(r"^(?:ITEM\d+\.)?EMAIL(?:;[\w-]+=[^;:]+)*:(.+@.+)$", line) + if match: + email = match.group(1).lower().replace("\\,", ",") + if email not in emails: + if "TYPE=pref" in line or "PREF=1" in line: + emails.insert(0, email) + else: + emails.append(email) + if name is not None: + for e in emails: + yield name, e + + +def parse_args(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "-l", + "--limit", + default=10, + type=int, + help=""" + Maximum number of results returned by the server (default: 10). + If the server does not support limiting, this will be disregarded. + """, + ) + parser.add_argument( + "-v", + "--verbose", + action="store_true", + help=""" + Print debug info on stderr. + """, + ) + parser.add_argument( + "-c", + "--config-file", + metavar="FILE", + default=os.path.expanduser("~/.config/aerc/accounts.conf"), + help=""" + INI configuration file from which to read the CardDAV URL endpoint + (default: ~/.config/aerc/accounts.conf). + """, + ) + parser.add_argument( + "-S", + "--config-section", + metavar="SECTION", + help=""" + INI configuration section where to find CONFIG_KEY. By default the + first section where CONFIG_KEY is found will be used. + """, + ) + parser.add_argument( + "-k", + "--config-key-source", + metavar="KEY_SOURCE", + default="carddav-source", + help=""" + INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE. + The value must respect the following format: + https?://USERNAME[:PASSWORD]@HOSTNAME/PATH/TO/ADDRESSBOOK. + Both USERNAME and PASSWORD must be percent encoded. + """, + ) + parser.add_argument( + "-C", + "--config-key-cred-cmd", + metavar="KEY_CRED_CMD", + default="carddav-source-cred-cmd", + help=""" + INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE. The + value is a command that will be used to determine PASSWORD if it is not + present in CONFIG_KEY_SOURCE. + """, + ) + parser.add_argument( + "-s", + "--server-url", + help=""" + CardDAV server URL endpoint. Overrides configuration file. + """, + ) + parser.add_argument( + "-u", + "--username", + help=""" + Username to authenticate on the server. Overrides configuration file. + """, + ) + parser.add_argument( + "-p", + "--password", + help=""" + Password for the specified user. Overrides configuration file. + """, + ) + parser.add_argument( + "terms", + nargs="+", + metavar="TERM", + help=""" + Search term. Will be used to search contacts from their FN (formatted + name), EMAIL, NICKNAME, ORG (company) and TITLE fields. + """, + ) + args = parser.parse_args() + + cfg = configparser.RawConfigParser(strict=False) + cfg.read([args.config_file]) + source = cred_cmd = None + if args.config_section: + source = cfg.get(args.config_section, args.config_key_source, fallback=None) + cred_cmd = cfg.get(args.config_section, args.config_key_cred_cmd, fallback=None) + else: + for sec in cfg.sections(): + source = cfg.get(sec, args.config_key_source, fallback=None) + if source is not None: + cred_cmd = cfg.get(sec, args.config_key_cred_cmd, fallback=None) + break + if source is not None: + try: + u = parse.urlparse(source) + if args.username is None: + args.username = u.username + if args.password is None: + args.password = u.password + if not args.password and cred_cmd is not None: + args.password = subprocess.check_output( + cred_cmd, shell=True, text=True, encoding="utf-8" + ).strip() + if args.server_url is None: + args.server_url = f"{u.scheme}://{u.hostname}" + if u.port is not None: + args.server_url += f":{u.port}" + args.server_url += u.path + except ValueError as e: + parser.error(f"{args.config_file}: {e}") + if args.server_url is None: + parser.error("SERVER_URL is required") + + return args + + +if __name__ == "__main__": + main() -- cgit