from __future__ import annotations

import hashlib
from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, List, Mapping, NoReturn, Optional, Tuple, Union

import weechat

from slack.python_compatibility import removeprefix
from slack.shared import shared
from slack.slack_buffer import SlackBuffer
from slack.slack_message import SlackMessage, SlackTs
from slack.slack_thread import SlackThread
from slack.task import gather, run_async

if TYPE_CHECKING:
    from slack_api.slack_conversations_info import SlackConversationsInfo
    from slack_rtm.slack_rtm_message import (
        SlackMessageChanged,
        SlackMessageDeleted,
        SlackMessageReplied,
        SlackShRoomJoin,
        SlackShRoomUpdate,
    )
    from typing_extensions import Literal

    from slack.slack_workspace import SlackWorkspace


def invalidate_nicklists():
    """Flag every open conversation in every workspace for a nicklist refresh."""
    for workspace in shared.workspaces.values():
        for conversation in workspace.open_conversations.values():
            conversation.nicklist_needs_refresh = True


def sha1_hex(string: str) -> str:
    """Return the hexadecimal SHA-1 digest of *string* (encoded with the default codec)."""
    return hashlib.sha1(string.encode()).hexdigest()


def hash_from_ts(ts: SlackTs) -> str:
    """Return the full SHA-1 hex digest of the string form of *ts*."""
    return sha1_hex(str(ts))


class SlackConversationMessageHashes(Dict[SlackTs, str]):
    """Mapping from message timestamp to a short hash prefix.

    Entries are created lazily on first lookup (see ``__missing__``); assigning
    from outside raises. ``_inverse_map`` maps each short hash back to its
    timestamp so ``get_ts`` can resolve a hash.
    """

    def __init__(self, conversation: SlackConversation):
        super().__init__()
        self._conversation = conversation
        self._inverse_map: Dict[str, SlackTs] = {}

    def __setitem__(self, key: SlackTs, value: str) -> NoReturn:
        """Reject direct assignment; hashes are only created through lookup."""
        raise RuntimeError("Set from outside isn't allowed")

    def __delitem__(self, key: SlackTs) -> None:
        """Remove *key* and its inverse-map entry.

        Raises:
            KeyError: if *key* is not present (raised by the underlying dict).
        """
        if key in self:
            hash_key = self[key]
            del self._inverse_map[hash_key]
        super().__delitem__(key)

    def _setitem(self, key: SlackTs, value: str) -> None:
        """Internal setter that bypasses the ``__setitem__`` guard."""
        super().__setitem__(key, value)

    def __missing__(self, key: SlackTs) -> str:
        """Create, store and return a short hash for *key*.

        The hash starts as the first three characters of the full digest and
        is lengthened until no existing short hash starts with it. If an
        existing short hash is exactly the new one minus its last character,
        that older entry is lengthened too, and the affected message, its
        thread buffer properties and its replies are scheduled for
        re-rendering.
        """
        hash_len = 3
        full_hash = hash_from_ts(key)
        short_hash = full_hash[:hash_len]

        # Grow until the candidate is not a prefix of any existing short hash.
        while any(
            existing_hash.startswith(short_hash) for existing_hash in self._inverse_map
        ):
            hash_len += 1
            short_hash = full_hash[:hash_len]

        # An existing hash that is a strict prefix of the new one would be
        # ambiguous, so it is replaced by a longer prefix of its own digest.
        if short_hash[:-1] in self._inverse_map:
            ts_with_same_hash = self._inverse_map.pop(short_hash[:-1])
            other_full_hash = hash_from_ts(ts_with_same_hash)
            other_short_hash = other_full_hash[:hash_len]

            while short_hash == other_short_hash:
                hash_len += 1
                short_hash = full_hash[:hash_len]
                other_short_hash = other_full_hash[:hash_len]

            self._setitem(ts_with_same_hash, other_short_hash)
            self._inverse_map[other_short_hash] = ts_with_same_hash

            # The older message's hash changed, so schedule re-renders for it.
            other_message = self._conversation.messages[ts_with_same_hash]
            run_async(self._conversation.rerender_message(other_message))
            if other_message.thread_buffer is not None:
                run_async(other_message.thread_buffer.update_buffer_props())
            for reply in other_message.replies.values():
                run_async(self._conversation.rerender_message(reply))

        self._setitem(key, short_hash)
        self._inverse_map[short_hash] = key
        return self[key]

    def get_ts(self, ts_hash: str) -> Optional[SlackTs]:
        """Return the timestamp for *ts_hash* (a leading ``$`` is ignored), or None."""
        hash_without_prefix = removeprefix(ts_hash, "$")
        return self._inverse_map.get(hash_without_prefix)


