from __future__ import annotations
import re
from enum import Enum
from typing import TYPE_CHECKING, List, Match, Optional
import weechat
from slack.log import print_exception_once
from slack.python_compatibility import removeprefix, removesuffix
from slack.shared import shared
from slack.slack_user import format_bot_nick, nick_color
from slack.task import gather
from slack.util import with_color
if TYPE_CHECKING:
from slack_api.slack_conversations_history import SlackMessage as SlackMessageDict
from slack.slack_conversation import SlackConversation
from slack.slack_workspace import SlackWorkspace
class MessagePriority(Enum):
    """Priority levels that can be assigned to a message.

    The integer values increase from ``LOW`` to ``HIGHLIGHT``.
    """

    # Values are explicit (not ``auto()``) so they stay stable if members
    # are ever reordered.
    LOW = 0
    MESSAGE = 1
    PRIVATE = 2
    HIGHLIGHT = 3
class SlackTs(str):
    """A timestamp string of the form ``"<major>.<minor>"`` compared numerically.

    The instance is still a ``str`` holding the original text, but equality,
    hashing and ordering between two ``SlackTs`` values use the integer pair
    ``(major, minor)`` parsed from the text around the first ``"."``.

    Comparison rules:

    * ``==`` / ``!=`` against anything that is not a ``SlackTs`` (including a
      plain ``str`` with the same text) report "not equal".
    * ``<``, ``<=``, ``>``, ``>=`` against a non-``SlackTs`` fall back to the
      normal ``str`` behaviour.

    Raises:
        ValueError: if ``ts`` does not contain a ``"."`` or either part is
            not an integer.
    """

    def __init__(self, ts: str):
        # str.__new__ has already stored the text; only the numeric parts
        # need to be parsed here.
        self.major, self.minor = [int(x) for x in ts.split(".", 1)]

    def _key(self) -> tuple:
        """Return the ``(major, minor)`` pair used for all comparisons."""
        return (self.major, self.minor)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return False
        return self._key() == other._key()

    def __ne__(self, other: object) -> bool:
        # str defines its own __ne__, so without this override ``!=`` would
        # compare the raw text and could disagree with ``==``.
        return not self.__eq__(other)

    def __lt__(self, other: object) -> bool:
        # The ordering operators are overridden for the same reason as
        # __ne__: the inherited str versions compare lexicographically.
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() < other._key()

    def __le__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() <= other._key()

    def __gt__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() > other._key()

    def __ge__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() >= other._key()

    def __hash__(self) -> int:
        # Must agree with __eq__, so hash the numeric pair, not the text.
        return hash(self._key())

    def __repr__(self) -> str:
        return f"SlackTs('{self}')"
