from __future__ import annotations
import json
import re
import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from urllib.parse import urlencode
import weechat
from slack.http import http_request
from slack.shared import shared
from slack.task import Future, create_task, gather
from slack.util import get_callback_name
if TYPE_CHECKING:
from slack_api import SlackConversationInfoResponse
def get_conversation_from_buffer_pointer(
    buffer_pointer: str,
) -> Optional[SlackConversation]:
    """Return the conversation whose buffer pointer equals ``buffer_pointer``.

    Every conversation of every workspace in ``shared.workspaces`` is
    checked in iteration order; the first match wins. Returns ``None`` when
    no conversation matches.
    """
    candidates = (
        conversation
        for workspace in shared.workspaces.values()
        for conversation in workspace.conversations.values()
        if conversation.buffer_pointer == buffer_pointer
    )
    return next(candidates, None)
class SlackApi:
    """Helper for issuing Slack API requests on behalf of one workspace."""

    def __init__(self, workspace: SlackWorkspace):
        self.workspace = workspace

    def get_request_options(self):
        """Build the options dict handed to ``http_request`` for every call.

        The user agent carries the script version; the token and cookies are
        read from the workspace configuration.
        """
        return {
            "useragent": f"wee_slack {shared.SCRIPT_VERSION}",
            "httpheader": f"Authorization: Bearer {self.workspace.config.api_token.value}",
            "cookie": self.workspace.config.api_cookies.value,
        }

    async def fetch(
        self, method: str, params: Optional[Dict[str, Union[str, int]]] = None
    ):
        """Call the API ``method`` with ``params`` as the query string.

        Returns the JSON-decoded response body. The configured
        ``slack_timeout`` is multiplied by 1000 before being passed on
        (presumably seconds to milliseconds -- confirm against
        ``http_request``).
        """
        # ``None`` instead of a ``{}`` default: a dict default is created once
        # and shared by every call.
        url = f"https://api.slack.com/api/{method}?{urlencode(params or {})}"
        response = await http_request(
            url,
            self.get_request_options(),
            self.workspace.config.slack_timeout.value * 1000,
        )
        return json.loads(response)

    async def fetch_list(
        self,
        method: str,
        list_key: str,
        params: Optional[Dict[str, Union[str, int]]] = None,
        pages: int = 1,  # negative or 0 means all pages
    ):
        """Fetch a paginated list, following ``next_cursor`` for up to ``pages`` pages.

        The items under ``list_key`` from the following pages are appended to
        the first page's response, which is what gets returned. Pagination
        stops when ``pages`` reaches 1, when no ``next_cursor`` is returned, or
        when a response is not ``ok``.

        ``params`` is never modified: the cursor is written to a private copy,
        so neither the caller's dict nor a shared default can carry a stale
        cursor into a later call.
        """
        page_params: Dict[str, Union[str, int]] = dict(params) if params else {}
        response = await self.fetch(method, page_params)
        next_cursor = response.get("response_metadata", {}).get("next_cursor")
        if pages != 1 and next_cursor and response["ok"]:
            page_params["cursor"] = next_cursor
            next_pages = await self.fetch_list(
                method, list_key, page_params, pages - 1
            )
            # NOTE(review): if a later page is not "ok" it may lack `list_key`
            # and this raises KeyError -- same as before this change.
            response[list_key].extend(next_pages[list_key])
        return response
class SlackWorkspace:
    """State for one configured Slack workspace: config, API, users, conversations."""

    def __init__(self, name: str):
        self.name = name
        self.config = shared.config.create_workspace_config(self.name)
        self.api = SlackApi(self)
        self.is_connected = False
        self.nick = "TODO"
        # Possibly make this private so callers must go through get_user;
        # get_user could become a getter, though it being async may be an issue.
        self.users: Dict[str, Future[SlackUser]] = {}
        self.conversations: Dict[str, SlackConversation] = {}

    async def connect(self):
        """Load the user's conversations and mark the workspace as connected.

        All pages of ``users.conversations`` are requested (``pages=-1``). A
        ``SlackConversation`` is registered for each returned channel and its
        ``init()`` is scheduled as a task without being awaited here.
        """
        # Only public channels for now; private channels and IMs are not
        # requested yet. The rtm.connect call is not wired in yet either.
        query = {
            "exclude_archived": True,
            "types": "public_channel",
            "limit": 1000,
        }
        listing = await self.api.fetch_list(
            "users.conversations", "channels", query, -1
        )

        for channel in listing["channels"]:
            channel_id = channel["id"]
            conversation = SlackConversation(self, channel_id)
            self.conversations[channel_id] = conversation
            create_task(conversation.init())

        self.is_connected = True
        weechat.bar_item_update("input_text")

    async def create_user(self, id: str) -> SlackUser:
        """Create a ``SlackUser`` for ``id`` and wait for its ``init()`` to finish."""
        new_user = SlackUser(self, id)
        await new_user.init()
        return new_user

    async def get_user(self, id: str) -> SlackUser:
        """Return the user for ``id``, creating it at most once.

        The value stored in ``self.users`` is the task returned by
        ``create_task``, not the user itself; it is registered before being
        awaited, so a later lookup for the same id awaits that same entry.
        """
        if id not in self.users:
            self.users[id] = create_task(self.create_user(id))
        return await self.users[id]