class SlackConversation(SlackBuffer):
    """A Slack conversation (channel, private channel, mpim or im) as a buffer."""

    def __init__(
        self,
        workspace: SlackWorkspace,
        info: SlackConversationsInfo,
    ):
        super().__init__()
        self._workspace = workspace
        self._info = info
        # Member ids; None until load_members() has fetched them.
        self._members: Optional[List[str]] = None
        self._messages: OrderedDict[SlackTs, SlackMessage] = OrderedDict()
        self.nicklist_needs_refresh = True
        self.message_hashes = SlackConversationMessageHashes(self)

    @classmethod
    async def create(cls, workspace: SlackWorkspace, conversation_id: str):
        """Fetch the conversation info for *conversation_id* and build an instance."""
        info_response = await workspace.api.fetch_conversations_info(conversation_id)
        return cls(workspace, info_response["channel"])

    @property
    def id(self) -> str:
        """The conversation id taken from the conversation info."""
        return self._info["id"]

    @property
    def workspace(self) -> SlackWorkspace:
        """The workspace this conversation belongs to."""
        return self._workspace

    @property
    def context(self) -> Literal["conversation", "thread"]:
        """Always ``"conversation"`` for this class."""
        return "conversation"

    @property
    def messages(self) -> Mapping[SlackTs, SlackMessage]:
        """The messages known for this conversation, keyed by timestamp."""
        return self._messages

    @property
    def type(self) -> Literal["channel", "private", "mpim", "im"]:
        """The conversation kind derived from the ``is_*`` flags in the info."""
        if self._info["is_im"] is True:
            return "im"
        elif self._info["is_mpim"] is True:
            return "mpim"
        elif self._info["is_private"] is True:
            return "private"
        else:
            return "channel"

    @property
    def buffer_type(self) -> Literal["private", "channel"]:
        """``"private"`` for im/mpim conversations, otherwise ``"channel"``."""
        return "private" if self.type in ("im", "mpim") else "channel"

    async def name(self) -> str:
        """Return the conversation name without any prefix.

        For an im this is the other user's nick; for an mpim it is the sorted,
        comma-joined nicks of all members except the own user; otherwise it is
        the ``name`` field of the conversation info.
        """
        if self._info["is_im"] is True:
            im_user = await self.workspace.users[self._info["user"]]
            return im_user.nick()
        elif self._info["is_mpim"] is True:
            members = await self.load_members(load_all=True)
            member_users = await gather(
                *(
                    self.workspace.users[user_id]
                    for user_id in members
                    if user_id != self.workspace.my_user.id
                )
            )
            return ",".join(sorted(user.nick() for user in member_users))
        else:
            return self._info["name"]

    def name_prefix(
        self,
        name_type: Literal["full_name", "short_name", "short_name_without_padding"],
    ) -> str:
        """Return the prefix to put before the name for the given *name_type*.

        im: a single space for ``short_name``, otherwise empty. mpim: ``@`` for
        both short name variants, otherwise empty. Everything else: ``#``.
        """
        if self._info["is_im"] is True:
            if name_type == "short_name":
                return " "
            else:
                return ""
        elif self._info["is_mpim"]:
            if name_type in ("short_name", "short_name_without_padding"):
                return "@"
            else:
                return ""
        else:
            return "#"

    async def name_with_prefix(
        self,
        name_type: Literal["full_name", "short_name", "short_name_without_padding"],
    ) -> str:
        """Return the prefix for *name_type* followed by the conversation name."""
        return f"{self.name_prefix(name_type)}{await self.name()}"

    async def open_if_open(self):
        """Open the buffer if the conversation info marks it as open.

        ``is_open`` decides when it is present in the info; otherwise
        ``is_member`` is used.
        """
        if "is_open" in self._info:
            if self._info["is_open"]:
                await self.open_buffer()
        elif self._info.get("is_member"):
            await self.open_buffer()

    async def get_name_and_buffer_props(self) -> Tuple[str, Dict[str, str]]:
        """Return the full buffer name and the buffer properties to apply."""
        name_without_prefix = await self.name()
        name = f"{self.name_prefix('full_name')}{name_without_prefix}"
        short_name = self.name_prefix("short_name") + name_without_prefix

        return name, {
            "short_name": short_name,
            "title": "topic",
            "input_multiline": "1",
            "nicklist": "0" if self.type == "im" else "1",
            "nicklist_display_groups": "0",
            "localvar_set_type": self.buffer_type,
            "localvar_set_slack_type": self.type,
            "localvar_set_nick": self.workspace.my_user.nick(),
            "localvar_set_channel": name,
            "localvar_set_server": self.workspace.name,
        }

    async def buffer_switched_to(self):
        """Run the nicklist update and the history fill together via ``gather``."""
        await gather(self.nicklist_update(), self.fill_history())

    async def open_buffer(self, switch: bool = False):
        """Open the buffer and register this conversation as open in the workspace."""
        await super().open_buffer(switch)
        self.workspace.open_conversations[self.id] = self

    async def rerender_message(self, message: SlackMessage):
        """Re-render *message* here and in its parent's thread buffer, if any."""
        await super().rerender_message(message)
        parent_message = message.parent_message
        if parent_message and parent_message.thread_buffer:
            await parent_message.thread_buffer.rerender_message(message)

    async def load_members(self, load_all: bool = False) -> List[str]:
        """Return the member ids, fetching them on the first call only.

        Args:
            load_all: if true the fetch is made without a limit, otherwise with
                a limit of 1000. It has no effect once members are cached.
        """
        if self._members is None:
            members_response = await self._api.fetch_conversations_members(
                self, limit=None if load_all else 1000
            )
            self._members = members_response["members"]
            self.workspace.users.initialize_items(self._members)
        return self._members

    async def fetch_replies(
        self, message: SlackMessage
    ) -> Optional[List[SlackMessage]]:
        """Fetch the replies of *message* and store them, sorted by timestamp.

        Returns:
            The fetched replies, or None when *message* is not a thread parent.
        """
        if not message.is_thread_parent:
            return None

        replies_response = await self._api.fetch_conversations_replies(message)
        # The first entry of the response is skipped
        # (presumably the thread parent itself - TODO confirm).
        replies = [
            SlackMessage(self, reply) for reply in replies_response["messages"][1:]
        ]

        for reply in replies:
            message.replies[reply.ts] = reply
            self._messages[reply.ts] = reply

        message.replies = OrderedDict(sorted(message.replies.items()))
        self._messages = OrderedDict(sorted(self._messages.items()))
        message.reply_history_filled = True
        return replies

    async def fill_history(self):
        """Fetch, render and print the conversation history once.

        Does nothing when the history is already filled or a fill is in
        progress.
        """
        if self.history_filled or self.history_pending:
            return

        with self.loading():
            self.history_pending = True
            try:
                history = await self._api.fetch_conversations_history(self)

                conversation_messages = [
                    SlackMessage(self, message) for message in history["messages"]
                ]
                for message in reversed(conversation_messages):
                    self._messages[message.ts] = message

                if self.display_thread_replies():
                    await gather(
                        *(
                            self.fetch_replies(message)
                            for message in conversation_messages
                        )
                    )

                self._messages = OrderedDict(sorted(self._messages.items()))

                messages = [
                    message
                    for message in self._messages.values()
                    if self.should_display_message(message)
                ]

                sender_user_ids = [
                    m.sender_user_id for m in messages if m.sender_user_id
                ]
                self.workspace.users.initialize_items(sender_user_ids)

                sender_bot_ids = [m.sender_bot_id for m in messages if m.sender_bot_id]
                self.workspace.bots.initialize_items(sender_bot_ids)

                await gather(*(message.render(self.context) for message in messages))

                for message in messages:
                    await self.print_message(message, backlog=True)

                self.history_filled = True
            finally:
                # Always clear the flag so that a failed fill does not block
                # every later attempt through the early return above.
                self.history_pending = False

    async def nicklist_update(self):
        """Rebuild the nicklist from the member list if a refresh is needed."""
        if self.nicklist_needs_refresh:
            self.nicklist_needs_refresh = False
            members = await self.load_members()
            weechat.nicklist_remove_all(self.buffer_pointer)
            await gather(*(self.nicklist_add_user(user_id) for user_id in members))

    async def nicklist_add_user(self, user_id: str):
        """Add the user with *user_id* to this buffer's nicklist."""
        user = await self.workspace.users[user_id]
        # TODO: weechat.color.nicklist_away
        color = user.nick_color() if shared.config.look.color_nicks_in_nicklist else ""
        weechat.nicklist_add_nick(
            self.buffer_pointer, "", user.nick(), color, "", "", 1
        )

    def display_thread_replies(self) -> bool:
        """Return whether thread replies are shown in this buffer.

        The buffer local variable ``display_thread_replies_in_channel`` takes
        precedence when it is set; otherwise the config option is used.
        """
        buffer_value = weechat.buffer_get_string(
            self.buffer_pointer, "localvar_display_thread_replies_in_channel"
        )
        if buffer_value:
            return bool(weechat.config_string_to_boolean(buffer_value))
        return shared.config.look.display_thread_replies_in_channel.value

    def should_display_message(self, message: SlackMessage) -> bool:
        """Return True for non-replies, thread broadcasts, or when replies are shown."""
        return (
            not message.is_reply
            or message.is_thread_broadcast
            or self.display_thread_replies()
        )

    async def add_new_message(self, message: SlackMessage):
        """Store a new message and show it where applicable.

        A displayable message is printed when the history is filled; otherwise
        only the buffer hotlist is set from the message priority. A reply is
        also recorded on its parent and printed in the parent's thread buffer
        when one exists.
        """
        # TODO: Remove old messages
        self._messages[message.ts] = message
        if self.should_display_message(message):
            if self.history_filled:
                await self.print_message(message)
            else:
                weechat.buffer_set(
                    self.buffer_pointer, "hotlist", str(message.priority.value)
                )

        parent_message = message.parent_message
        if parent_message:
            parent_message.replies[message.ts] = message
            if parent_message.thread_buffer:
                await parent_message.thread_buffer.print_message(message)

        if message.sender_user_id:
            # TODO: thread buffers
            user = await self.workspace.users[message.sender_user_id]
            # Sends the typing_set_nick signal with state "off" for the sender.
            weechat.hook_signal_send(
                "typing_set_nick",
                weechat.WEECHAT_HOOK_SIGNAL_STRING,
                f"{self.buffer_pointer};off;{user.nick()}",
            )

    async def change_message(
        self, data: Union[SlackMessageChanged, SlackMessageReplied]
    ):
        """Apply an updated message payload to a known message and re-render it."""
        ts = SlackTs(data["ts"])
        message = self._messages.get(ts)
        if message:
            message.update_message_json(data["message"])
            await self.rerender_message(message)

    async def delete_message(self, data: SlackMessageDeleted):
        """Drop the hash of the deleted message, mark it deleted and re-render it."""
        ts = SlackTs(data["deleted_ts"])
        if ts in self.message_hashes:
            del self.message_hashes[ts]
        message = self._messages.get(ts)
        if message:
            message.deleted = True
            await self.rerender_message(message)

    async def update_message_room(
        self, data: Union[SlackShRoomJoin, SlackShRoomUpdate]
    ):
        """Apply room data to the message at the room's thread root and re-render it."""
        ts = SlackTs(data["room"]["thread_root_ts"])
        message = self._messages.get(ts)
        if message:
            message.update_message_json_room(data["room"])
            await self.rerender_message(message)

    async def reaction_add(self, message_ts: SlackTs, reaction: str, user_id: str):
        """Add *reaction* by *user_id* to a known message and re-render it."""
        message = self._messages.get(message_ts)
        if message:
            message.reaction_add(reaction, user_id)
            await self.rerender_message(message)

    async def reaction_remove(self, message_ts: SlackTs, reaction: str, user_id: str):
        """Remove *reaction* by *user_id* from a known message and re-render it."""
        message = self._messages.get(message_ts)
        if message:
            message.reaction_remove(reaction, user_id)
            await self.rerender_message(message)

    async def open_thread(self, thread_hash: str, switch: bool = False):
        """Open the thread buffer of the message identified by *thread_hash*.

        The thread buffer is created on first use. Nothing happens when the
        hash does not resolve to a timestamp.
        """
        thread_ts = self.message_hashes.get_ts(thread_hash)
        if thread_ts:
            thread_message = self.messages[thread_ts]
            if thread_message.thread_buffer is None:
                thread_message.thread_buffer = SlackThread(thread_message)
            await thread_message.thread_buffer.open_buffer(switch)