aboutsummaryrefslogtreecommitdiffstats
path: root/slack
diff options
context:
space:
mode:
authorTrygve Aaberge <trygveaa@gmail.com>2023-08-23 23:10:20 +0200
committerTrygve Aaberge <trygveaa@gmail.com>2024-02-18 11:32:54 +0100
commit2df26d96ec689b965b925709c5ca223c2094d9e7 (patch)
treef150b22f09e78e70deb050c96c8fdcf3d00eccca /slack
parentfed3a064a788b94f1c0b420bf276c158f8197ddf (diff)
downloadwee-slack-2df26d96ec689b965b925709c5ca223c2094d9e7.tar.gz
Use blocks to render messages
Diffstat (limited to 'slack')
-rw-r--r--slack/config.py7
-rw-r--r--slack/error.py9
-rw-r--r--slack/slack_message.py301
3 files changed, 305 insertions, 12 deletions
diff --git a/slack/config.py b/slack/config.py
index 864ac18..5ea0cbc 100644
--- a/slack/config.py
+++ b/slack/config.py
@@ -90,6 +90,13 @@ class SlackConfigSectionColor:
WeeChatColor("blue"),
)
+ self.render_error = WeeChatOption(
+ self._section,
+ "render_error",
+ "color for displaying rendering errors in a message",
+ WeeChatColor("red"),
+ )
+
self.user_mention_color = WeeChatOption(
self._section,
"user_mention_color",
diff --git a/slack/error.py b/slack/error.py
index c978584..1a13504 100644
--- a/slack/error.py
+++ b/slack/error.py
@@ -92,8 +92,8 @@ def format_exception_only_str(exc: BaseException) -> str:
return format_exception_only(exc)[-1].strip()
-def store_and_format_exception(e: BaseException):
- uncaught_error = UncaughtError(e)
+def store_and_format_uncaught_error(uncaught_error: UncaughtError) -> str:
+ e = uncaught_error.exception
shared.uncaught_errors.append(uncaught_error)
stack_msg_command = f"/slack debug error {uncaught_error.id}"
stack_msg = f"run `{stack_msg_command}` for the stack trace"
@@ -120,3 +120,8 @@ def store_and_format_exception(e: BaseException):
)
else:
return f"Unknown error occurred: {format_exception_only_str(e)} ({stack_msg})"
+
+
+def store_and_format_exception(e: BaseException) -> str:
+ uncaught_error = UncaughtError(e)
+ return store_and_format_uncaught_error(uncaught_error)
diff --git a/slack/slack_message.py b/slack/slack_message.py
index ab72582..356f05a 100644
--- a/slack/slack_message.py
+++ b/slack/slack_message.py
@@ -2,11 +2,12 @@ from __future__ import annotations
import re
from enum import Enum
-from typing import TYPE_CHECKING, List, Match, Optional
+from typing import TYPE_CHECKING, List, Match, Optional, Union
import weechat
-from slack.log import print_exception_once
+from slack.error import UncaughtError, store_and_format_uncaught_error
+from slack.log import print_error, print_exception_once
from slack.python_compatibility import removeprefix, removesuffix
from slack.shared import shared
from slack.slack_user import format_bot_nick, nick_color
@@ -15,13 +16,54 @@ from slack.util import with_color
if TYPE_CHECKING:
from slack_api.slack_conversations_history import SlackMessage as SlackMessageDict
- from slack_api.slack_conversations_history import SlackMessageReaction
+ from slack_api.slack_conversations_history import (
+ SlackMessageBlock,
+ SlackMessageBlockCompositionText,
+ SlackMessageBlockElementImage,
+ SlackMessageBlockRichTextElement,
+ SlackMessageBlockRichTextList,
+ SlackMessageBlockRichTextSection,
+ SlackMessageReaction,
+ )
from typing_extensions import assert_never
from slack.slack_conversation import SlackConversation
from slack.slack_workspace import SlackWorkspace
+def convert_int_to_letter(num: int) -> str:
+ letter = ""
+ while num > 0:
+ num -= 1
+ letter = chr((num % 26) + 97) + letter
+ num //= 26
+ return letter
+
+
+def convert_int_to_roman(num: int) -> str:
+ roman_numerals = {
+ 1000: "m",
+ 900: "cm",
+ 500: "d",
+ 400: "cd",
+ 100: "c",
+ 90: "xc",
+ 50: "l",
+ 40: "xl",
+ 10: "x",
+ 9: "ix",
+ 5: "v",
+ 4: "iv",
+ 1: "i",
+ }
+ roman_numeral = ""
+ for value, symbol in roman_numerals.items():
+ while num >= value:
+ roman_numeral += symbol
+ num -= value
+ return roman_numeral
+
+
class MessagePriority(Enum):
LOW = 0
MESSAGE = 1
@@ -237,11 +279,16 @@ class SlackMessage:
return f"{await self._nick()} {text_action} {text_conversation_name}{inviter_text}"
- elif self._message_json.get("subtype") == "me_message":
- text = await self._unfurl_refs(self._message_json["text"])
- return f"{await self._nick()} {text}"
else:
- return await self._unfurl_refs(self._message_json["text"])
+ if "blocks" in self._message_json:
+ text = await self._render_blocks(self._message_json["blocks"])
+ else:
+ text = await self._unfurl_refs(self._message_json["text"])
+
+ if self._message_json.get("subtype") == "me_message":
+ return f"{await self._nick()} {text}"
+ else:
+ return text
async def _render_message(self) -> str:
if self._deleted:
@@ -322,7 +369,7 @@ class SlackMessage:
return re_mention.sub(unfurl_ref, message)
- def _get_emoji(self, emoji_name: str) -> str:
+ def _get_emoji(self, emoji_name: str, skin_tone: Optional[int] = None) -> str:
emoji_name_with_colons = f":{emoji_name}:"
if shared.config.look.render_emoji_as.value == "name":
return emoji_name_with_colons
@@ -331,10 +378,19 @@ class SlackMessage:
if emoji_item is None:
return emoji_name_with_colons
+ skin_tone_item = (
+ emoji_item.get("skinVariations", {}).get(str(skin_tone))
+ if skin_tone
+ else None
+ )
+ emoji_unicode = (
+ skin_tone_item["unicode"] if skin_tone_item else emoji_item["unicode"]
+ )
+
if shared.config.look.render_emoji_as.value == "emoji":
- return emoji_item["unicode"]
+ return emoji_unicode
elif shared.config.look.render_emoji_as.value == "both":
- return f"{emoji_item['unicode']}({emoji_name_with_colons})"
+ return f"{emoji_unicode}({emoji_name_with_colons})"
else:
assert_never(shared.config.look.render_emoji_as.value)
@@ -378,3 +434,228 @@ class SlackMessage:
)
else:
return ""
+
+ async def _render_blocks(self, blocks: List[SlackMessageBlock]) -> str:
+ block_texts: List[str] = []
+ for block in blocks:
+ try:
+ if block["type"] == "section":
+ fields = block.get("fields", [])
+ if "text" in block:
+ fields.insert(0, block["text"])
+ block_texts.extend(
+ self._render_block_element(field) for field in fields
+ )
+ elif block["type"] == "actions":
+ texts: List[str] = []
+ for element in block["elements"]:
+ if element["type"] == "button":
+ texts.append(self._render_block_element(element["text"]))
+ if "url" in element:
+ texts.append(element["url"])
+ else:
+ text = (
+ f'<Unsupported block action type "{element["type"]}">'
+ )
+ texts.append(
+ with_color(shared.config.color.render_error.value, text)
+ )
+ block_texts.append(" | ".join(texts))
+ elif block["type"] == "call":
+ url = block["call"]["v1"]["join_url"]
+ block_texts.append("Join via " + url)
+ elif block["type"] == "divider":
+ block_texts.append("---")
+ elif block["type"] == "context":
+ block_texts.append(
+ " | ".join(
+ self._render_block_element(element)
+ for element in block["elements"]
+ )
+ )
+ elif block["type"] == "image":
+ if "title" in block:
+ block_texts.append(self._render_block_element(block["title"]))
+ block_texts.append(self._render_block_element(block))
+ elif block["type"] == "rich_text":
+ for element in block.get("elements", []):
+ if element["type"] == "rich_text_section":
+ rendered = await self._render_block_rich_text_section(
+ element
+ )
+ if rendered:
+ block_texts.append(rendered)
+ elif element["type"] == "rich_text_list":
+ rendered = [
+ "{}{} {}".format(
+ " " * element.get("indent", 0),
+ self._render_block_rich_text_list_prefix(
+ element, item_index
+ ),
+ await self._render_block_rich_text_section(
+ item_element
+ ),
+ )
+ for item_index, item_element in enumerate(
+ element["elements"]
+ )
+ ]
+ block_texts.extend(rendered)
+ elif element["type"] == "rich_text_quote":
+ lines = [
+ f"> {line}"
+ for sub_element in element["elements"]
+ for line in (
+ await self._render_block_rich_text_element(
+ sub_element
+ )
+ ).split("\n")
+ ]
+ block_texts.extend(lines)
+ elif element["type"] == "rich_text_preformatted":
+ texts = [
+ sub_element.get("text", sub_element.get("url", ""))
+ for sub_element in element["elements"]
+ ]
+ if texts:
+ block_texts.append(f"```\n{''.join(texts)}\n```")
+ else:
+ text = f'<Unsupported rich text type "{element["type"]}">'
+ block_texts.append(
+ with_color(shared.config.color.render_error.value, text)
+ )
+ else:
+ text = f'<Unsupported block type "{block["type"]}">'
+ block_texts.append(
+ with_color(shared.config.color.render_error.value, text)
+ )
+ except Exception as e:
+ uncaught_error = UncaughtError(e)
+ print_error(store_and_format_uncaught_error(uncaught_error))
+ text = f"<Error rendering message, error id: {uncaught_error.id}>"
+ block_texts.append(
+ with_color(shared.config.color.render_error.value, text)
+ )
+
+ return "\n".join(block_texts)
+
+ async def _render_block_rich_text_section(
+ self, section: SlackMessageBlockRichTextSection
+ ) -> str:
+ texts: List[str] = []
+ prev_element: SlackMessageBlockRichTextElement = {"type": "text", "text": ""}
+ for element in section["elements"] + [prev_element.copy()]:
+ colors_apply: List[str] = []
+ colors_remove: List[str] = []
+ characters_apply: List[str] = []
+ characters_remove: List[str] = []
+ prev_style = prev_element.get("style", {})
+ cur_style = element.get("style", {})
+ if cur_style.get("bold", False) != prev_style.get("bold", False):
+ if cur_style.get("bold"):
+ colors_apply.append(weechat.color("bold"))
+ characters_apply.append("*")
+ else:
+ colors_remove.append(weechat.color("-bold"))
+ characters_remove.append("*")
+ if cur_style.get("italic", False) != prev_style.get("italic", False):
+ if cur_style.get("italic"):
+ colors_apply.append(weechat.color("italic"))
+ characters_apply.append("_")
+ else:
+ colors_remove.append(weechat.color("-italic"))
+ characters_remove.append("_")
+ if cur_style.get("strike", False) != prev_style.get("strike", False):
+ if cur_style.get("strike"):
+ characters_apply.append("~")
+ else:
+ characters_remove.append("~")
+ if cur_style.get("code", False) != prev_style.get("code", False):
+ if cur_style.get("code"):
+ characters_apply.append("`")
+ else:
+ characters_remove.append("`")
+
+ texts.extend(reversed(characters_remove))
+ texts.extend(reversed(colors_remove))
+ texts.extend(colors_apply)
+ texts.extend(characters_apply)
+ texts.append(await self._render_block_rich_text_element(element))
+ prev_element = element
+
+ text = "".join(texts)
+
+ if text.endswith("\n"):
+ return text[:-1]
+ else:
+ return text
+
+ async def _render_block_rich_text_element(
+ self, element: SlackMessageBlockRichTextElement
+ ) -> str:
+ if element["type"] == "text":
+ return element["text"]
+ elif element["type"] == "link":
+ if "text" in element:
+ if element.get("style", {}).get("code"):
+ return element["text"]
+ else:
+ return f"{element['url']} ({element['text']})"
+ else:
+ return element["url"]
+ elif element["type"] == "emoji":
+ return self._get_emoji(element["name"], element.get("skin_tone"))
+ elif element["type"] == "channel":
+ conversation = await self.workspace.conversations[element["channel_id"]]
+ name = await conversation.name_with_prefix("short_name_without_padding")
+ return with_color(shared.config.color.channel_mention_color.value, name)
+ elif element["type"] == "user":
+ user = await self.workspace.users[element["user_id"]]
+ name = f"@{user.nick()}"
+ return with_color(shared.config.color.user_mention_color.value, name)
+ elif element["type"] == "usergroup":
+ # TODO: Handle error
+ usergroup = await self.workspace.usergroups[element["usergroup_id"]]
+ name = f"@{usergroup.handle()}"
+ return with_color(shared.config.color.usergroup_mention_color.value, name)
+ elif element["type"] == "broadcast":
+ name = f"@{element['range']}"
+ return with_color(shared.config.color.usergroup_mention_color.value, name)
+ else:
+ text = f'<Unsupported rich text element type "{element["type"]}">'
+ return with_color(shared.config.color.render_error.value, text)
+
+ def _render_block_element(
+ self,
+ element: Union[SlackMessageBlockCompositionText, SlackMessageBlockElementImage],
+ ) -> str:
+ if element["type"] == "plain_text" or element["type"] == "mrkdwn":
+ # TODO: Support markdown, and verbatim and emoji properties
+ return element["text"]
+ elif element["type"] == "image":
+ if element.get("alt_text"):
+ return f"{element['image_url']} ({element['alt_text']})"
+ else:
+ return element["image_url"]
+ else:
+ text = f'<Unsupported block element type "{element["type"]}">'
+ return with_color(shared.config.color.render_error.value, text)
+
+ def _render_block_rich_text_list_prefix(
+ self, list_element: SlackMessageBlockRichTextList, item_index: int
+ ) -> str:
+ index = list_element.get("offset", 0) + item_index + 1
+ if list_element["style"] == "ordered":
+ if list_element["indent"] == 0 or list_element["indent"] == 3:
+ return f"{index}."
+ elif list_element["indent"] == 1 or list_element["indent"] == 4:
+ return f"{convert_int_to_letter(index)}."
+ else:
+ return f"{convert_int_to_roman(index)}."
+ else:
+ if list_element["indent"] == 0 or list_element["indent"] == 3:
+ return "•"
+ elif list_element["indent"] == 1 or list_element["indent"] == 4:
+ return "◦"
+ else:
+ return "▪︎"