aboutsummaryrefslogblamecommitdiffstats
path: root/slack/slack_message.py
blob: 792b064d8df0c4767df21b704ac5de72d5072990 (plain) (tree)
1
2
3
4
5
6
7
8
9


                                  
                     
                                                       
 

              
                                          
                                                                 
                               
                                                        
                             
                                 

                 

                                                                                      
                                                          
                                                    

 






                            
                   










                                                                      


                                   
 
                   
                                                                                        
                                         

                                     
                                        
                                             
                             




                                          
             






                                                              
                                              






                                                   
 



                                          













                                                                  
                                                       






                                                               
 




                                                           
 









                                                                                   




                                                                   
 
                                     



                                                                  



                                                                               
                            


                             
                                  

                                            
                                                                 

                                               
 
                                                                                 





                                                              
                                                                                        

                                                                             
                                                                       
             
                                                                         
                                                                    
 


                                                            






                                                                                   





                                                           
                                           


                                                                                     



























                                                                                               












                                                                                             
 


                                                              




                                                            



                      





                                                                   
                                                                                    














                                                                                   
                                                      
                                                                                   
                                                            


                                                                              

                                   
                                                  
 
                                          
                                     

                                                            
                                        




                                                             


                                          

                                                  
from __future__ import annotations

import re
from enum import Enum
from typing import TYPE_CHECKING, List, Match, Optional

import weechat

from slack.log import print_exception_once
from slack.python_compatibility import removeprefix, removesuffix
from slack.shared import shared
from slack.slack_user import format_bot_nick, nick_color
from slack.task import gather
from slack.util import with_color

if TYPE_CHECKING:
    from slack_api.slack_conversations_history import SlackMessage as SlackMessageDict

    from slack.slack_conversation import SlackConversation
    from slack.slack_workspace import SlackWorkspace


class MessagePriority(Enum):
    """Priority levels that can be assigned to a message.

    NOTE(review): the numeric values increase from LOW to HIGHLIGHT, which
    presumably reflects increasing urgency of the notification; confirm
    against the code that consumes ``SlackMessage.priority``.
    """

    # Values are plain consecutive integers starting at 0.
    LOW = 0
    MESSAGE = 1
    PRIVATE = 2
    HIGHLIGHT = 3


class SlackTs(str):
    """A Slack timestamp string of the form ``"<major>.<minor>"``.

    The value still behaves as the original string (formatting, slicing, ...),
    but equality, hashing and ordering between two ``SlackTs`` instances are
    based on the integer pair ``(major, minor)`` parsed from the text, so
    ``SlackTs("1.10") == SlackTs("1.000010")``.

    Comparisons against anything that is not a ``SlackTs``:
      * ``==`` is always ``False`` and ``!=`` is always ``True``.
      * Ordering operators return ``NotImplemented`` so Python falls back to
        the other operand (plain ``str`` ordering for a ``str`` operand).

    Raises:
        ValueError: on construction, if ``ts`` does not contain a ``"."`` or
            either part is not an integer.
    """

    def __init__(self, ts: str):
        # The string content itself is stored by str.__new__; only the numeric
        # parts used for comparisons are parsed here.
        self.major, self.minor = [int(x) for x in ts.split(".", 1)]

    def _key(self) -> tuple:
        """Return the ``(major, minor)`` pair used for all comparisons."""
        return (self.major, self.minor)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return False
        return self._key() == other._key()

    def __ne__(self, other: object) -> bool:
        # str defines its own __ne__, so it must be overridden explicitly to
        # stay consistent with the numeric __eq__ above.
        return not self.__eq__(other)

    # str also defines the ordering operators (lexicographically); override
    # them so that ordering agrees with __eq__ and sorts numerically.
    def __lt__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() < other._key()

    def __le__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() <= other._key()

    def __gt__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() > other._key()

    def __ge__(self, other: object) -> bool:
        if not isinstance(other, SlackTs):
            return NotImplemented
        return self._key() >= other._key()

    def __hash__(self) -> int:
        return hash(self._key())

    def __repr__(self) -> str:
        return f"SlackTs('{self}')"


