from __future__ import annotations
import json
import pprint
import re
from dataclasses import dataclass
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Callable,
Coroutine,
Dict,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Union,
)
import weechat
from slack.error import SlackError, SlackRtmError, UncaughtError
from slack.log import open_debug_buffer, print_error
from slack.python_compatibility import format_exception, removeprefix
from slack.shared import shared
from slack.slack_buffer import SlackBuffer
from slack.slack_conversation import SlackConversation
from slack.slack_message import SlackTs
from slack.slack_thread import SlackThread
from slack.slack_user import SlackUser
from slack.slack_workspace import SlackWorkspace
from slack.task import gather, run_async, sleep
from slack.util import get_callback_name, get_resolved_futures, with_color
from slack.weechat_config import WeeChatOption, WeeChatOptionTypes
if TYPE_CHECKING:
from typing_extensions import Literal
Options = Dict[str, Union[str, Literal[True]]]
WeechatCommandCallback = Callable[[str, str], None]
InternalCommandCallback = Callable[
[str, List[str], Options], Optional[Coroutine[Any, None, None]]
]
T = TypeVar("T")
def print_message_not_found_error(msg_id: str):
if msg_id:
print_error(
"Invalid id given, must be an existing id or a number greater "
+ "than 0 and less than the number of messages in the channel"
)
else:
print_error("No messages found in channel")
# def parse_help_docstring(cmd):
# doc = textwrap.dedent(cmd.__doc__).strip().split("\n", 1)
# cmd_line = doc[0].split(None, 1)
# args = "".join(cmd_line[1:])
# return cmd_line[0], args, doc[1].strip()
def parse_options(args: str):
regex = re.compile("(?:^| )+-([^ =]+)(?:=([^ ]+))?")
pos_args = regex.sub("", args)
options: Options = {
match.group(1): match.group(2) or True for match in regex.finditer(args)
}
return pos_args, options
@dataclass
class Command:
cmd: str
top_level: bool
description: str
args: str
args_description: str
completion: str
cb: WeechatCommandCallback
def weechat_command(
completion: str = "",
min_args: int = 0,
split_all_args: bool = False,
slack_buffer_required: bool = False,
) -> Callable[
[InternalCommandCallback],
WeechatCommandCallback,
]:
def decorator(
f: InternalCommandCallback,
) -> WeechatCommandCallback:
cmd = removeprefix(f.__name__, "command_").replace("_", " ")
top_level = " " not in cmd
@wraps(f)
def wrapper(buffer: str, args: str):
pos_args, options = parse_options(args)
maxsplit = -1 if split_all_args else 0 if min_args == 0 else min_args - 1
split_args = pos_args.split(" ", maxsplit)
if min_args and not pos_args or len(split_args) < min_args:
print_error(
f'Too few arguments for command "/{cmd}" (help on command: /help {cmd})'
)
return
result = f(buffer, split_args, options)
if result is not None:
run_async(result)
return
shared.commands[cmd] = Command(cmd, top_level, "", "", "", completion, wrapper)
return wrapper
return decorator
def list_workspaces(workspace_name: Optional[str] = None, detailed_list: bool = False):
weechat.prnt("", "")
weechat.prnt("", "All workspaces:")
for workspace in shared.workspaces.values():
display_workspace(workspace, detailed_list)
def display_workspace(workspace: SlackWorkspace, detailed_list: bool):
if workspace.is_connected:
weechat.prnt(
"",
f" * "
f"{with_color('chat_server', workspace.name)} "
f"{with_color('chat_delimiters', '[')}"
f"connected"
f"{with_color('chat_delimiters', ']')}"
f", nick: {workspace.my_user.nick.format()}"
f", 0 channel(s), 0 pv",
)
else:
weechat.prnt("", f" {with_color('chat_server', workspace.name)}")
@weechat_command()
def command_slack(buffer: str, args: List[str], options: Options):
"""
slack command
"""
print("ran slack")
async def workspace_connect(workspace: SlackWorkspace):
if workspace.is_connected:
print_error(f'already connected to workspace "{workspace.name}"!')
return
elif workspace.is_connecting:
print_error(f'already connecting to workspace "{workspace.name}"!')
return
await workspace.connect()
@weechat_command("%(slack_workspaces)|-all", split_all_args=True)
async def command_slack_connect(buffer: str, args: List[str], options: Options):
if options.get("all"):
for workspace in shared.workspaces.values():
await workspace.connect()
elif args[0]:
for arg in args:
workspace = shared.workspaces.get(arg)
if workspace is None:
print_error(f'workspace "{arg}" not found')
else:
await workspace_connect(workspace)
else:
slack_buffer = shared.buffers.get(buffer)
if slack_buffer:
await workspace_connect(slack_buffer.workspace)
def workspace_disconnect(workspace: SlackWorkspace):
if not workspace.is_connected and not workspace.is_connecting:
print_error(f'not connected to workspace "{workspace.name}"!')
return
workspace.disconnect()
@weechat_command("%(slack_workspaces)|-all", split_all_args=True)
def command_slack_disconnect(buffer: str, args: List[str], options: Options):
if options.get("all"):
for workspace in shared.workspaces.values():
workspace.disconnect()
elif args[0]:
for arg in args:
workspace = shared.workspaces.get(arg)
if workspace is None:
print_error(f'workspace "{arg}" not found')
else:
workspace_disconnect(workspace)
else:
slack_buffer = shared.buffers.get(buffer)
if slack_buffer:
workspace_disconnect(slack_buffer.workspace)
@weechat_command()
async def command_slack_rehistory(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
if slack_buffer:
await slack_buffer.rerender_history()
@weechat_command()
def command_slack_workspace(buffer: str, args: List[str], options: Options):
list_workspaces()
@weechat_command("%(slack_workspaces)")
def command_slack_workspace_list(buffer: str, args: List[str], options: Options):
list_workspaces()
@weechat_command("%(slack_workspaces)")
def command_slack_workspace_listfull(buffer: str, args: List[str], options: Options):
list_workspaces(detailed_list=True)
@weechat_command(min_args=1)
def command_slack_workspace_add(buffer: str, args: List[str], options: Options):
name = args[0]
if name in shared.workspaces:
print_error(f'workspace "{name}" already exists, can\'t add it!')
return
shared.workspaces[name] = SlackWorkspace(name)
for option_name, option_value in options.items():
if hasattr(shared.workspaces[name].config, option_name):
config_option: WeeChatOption[WeeChatOptionTypes] = getattr(
shared.workspaces[name].config, option_name
)
value = "on" if option_value is True else option_value
config_option.value_set_as_str(value)
weechat.prnt(
"",
f"{shared.SCRIPT_NAME}: workspace added: {weechat.color('chat_server')}{name}{weechat.color('reset')}",
)
@weechat_command("%(slack_workspaces)", min_args=2)
def command_slack_workspace_rename(buffer: str, args: List[str], options: Options):
old_name = args[0]
new_name = args[1]
workspace = shared.workspaces.get(old_name)
if not workspace:
print_error(f'workspace "{old_name}" not found for "workspace rename" command')
return
workspace.name = new_name
shared.workspaces[new_name] = workspace
del shared.workspaces[old_name]
weechat.prnt(
"",
f"server {with_color('chat_server', old_name)} has been renamed to {with_color('chat_server', new_name)}",
)
# TODO: Rename buffers and config
@weechat_command("%(slack_workspaces)", min_args=1)
def command_slack_workspace_del(buffer: str, args: List[str], options: Options):
name = args[0]
workspace = shared.workspaces.get(name)
if not workspace:
print_error(f'workspace "{name}" not found for "workspace del" command')
return
if workspace.is_connected:
print_error(
f'you can not delete server "{name}" because you are connected to it. Try "/slack disconnect {name}" first.'
)
return
# TODO: Delete config
del shared.workspaces[name]
weechat.prnt(
"",
f"server {with_color('chat_server', name)} has been deleted",
)
async def create_conversation_for_users(
workspace: SlackWorkspace, users: Iterable[SlackUser]
):
user_ids = [user.id for user in users]
conversation_open_response = await workspace.api.conversations_open(user_ids)
conversation_id = conversation_open_response["channel"]["id"]
workspace.conversations.initialize_items(
[conversation_id], {conversation_id: conversation_open_response["channel"]}
)
conversation = await workspace.conversations[conversation_id]
await conversation.open_buffer(switch=True)
@weechat_command("%(nicks)", min_args=1, split_all_args=True)
async def command_slack_query(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
if slack_buffer is None:
return
nicks = [removeprefix(nick, "@") for nick in args]
all_users = get_resolved_futures(slack_buffer.workspace.users.values())
users = [user for user in all_users if user.nick.raw_nick in nicks]
if len(users) != len(nicks):
found_nicks = [user.nick.raw_nick for user in users]
not_found_nicks = [nick for nick in nicks if nick not in found_nicks]
print_error(
f"No such nick{'s' if len(not_found_nicks) > 1 else ''}: {', '.join(not_found_nicks)}"
)
return
if len(users) == 1:
user = users[0]
all_conversations = get_resolved_futures(
slack_buffer.workspace.conversations.values()
)
for conversation in all_conversations:
if conversation.im_user_id == user.id:
await conversation.open_buffer(switch=True)
return
await create_conversation_for_users(slack_buffer.workspace, users)
def get_conversation_from_args(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
workspace_name = options.get("workspace")
if workspace_name is True:
print_error("No workspace specified")
return
workspace = (
shared.workspaces.get(workspace_name)
if workspace_name
else slack_buffer.workspace
if slack_buffer is not None
else None
)
if workspace is None:
if workspace_name:
print_error(f'Workspace "{workspace_name}" not found')
else:
print_error(
"Must be run from a slack buffer unless a workspace is specified"
)
return
if len(args) == 0 or not args[0]:
if workspace_name is not None:
print_error(
"Must specify conversaton name when workspace name is specified"
)
return
if isinstance(slack_buffer, SlackConversation):
return slack_buffer
else:
return
conversation_name = args[0].strip()
all_conversations = get_resolved_futures(workspace.conversations.values())
for conversation in all_conversations:
if (
conversation.name_with_prefix("short_name_without_padding")
== conversation_name
or conversation.name() == conversation_name
):
return conversation
print_error(f'Conversation "{conversation_name}" not found')
@weechat_command("")
async def command_slack_join(buffer: str, args: List[str], options: Options):
conversation = get_conversation_from_args(buffer, args, options)
if conversation is not None:
await conversation.api.conversations_join(conversation.id)
await conversation.open_buffer(switch=not options.get("noswitch"))
@weechat_command("")
async def command_slack_part(buffer: str, args: List[str], options: Options):
conversation = get_conversation_from_args(buffer, args, options)
if conversation is not None:
await conversation.part()
@weechat_command("%(threads)", min_args=1)
async def command_slack_thread(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
if isinstance(slack_buffer, SlackConversation):
await slack_buffer.open_thread(args[0], switch=True)
@weechat_command("-alsochannel|%(threads)", min_args=1)
async def command_slack_reply(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
broadcast = bool(options.get("alsochannel"))
if isinstance(slack_buffer, SlackThread):
await slack_buffer.post_message(args[0], broadcast=broadcast)
elif isinstance(slack_buffer, SlackConversation):
split_args = args[0].split(" ", 1)
if len(split_args) < 2:
print_error(
'Too few arguments for command "/slack reply" (help on command: /help slack reply)'
)
return
thread_ts = slack_buffer.ts_from_hash_or_index(split_args[0])
await slack_buffer.post_message(split_args[1], thread_ts, broadcast)
@weechat_command("away|active")
async def command_slack_presence(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
if slack_buffer is None:
return
new_presence = args[0]
if new_presence not in ("active", "away"):
print_error(
f'Error with command "/slack presence {args[0]}" (help on command: /help slack presence)'
)
return
await slack_buffer.api.set_presence(new_presence)
@weechat_command("list")
async def command_slack_mute(buffer: str, args: List[str], options: Options):
slack_buffer = shared.buffers.get(buffer)
if not isinstance(slack_buffer, SlackConversation):
return
if args[0] == "list":
conversations = await gather(
*[
slack_buffer.workspace.conversations[conversation_id]
for conversation_id in slack_buffer.workspace.muted_channels
]
)
conversation_names = sorted(
conversation.name_with_prefix("short_name_without_padding")
for conversation in conversations
)
weechat.prnt("", f"Muted conversations: {', '.join(conversation_names)}")
return
muted_channels = set(slack_buffer.workspace.muted_channels)
muted_channels ^= {slack_buffer.id}
await slack_buffer.api.set_muted_channels(muted_channels)
muted_str = "Muted" if slack_buffer.id in muted_channels else "Unmuted"
weechat.prnt(
"",
f"{muted_str} channel {slack_buffer.name_with_prefix('short_name_without_padding')}",
)
def print_uncaught_error(error: UncaughtError, detailed: bool, options: Options):
weechat.prnt("", f" {error.id} ({error.time}): {error.exception}")
if detailed:
for line in format_exception(error.exception):
weechat.prnt("", f" {line}")
if options.get("data"):
if isinstance(error.exception, SlackRtmError):
weechat.prnt("", f" data: {json.dumps(error.exception.message_json)}")
elif isinstance(error.exception, SlackError):
weechat.prnt("", f" data: {json.dumps(error.exception.data)}")
else:
print_error("This error does not have any data")
@weechat_command("tasks|buffer|open_buffer|errors|error", split_all_args=True)
def command_slack_debug(buffer: str, args: List[str], options: Options):
# TODO: Add message info (message_json)
if args[0] == "tasks":
weechat.prnt("", "Active tasks:")
weechat.prnt("", pprint.pformat(shared.active_tasks))
weechat.prnt("", "Active futures:")
weechat.prnt("", pprint.pformat(shared.active_futures))
elif args[0] == "buffer":
slack_buffer = shared.buffers.get(buffer)
if isinstance(slack_buffer, SlackConversation):
weechat.prnt("", f"Conversation id: {slack_buffer.id}")
elif isinstance(slack_buffer, SlackThread):
weechat.prnt(
"",
f"Conversation id: {slack_buffer.parent.conversation.id}, Thread ts: {slack_buffer.parent.thread_ts}, Thread hash: {slack_buffer.parent.hash}",
)
elif args[0] == "open_buffer":
open_debug_buffer()
elif args[0] == "errors":
num_arg = int(args[1]) if len(args) > 1 and args[1].isdecimal() else 5
num = min(num_arg, len(shared.uncaught_errors))
weechat.prnt("", f"Last {num} errors:")
for error in shared.uncaught_errors[-num:]:
print_uncaught_error(error, False, options)
elif args[0] == "error":
if len(args) > 1:
if args[1].isdecimal() and args[1] != "0":
num = int(args[1])
if num > len(shared.uncaught_errors):
print_error(
f"Only {len(shared.uncaught_errors)} error(s) have occurred"
)
return
error = shared.uncaught_errors[-num]
else:
errors = [e for e in shared.uncaught_errors if e.id == args[1]]
if not errors:
print_error(f"Error {args[1]} not found")
return
error = errors[0]
weechat.prnt("", f"Error {error.id}:")
elif not shared.uncaught_errors:
weechat.prnt("", "No errors have occurred")
return
else:
error = shared.uncaught_errors[-1]
weechat.prnt("", "Last error:")
print_uncaught_error(error, True, options)
@weechat_command("-clear")
async def command_slack_status(buffer: str, args: List[str], options: Options):
status = args[0]
slack_buffer = shared.buffers.get(buffer)
if slack_buffer is not None:
if options.get("clear"):
await slack_buffer.api.clear_user_status()
elif slack_buffer and len(status) > 0:
await slack_buffer.api.set_user_status(status)
else:
print_error(
'Too few arguments for command "/slack status" (help on command: /help slack status)'
)
else:
print_error("Run the command in a slack buffer")
def _get_conversation_from_buffer(
slack_buffer: SlackBuffer,
) -> Optional[SlackConversation]:
if isinstance(slack_buffer, SlackConversation):
return slack_buffer
elif isinstance(slack_buffer, SlackThread):
return slack_buffer.parent.conversation
return None
def _get_linkarchive_url(
slack_buffer: SlackBuffer, message_ts: Optional[SlackTs]
) -> str:
url = f"https://{slack_buffer.workspace.domain}.slack.com/"
conversation = _get_conversation_from_buffer(slack_buffer)
if conversation is not None:
url += f"archives/{conversation.id}/"
if message_ts is not None:
message = conversation.messages[message_ts]
url += f"p{message.ts.major}{message.ts.minor:0>6}"
if message.thread_ts is not None:
url += f"?thread_ts={message.thread_ts}&cid={conversation.id}"
return url
@weechat_command("%(threads)")
def command_slack_linkarchive(buffer: str, args: List[str], options: Options):
"""
/slack linkarchive [message_id]
Place a link to the conversation or message in the input bar.
Use cursor or mouse mode to get the id.
"""
slack_buffer = shared.buffers.get(buffer)
if slack_buffer is None:
return
if args[0]:
ts = slack_buffer.ts_from_hash_or_index(args[0])
if ts is None:
print_message_not_found_error(args[0])
return
else:
ts = None
url = _get_linkarchive_url(slack_buffer, ts)
weechat.command(buffer, f"/input insert {url}")
def find_command(start_cmd: str, args: str) -> Optional[Tuple[Command, str]]:
args_parts = re.finditer("[^ ]+", args)
cmd = start_cmd
cmd_args_startpos = 0
for part in args_parts:
next_cmd = f"{cmd} {part.group(0)}"
if next_cmd not in shared.commands:
cmd_args_startpos = part.start(0)
break
cmd = next_cmd
else:
cmd_args_startpos = len(args)
cmd_args = args[cmd_args_startpos:]
if cmd in shared.commands:
return shared.commands[cmd], cmd_args
return None
def command_cb(data: str, buffer: str, args: str) -> int:
found_cmd_with_args = find_command(data, args)
if found_cmd_with_args:
command = found_cmd_with_args[0]
cmd_args = found_cmd_with_args[1]
command.cb(buffer, cmd_args)
else:
print_error(
f'Error with command "/{data} {args}" (help on command: /help {data})'
)
return weechat.WEECHAT_RC_OK
async def mark_read(slack_buffer: SlackBuffer):
# Sleep so the read marker is updated before we run slack_buffer.mark_read
await sleep(1)
await slack_buffer.mark_read()
def buffer_set_unread_cb(data: str, buffer: str, command: str) -> int:
slack_buffer = shared.buffers.get(buffer)
if slack_buffer:
run_async(mark_read(slack_buffer))
return weechat.WEECHAT_RC_OK
def register_commands():
weechat.hook_command_run(
"/buffer set unread", get_callback_name(buffer_set_unread_cb), ""
)
weechat.hook_command_run(
"/buffer set unread *", get_callback_name(buffer_set_unread_cb), ""
)
weechat.hook_command_run(
"/input set_unread_current_buffer", get_callback_name(buffer_set_unread_cb), ""
)
for cmd, command in shared.commands.items():
if command.top_level:
weechat.hook_command(
command.cmd,
command.description,
command.args,
command.args_description,
"%(slack_commands)|%*",
get_callback_name(command_cb),
cmd,
)