class SlackMessage:
    """A message in a Slack conversation that can render itself for WeeChat.

    Wraps the raw message dict and lazily renders two parts: a prefix (the
    sender's nick, or WeeChat's join/quit prefix for join/leave messages)
    and the message body. Both parts are cached; the caches are dropped when
    the message JSON is replaced, and the body cache is dropped when the
    ``deleted`` flag is set.
    """

    # Message subtypes treated as "joined" / "left" notices.
    _JOIN_SUBTYPES = ("channel_join", "group_join")
    _LEAVE_SUBTYPES = ("channel_leave", "group_leave")
    # Special mentions that are not tied to a user, channel or usergroup id.
    _BROADCAST_MENTIONS = ("!here", "!channel", "!everyone")
    # Matches ``<id>`` and ``<id|fallback_name>`` references in message text.
    _RE_MENTION = re.compile(r"<(?P<id>[^|>]+)(?:\|(?P<fallback_name>[^>]*))?>")

    def __init__(self, conversation: SlackConversation, message_json: SlackMessageDict):
        """Store the raw message and start with empty render caches.

        Args:
            conversation: The conversation this message belongs to.
            message_json: The raw message dict; must contain ``"ts"``.
        """
        self._message_json = message_json
        self._rendered_prefix: Optional[str] = None
        self._rendered_message: Optional[str] = None
        # Last full "prefix\tmessage" line produced by render().
        self._rendered: Optional[str] = None
        self.conversation = conversation
        self.ts = SlackTs(message_json["ts"])
        self._deleted = False

    @property
    def workspace(self) -> SlackWorkspace:
        """The workspace of the conversation this message belongs to."""
        return self.conversation.workspace

    @property
    def is_bot_message(self) -> bool:
        """Whether the message subtype is ``"bot_message"``."""
        return self._message_json.get("subtype") == "bot_message"

    @property
    def sender_user_id(self) -> Optional[str]:
        """The ``"user"`` field, or None for bot messages or when absent."""
        if self.is_bot_message:
            return None
        return self._message_json.get("user")

    @property
    def sender_bot_id(self) -> Optional[str]:
        """The ``"bot_id"`` field for bot messages, otherwise None."""
        if not self.is_bot_message:
            return None
        return self._message_json.get("bot_id")

    @property
    def priority(self) -> MessagePriority:
        """The priority of this message; currently always ``MESSAGE``."""
        return MessagePriority.MESSAGE

    @property
    def deleted(self) -> bool:
        """Whether the message has been marked as deleted."""
        return self._deleted

    @deleted.setter
    def deleted(self, value: bool):
        self._deleted = value
        # Only the body depends on the deleted flag; the prefix cache stays.
        self._rendered_message = None
        self._rendered = None

    def update_message_json(self, message_json: SlackMessageDict):
        """Replace the raw message dict and drop every cached rendering."""
        self._message_json = message_json
        self._rendered_prefix = None
        self._rendered_message = None
        self._rendered = None

    async def tags(self, backlog: bool = False) -> str:
        """Build the comma-separated tag string for this message.

        Args:
            backlog: When true, the tags ``no_highlight``, ``notify_none``,
                ``logger_backlog`` and ``no_log`` are appended instead of the
                regular notify/log tags.

        Returns:
            All tags joined with ``","``.
        """
        nick = await self._nick(colorize=False, only_nick=True)
        tags = [f"slack_ts_{self.ts}", f"nick_{nick}"]

        sender_user_id = self.sender_user_id
        sender_bot_id = self.sender_bot_id
        if sender_user_id:
            tags.append(f"slack_user_id_{sender_user_id}")
        if sender_bot_id:
            tags.append(f"slack_bot_id_{sender_bot_id}")

        user = await self.workspace.users[sender_user_id] if sender_user_id else None

        subtype = self._message_json.get("subtype")
        if subtype in self._JOIN_SUBTYPES:
            tags.append("slack_join")
            log_tags = ["log4"]
        elif subtype in self._LEAVE_SUBTYPES:
            tags.append("slack_part")
            log_tags = ["log4"]
        else:
            tags.append("slack_privmsg")
            if self.is_bot_message:
                tags.append("bot_message")

            # The nick-colour tag is only added from this WeeChat version on
            # (0x04000000 is 4.0.0 in WeeChat's numeric version encoding).
            if shared.weechat_version >= 0x04000000:
                if user:
                    tags.append(f"prefix_nick_{user.nick_color()}")
                else:
                    tags.append(f"prefix_nick_{nick_color(nick)}")

            if user and user.is_self:
                tags.append("self_msg")
                log_tags = ["notify_none", "no_highlight", "log1"]
            else:
                log_tags = ["notify_message", "log1"]

        if backlog:
            tags += ["no_highlight", "notify_none", "logger_backlog", "no_log"]
        else:
            tags += log_tags

        return ",".join(tags)

    async def render(self) -> str:
        """Render the full line as ``"<prefix>\\t<message>"``.

        The prefix and the body are rendered through ``gather`` and the
        combined line is also stored on the instance.
        """
        prefix, message = await gather(self.render_prefix(), self.render_message())
        self._rendered = f"{prefix}\t{message}"
        return self._rendered

    async def _nick(self, colorize: bool = True, only_nick: bool = False) -> str:
        """Return the sender's nick.

        For bot messages the ``"username"`` field is preferred; without it
        the bot is looked up by ``"bot_id"``. Any other message is resolved
        through its ``"user"`` field.
        """
        if self.is_bot_message:
            username = self._message_json.get("username")
            if username:
                return format_bot_nick(username, colorize=colorize, only_nick=only_nick)
            bot = await self.workspace.bots[self._message_json["bot_id"]]
            return bot.nick(colorize=colorize, only_nick=only_nick)

        user = await self.workspace.users[self._message_json["user"]]
        return user.nick(colorize=colorize, only_nick=only_nick)

    async def _render_prefix(
        self, colorize: bool = True, only_nick: bool = False
    ) -> str:
        """Compute the prefix: join/quit prefix for join/leave, else the nick."""
        subtype = self._message_json.get("subtype")
        if subtype in self._JOIN_SUBTYPES:
            # weechat.prefix() output may end with a tab; render() adds its own.
            return removesuffix(weechat.prefix("join"), "\t")
        if subtype in self._LEAVE_SUBTYPES:
            return removesuffix(weechat.prefix("quit"), "\t")
        return await self._nick(colorize=colorize, only_nick=only_nick)

    async def render_prefix(self) -> str:
        """Return the prefix, computing and caching it on first use."""
        if self._rendered_prefix is None:
            self._rendered_prefix = await self._render_prefix()
        return self._rendered_prefix

    async def _render_join_or_leave(self, is_join: bool) -> str:
        """Render the "<nick> has joined/left <conversation>" notice."""
        if is_join:
            text_action = with_color(shared.config.color.message_join.value, "has joined")
        else:
            text_action = with_color(shared.config.color.message_quit.value, "has left")

        conversation_name = await self.conversation.name_with_prefix(
            "short_name_without_padding"
        )
        text_conversation_name = with_color("chat_channel", conversation_name)

        # The inviter is only mentioned for joins that carry an "inviter" field.
        inviter_id = self._message_json.get("inviter")
        if is_join and inviter_id:
            inviter_user = await self.workspace.users[inviter_id]
            inviter_text = f" by invitation from {inviter_user.nick(colorize=True)}"
        else:
            inviter_text = ""

        nick = await self._nick()
        return f"{nick} {text_action} {text_conversation_name}{inviter_text}"

    async def _render_message(self) -> str:
        """Compute the message body.

        In priority order: a "(deleted)" marker if the message is flagged as
        deleted, a join/leave notice for those subtypes, otherwise the
        ``"text"`` field with references unfurled and an "(edited)" suffix
        when the message has an ``"edited"`` field.
        """
        if self._deleted:
            return with_color(shared.config.color.deleted_message.value, "(deleted)")

        subtype = self._message_json.get("subtype")
        if subtype in self._JOIN_SUBTYPES or subtype in self._LEAVE_SUBTYPES:
            return await self._render_join_or_leave(subtype in self._JOIN_SUBTYPES)

        text = await self._unfurl_refs(self._message_json["text"])
        if self._message_json.get("edited"):
            edited_suffix = with_color(
                shared.config.color.edited_message_suffix.value, "(edited)"
            )
            return f"{text} {edited_suffix}"
        return text

    async def render_message(self) -> str:
        """Return the message body, computing and caching it on first use."""
        if self._rendered_message is None:
            self._rendered_message = await self._render_message()
        return self._rendered_message

    def _item_prefix(self, item_id: str) -> str:
        """Return the sigil shown in front of a reference.

        ``"#..."`` gives ``"#"``; ``"@..."``, ``"!subteam^..."`` and the
        broadcast mentions give ``"@"``; anything else gives ``""``.
        """
        if item_id.startswith(("#", "@")):
            return item_id[0]
        if item_id.startswith("!subteam^") or item_id in self._BROADCAST_MENTIONS:
            return "@"
        return ""

    async def _lookup_item_id(self, item_id: str):
        """Resolve a reference id to a ``(color, display_text)`` pair.

        Handles conversations (``#``), users (``@``), usergroups
        (``!subteam^``) and the broadcast mentions. Returns None for any
        other id. Exceptions from the workspace lookups propagate to the
        caller.
        """
        if item_id.startswith("#"):
            conversation = await self.workspace.conversations[
                removeprefix(item_id, "#")
            ]
            color = shared.config.color.channel_mention_color.value
            name = await conversation.name_with_prefix("short_name_without_padding")
            return (color, name)

        if item_id.startswith("@"):
            user = await self.workspace.users[removeprefix(item_id, "@")]
            color = shared.config.color.user_mention_color.value
            return (color, self._item_prefix(item_id) + user.nick())

        if item_id.startswith("!subteam^"):
            usergroup = await self.workspace.usergroups[
                removeprefix(item_id, "!subteam^")
            ]
            color = shared.config.color.usergroup_mention_color.value
            return (color, self._item_prefix(item_id) + usergroup.handle())

        if item_id in self._BROADCAST_MENTIONS:
            color = shared.config.color.usergroup_mention_color.value
            return (color, self._item_prefix(item_id) + removeprefix(item_id, "!"))

        return None

    async def _unfurl_refs(self, message: str) -> str:
        """Replace ``<id>`` / ``<id|fallback>`` references with display text.

        Each distinct id is resolved once via ``_lookup_item_id``. For every
        reference in the text:

        * a resolved id is replaced by its colored display text;
        * otherwise a non-empty fallback name is used, with the sigil
          prepended if it is missing;
        * otherwise a failed lookup is reported through
          ``print_exception_once`` and the reference is left unchanged.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order, so a
        # repeated mention is not looked up more than once.
        mention_ids: List[str] = list(
            dict.fromkeys(
                match["id"] for match in self._RE_MENTION.finditer(message)
            )
        )
        if not mention_ids:
            return message

        # return_exceptions=True keeps one failed lookup from aborting the
        # others; failures are handled per reference below.
        items_list = await gather(
            *(self._lookup_item_id(mention_id) for mention_id in mention_ids),
            return_exceptions=True,
        )
        items = dict(zip(mention_ids, items_list))

        def unfurl_ref(match: Match[str]) -> str:
            item = items[match["id"]]
            if item and not isinstance(item, BaseException):
                return with_color(item[0], item[1])

            fallback_name = match["fallback_name"]
            if fallback_name:
                prefix = self._item_prefix(match["id"])
                if fallback_name.startswith(prefix):
                    return fallback_name
                return prefix + fallback_name

            if item:
                # A truthy item here can only be an exception from the lookup.
                print_exception_once(item)
            return match[0]

        return self._RE_MENTION.sub(unfurl_ref, message)