class SlackMessage:
    """One Slack message in a conversation, with lazily cached rendering.

    Wraps the raw message dict and renders it into a ``prefix<TAB>message``
    line plus a comma-separated tag string. The rendered prefix and message
    are cached and reset when the underlying data changes.
    """

    # Message subtypes that mark a join or a leave event.
    _JOIN_SUBTYPES = ("channel_join", "group_join")
    _LEAVE_SUBTYPES = ("channel_leave", "group_leave")
    # Special mention ids that are rendered with the usergroup mention color.
    _BROADCAST_IDS = ("!here", "!channel", "!everyone")
    # Matches "<id>" and "<id|fallback name>" references in message text.
    _RE_MENTION = re.compile(r"<(?P<id>[^|>]+)(?:\|(?P<fallback_name>[^>]*))?>")

    def __init__(self, conversation: SlackConversation, message_json: SlackMessageDict):
        """Create a message for ``conversation`` from its raw ``message_json``.

        Raises:
            KeyError: if ``message_json`` has no ``"ts"`` entry.
        """
        self._message_json = message_json
        self._rendered_prefix: Optional[str] = None
        self._rendered_message: Optional[str] = None
        # Last full line produced by render(); declared here so the attribute
        # always exists, even before the first render() call.
        self._rendered: Optional[str] = None
        self.conversation = conversation
        self.ts = SlackTs(message_json["ts"])
        self._deleted = False

    @property
    def workspace(self) -> SlackWorkspace:
        """The workspace of the conversation this message belongs to."""
        return self.conversation.workspace

    @property
    def is_bot_message(self) -> bool:
        """Whether the message has the ``bot_message`` subtype."""
        return (
            "subtype" in self._message_json
            and self._message_json["subtype"] == "bot_message"
        )

    @property
    def sender_user_id(self) -> Optional[str]:
        """The ``user`` field of the message, or ``None`` for bot messages."""
        if self.is_bot_message:
            return None
        return self._message_json.get("user")

    @property
    def sender_bot_id(self) -> Optional[str]:
        """The ``bot_id`` field of the message, or ``None`` for non-bot messages."""
        if not self.is_bot_message:
            return None
        return self._message_json.get("bot_id")

    @property
    def priority(self) -> MessagePriority:
        """The priority of this message; currently always ``MESSAGE``."""
        return MessagePriority.MESSAGE

    @property
    def deleted(self) -> bool:
        """Whether the message has been marked as deleted."""
        return self._deleted

    @deleted.setter
    def deleted(self, value: bool):
        self._deleted = value
        # Only the message text depends on the deleted flag, so the cached
        # prefix is intentionally kept.
        self._rendered_message = None

    def update_message_json(self, message_json: SlackMessageDict):
        """Replace the raw message data and drop the cached prefix and message."""
        self._message_json = message_json
        self._rendered_prefix = None
        self._rendered_message = None

    async def tags(self, backlog: bool = False) -> str:
        """Build the comma-separated tag string for this message.

        Args:
            backlog: when true, the notification/logging tags are replaced by
                ``no_highlight,notify_none,logger_backlog,no_log``.

        Returns:
            The tags joined with ``","``.
        """
        nick = await self._nick(colorize=False, only_nick=True)
        tags = [f"slack_ts_{self.ts}", f"nick_{nick}"]

        sender_user_id = self.sender_user_id
        sender_bot_id = self.sender_bot_id
        if sender_user_id:
            tags.append(f"slack_user_id_{sender_user_id}")
        if sender_bot_id:
            tags.append(f"slack_bot_id_{sender_bot_id}")

        user = (await self.workspace.users[sender_user_id]) if sender_user_id else None

        subtype = self._message_json.get("subtype")
        if subtype in self._JOIN_SUBTYPES:
            tags.append("slack_join")
            log_tags = ["log4"]
        elif subtype in self._LEAVE_SUBTYPES:
            tags.append("slack_part")
            log_tags = ["log4"]
        else:
            tags.append("slack_privmsg")
            if self.is_bot_message:
                tags.append("bot_message")
            # NOTE(review): 0x04000000 presumably encodes WeeChat 4.0.0, the
            # first version understanding the prefix_nick_ tag; confirm.
            if shared.weechat_version >= 0x04000000:
                if user:
                    tags.append(f"prefix_nick_{user.nick_color()}")
                else:
                    tags.append(f"prefix_nick_{nick_color(nick)}")

            if user and user.is_self:
                tags.append("self_msg")
                log_tags = ["notify_none", "no_highlight", "log1"]
            else:
                log_tags = ["notify_message", "log1"]

        if backlog:
            tags += ["no_highlight", "notify_none", "logger_backlog", "no_log"]
        else:
            tags += log_tags

        return ",".join(tags)

    async def render(self) -> str:
        """Render the full line as ``"<prefix>\\t<message>"`` and remember it."""
        prefix, message = await gather(self.render_prefix(), self.render_message())
        self._rendered = f"{prefix}\t{message}"
        return self._rendered

    async def _nick(self, colorize: bool = True, only_nick: bool = False) -> str:
        """Return the nick of the sender (bot or user) of this message.

        For bot messages the ``username`` field is preferred; if it is missing
        or empty the bot is looked up by ``bot_id``. Other messages look up the
        user given by the ``user`` field.

        Raises:
            KeyError: if the required ``bot_id``/``user`` field is missing.
        """
        if not self.is_bot_message:
            user = await self.workspace.users[self._message_json["user"]]
            return user.nick(colorize=colorize, only_nick=only_nick)

        username = self._message_json.get("username")
        if username:
            return format_bot_nick(username, colorize=colorize, only_nick=only_nick)
        bot = await self.workspace.bots[self._message_json["bot_id"]]
        return bot.nick(colorize=colorize, only_nick=only_nick)

    async def _render_prefix(
        self, colorize: bool = True, only_nick: bool = False
    ) -> str:
        """Render the prefix: a join/quit marker or the sender's nick."""
        subtype = self._message_json.get("subtype")
        if subtype in self._JOIN_SUBTYPES:
            # The prefix returned by weechat may end in a tab; render() adds
            # its own separator, so strip it here.
            return removesuffix(weechat.prefix("join"), "\t")
        if subtype in self._LEAVE_SUBTYPES:
            return removesuffix(weechat.prefix("quit"), "\t")
        return await self._nick(colorize=colorize, only_nick=only_nick)

    async def render_prefix(self) -> str:
        """Return the rendered prefix, computing and caching it on first use."""
        if self._rendered_prefix is None:
            self._rendered_prefix = await self._render_prefix()
        return self._rendered_prefix

    async def _render_join_leave(self, is_join: bool) -> str:
        """Render the text of a join (``is_join``) or leave message."""
        if is_join:
            text_action = with_color(
                shared.config.color.message_join.value, "has joined"
            )
        else:
            text_action = with_color(shared.config.color.message_quit.value, "has left")

        conversation_name = await self.conversation.name_with_prefix(
            "short_name_without_padding"
        )
        text_conversation_name = with_color("chat_channel", conversation_name)

        inviter_id = self._message_json.get("inviter")
        if is_join and inviter_id:
            inviter_user = await self.workspace.users[inviter_id]
            inviter_text = f" by invitation from {inviter_user.nick(colorize=True)}"
        else:
            inviter_text = ""

        return f"{await self._nick()} {text_action} {text_conversation_name}{inviter_text}"

    async def _render_message(self) -> str:
        """Render the message body without using the cache.

        Deleted messages render as ``(deleted)``, join/leave subtypes as a
        "has joined"/"has left" line, and everything else as the message text
        with references unfurled and an ``(edited)`` suffix when applicable.

        Raises:
            KeyError: for a regular message without a ``"text"`` entry.
        """
        if self._deleted:
            return with_color(shared.config.color.deleted_message.value, "(deleted)")

        subtype = self._message_json.get("subtype")
        if subtype in self._JOIN_SUBTYPES:
            return await self._render_join_leave(is_join=True)
        if subtype in self._LEAVE_SUBTYPES:
            return await self._render_join_leave(is_join=False)

        text = await self._unfurl_refs(self._message_json["text"])
        if self._message_json.get("edited"):
            edited = with_color(
                shared.config.color.edited_message_suffix.value, "(edited)"
            )
            return f"{text} {edited}"
        return text

    async def render_message(self) -> str:
        """Return the rendered message, computing and caching it on first use."""
        if self._rendered_message is None:
            self._rendered_message = await self._render_message()
        return self._rendered_message

    def _item_prefix(self, item_id: str):
        """Return the display prefix (``"#"``, ``"@"`` or ``""``) for a reference id."""
        if item_id.startswith(("#", "@")):
            return item_id[0]
        if item_id.startswith("!subteam^") or item_id in self._BROADCAST_IDS:
            return "@"
        return ""

    async def _lookup_item_id(self, item_id: str):
        """Resolve a reference id to a ``(color, display_text)`` tuple.

        Handles ``#<conversation id>``, ``@<user id>``, ``!subteam^<id>`` and
        the broadcast ids ``!here``/``!channel``/``!everyone``.

        Returns:
            The tuple, or ``None`` if ``item_id`` is none of the known kinds.
        """
        if item_id.startswith("#"):
            conversation = await self.workspace.conversations[
                removeprefix(item_id, "#")
            ]
            color = shared.config.color.channel_mention_color.value
            name = await conversation.name_with_prefix("short_name_without_padding")
            return (color, name)
        if item_id.startswith("@"):
            user = await self.workspace.users[removeprefix(item_id, "@")]
            color = shared.config.color.user_mention_color.value
            return (color, self._item_prefix(item_id) + user.nick())
        if item_id.startswith("!subteam^"):
            usergroup = await self.workspace.usergroups[
                removeprefix(item_id, "!subteam^")
            ]
            color = shared.config.color.usergroup_mention_color.value
            return (color, self._item_prefix(item_id) + usergroup.handle())
        if item_id in self._BROADCAST_IDS:
            color = shared.config.color.usergroup_mention_color.value
            return (color, self._item_prefix(item_id) + removeprefix(item_id, "!"))
        return None

    async def _unfurl_refs(self, message: str) -> str:
        """Replace ``<id>``/``<id|fallback>`` references in ``message``.

        Each reference is replaced by, in order of preference: the colored
        result of looking up its id, its fallback name (with the matching
        prefix added if missing), or the original text unchanged. A failed
        lookup without a fallback name is reported via print_exception_once.
        """
        # dict.fromkeys drops duplicate ids while keeping first-seen order, so
        # every referenced item is looked up only once.
        mention_ids: List[str] = list(
            dict.fromkeys(match["id"] for match in self._RE_MENTION.finditer(message))
        )
        items_list = await gather(
            *(self._lookup_item_id(mention_id) for mention_id in mention_ids),
            return_exceptions=True,
        )
        items = dict(zip(mention_ids, items_list))

        def unfurl_ref(match: Match[str]):
            item = items[match["id"]]
            if item and not isinstance(item, BaseException):
                return with_color(item[0], item[1])

            fallback_name = match["fallback_name"]
            if fallback_name:
                prefix = self._item_prefix(match["id"])
                if fallback_name.startswith(prefix):
                    return fallback_name
                return prefix + fallback_name

            if item:
                # A successful lookup was handled above, so a truthy item here
                # is the exception returned by gather for a failed lookup.
                print_exception_once(item)
            return match[0]

        return self._RE_MENTION.sub(unfurl_ref, message)