class SlackUser:
    """A Slack user belonging to a workspace; ``name`` is set by ``init()``."""

    def __init__(self, workspace: SlackWorkspace, id: str):
        self.workspace = workspace
        self.id = id
        # Declared only; the attribute does not exist until init() has run.
        self.name: str

    @property
    def api(self) -> SlackApi:
        """The API helper of the owning workspace."""
        return self.workspace.api

    async def init(self):
        """Fetch ``users.info`` for this user and store the returned name."""
        response = await self.api.fetch("users.info", {"user": self.id})
        user_info = response["user"]
        self.name = user_info["name"]
def buffer_input_cb(data: str, buffer: str, input_data: str) -> int:
    """Print ``input_data`` to ``buffer`` prefixed with "Text: ".

    ``data`` is unused. Always returns ``weechat.WEECHAT_RC_OK``.
    """
    echoed_line = "Text: %s" % input_data
    weechat.prnt(buffer, echoed_line)
    return weechat.WEECHAT_RC_OK
class SlackConversation:
    """A Slack conversation and the WeeChat buffer it is printed to."""

    def __init__(self, workspace: SlackWorkspace, id: str):
        self.workspace = workspace
        self.id = id
        # TODO: buffer_pointer may be accessed by buffer_switch before it's initialized
        self.buffer_pointer: str = ""
        # Declared only; assigned by init() once the conversation info is known.
        self.name: str
        self.is_loading = False
        self.history_filled = False
        self.history_pending = False

    @property
    def api(self) -> SlackApi:
        """The API helper of the owning workspace."""
        return self.workspace.api

    @contextmanager
    def loading(self):
        """Set ``is_loading`` for the duration of the block.

        The "input_text" bar item is refreshed on entry and on exit. The flag
        is cleared even when the block raises.
        """
        self.is_loading = True
        weechat.bar_item_update("input_text")
        try:
            yield
        finally:
            self.is_loading = False
            weechat.bar_item_update("input_text")

    async def init(self):
        """Fetch the conversation info, set ``name`` and create the buffer.

        Returns without creating a buffer when the API response is not ok.
        """
        with self.loading():
            info = await self.fetch_info()
            if not info["ok"]:
                # TODO: Handle error
                return

            info_channel = info["channel"]
            if info_channel["is_im"]:
                self.name = "IM"  # TODO
            elif info_channel["is_mpim"]:
                self.name = "MPIM"  # TODO
            else:
                self.name = info_channel["name"]

            self.buffer_pointer = weechat.buffer_new(
                self.name, get_callback_name(buffer_input_cb), "", "", ""
            )
            weechat.buffer_set(self.buffer_pointer, "localvar_set_nick", "nick")

    async def fetch_info(self) -> SlackConversationInfoResponse:
        """Return the ``conversations.info`` response for this conversation."""
        with self.loading():
            info = await self.api.fetch("conversations.info", {"channel": self.id})
        return info

    async def fill_history(self):
        """Fetch the conversation history once and print it to the buffer.

        Does nothing when the history is already filled or a fill is in
        progress. The fetched messages are printed in reverse order.
        ``history_pending`` is always cleared when the attempt ends, so a
        failed fetch or render does not block later calls; ``history_filled``
        is only set on success.
        """
        if self.history_filled or self.history_pending:
            return

        with self.loading():
            self.history_pending = True
            try:
                history = await self.api.fetch(
                    "conversations.history", {"channel": self.id}
                )
                start = time.time()

                messages = [
                    SlackMessage(self, message) for message in history["messages"]
                ]
                messages_rendered = await gather(
                    *(message.render_message() for message in messages)
                )

                for rendered in reversed(messages_rendered):
                    weechat.prnt(self.buffer_pointer, rendered)

                print(f"history w/o fetch took: {time.time() - start}")
                self.history_filled = True
            finally:
                # Without this, an exception above would leave the flag set
                # and every later call would return early.
                self.history_pending = False
class SlackMessage:
    """A single message of a conversation, wrapping its raw JSON payload."""

    def __init__(self, conversation: SlackConversation, message_json: Any):
        self.conversation = conversation
        self.ts = message_json["ts"]
        self.message_json = message_json

    @property
    def workspace(self) -> SlackWorkspace:
        """The workspace of the conversation this message belongs to."""
        return self.conversation.workspace

    @property
    def api(self) -> SlackApi:
        """The API helper of the owning workspace."""
        return self.workspace.api

    async def render_message(self):
        """Return the message as ``"<prefix>\\t<text>"``.

        The text has its user references unfurled first. The prefix is the
        sending user's name, or "bot" when the payload has no "user" key.
        """
        text = await self.unfurl_refs(self.message_json["text"])
        prefix = "bot"
        if "user" in self.message_json:
            sender = await self.workspace.get_user(self.message_json["user"])
            prefix = sender.name
        return f"{prefix}\t{text}"

    async def unfurl_refs(self, message: str):
        """Replace every ``<@ID>`` reference in ``message`` with ``@name``.

        All referenced users are looked up through ``workspace.get_user`` in
        one ``gather`` call before the substitution is done.
        """
        user_ref = re.compile("<@([^>]+)>")
        referenced_ids: List[str] = user_ref.findall(message)
        resolved = await gather(
            *(self.workspace.get_user(user_id) for user_id in referenced_ids)
        )
        users_by_id = dict(zip(referenced_ids, resolved))
        return user_ref.sub(
            lambda match: "@" + users_by_id[match.group(1)].name, message
        )