diff options
author | Trygve Aaberge <trygveaa@gmail.com> | 2022-10-10 12:08:56 +0200 |
---|---|---|
committer | Trygve Aaberge <trygveaa@gmail.com> | 2024-02-18 11:32:52 +0100 |
commit | 7c919da30d2209f84c070e9b7e4540d7cde531d4 (patch) | |
tree | 65b8799eb6c73e6ce2d9b355c8a392013bafd4c4 /slack.py | |
parent | 4e8c994e8e1a2efe818ca594a944c2ce8fa431b0 (diff) | |
download | wee-slack-7c919da30d2209f84c070e9b7e4540d7cde531d4.tar.gz |
Start on new version of wee-slack
Diffstat (limited to 'slack.py')
-rw-r--r-- | slack.py | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/slack.py b/slack.py new file mode 100644 index 0000000..4e92ca2 --- /dev/null +++ b/slack.py @@ -0,0 +1,343 @@ +from __future__ import annotations +from enum import IntEnum +from io import StringIO +import json +import os +import resource +from typing import ( + Any, + Dict, + Generator, + Generic, + NamedTuple, + TYPE_CHECKING, + Tuple, + TypeVar, + Union, +) +from typing import Coroutine +from urllib.parse import urlencode +from uuid import uuid4 + +import weechat + + +if TYPE_CHECKING: + from slack_api import ( + SlackConversation, + SlackConversationNotIm, + SlackConversationIm, + ) +else: + # To support running without slack types + SlackConversation = Any + SlackConversationNotIm = Any + SlackConversationIm = Any + +SCRIPT_NAME = "slack" +SCRIPT_AUTHOR = "Trygve Aaberge <trygveaa@gmail.com>" +SCRIPT_VERSION = "3.0.0" +SCRIPT_LICENSE = "MIT" +SCRIPT_DESC = "Extends weechat for typing notification/search/etc on slack.com" +REPO_URL = "https://github.com/wee-slack/wee-slack" + +### Basic classes + +T = TypeVar("T") + + +class LogLevel(IntEnum): + TRACE = 1 + DEBUG = 2 + INFO = 3 + WARN = 4 + ERROR = 5 + FATAL = 6 + + +class HttpError(Exception): + def __init__(self, url: str, return_code: int, http_status: int, error: str): + super().__init__() + self.url = url + self.return_code = return_code + self.http_status = http_status + self.error = error + + +class Future(Generic[T]): + def __init__(self): + self.id = str(uuid4()) + + def __await__(self) -> Generator[Future[T], T, T]: + return (yield self) + + +class FutureProcess(Future[Tuple[str, int, str, str]]): + pass + + +class FutureTimer(Future[Tuple[int]]): + pass + + +class Task(Future[T]): + def __init__(self, coroutine: Coroutine[Future[Any], Any, T], final: bool): + super().__init__() + self.coroutine = coroutine + self.final = final + + +### Helpers + + +def log(level: LogLevel, message: str): + if level >= LogLevel.INFO: + print(level, message) + + +def available_file_descriptors(): + num_current_file_descriptors = len(os.listdir("/proc/self/fd/")) + max_file_descriptors = min(resource.getrlimit(resource.RLIMIT_NOFILE)) + return max_file_descriptors - num_current_file_descriptors + + +### WeeChat callbacks + +active_tasks: Dict[str, Task[Any]] = {} +active_responses: Dict[str, Tuple[Any, ...]] = {} + + +def shutdown_cb(): + return weechat.WEECHAT_RC_OK + + +def weechat_task_cb(data: str, *args: Any) -> int: + task = active_tasks.pop(data) + task_runner(task, args) + return weechat.WEECHAT_RC_OK + + +### WeeChat helpers + + +def task_runner(task: Task[Any], response: Any): + while True: + try: + future = task.coroutine.send(response) + if future.id in active_responses: + response = active_responses.pop(future.id) + else: + if future.id in active_tasks: + raise Exception( + f"future.id in active_tasks, {future.id}, {active_tasks}" + ) + active_tasks[future.id] = task + break + except StopIteration as e: + if task.id in active_tasks: + task = active_tasks.pop(task.id) + response = e.value + else: + if task.id in active_responses: + raise Exception( + f"task.id in active_responses, {task.id}, {active_responses}" + ) + if not task.final: + active_responses[task.id] = e.value + break + + +def create_task( + coroutine: Coroutine[Future[Any], Any, T], final: bool = False +) -> Task[T]: + task = Task(coroutine, final) + task_runner(task, None) + return task + + +async def sleep(milliseconds: int): + future = FutureTimer() + weechat.hook_timer(milliseconds, 0, 1, weechat_task_cb.__name__, future.id) + return await future + + +async def hook_process_hashtable(command: str, options: Dict[str, str], timeout: int): + future = FutureProcess() + log( + LogLevel.DEBUG, + f"hook_process_hashtable calling ({future.id}): command: {command}", + ) + while available_file_descriptors() < 10: + await sleep(10) + weechat.hook_process_hashtable( + command, options, timeout, weechat_task_cb.__name__, future.id + ) + + stdout = StringIO() + stderr = StringIO() + return_code = -1 + + while return_code == -1: + _, return_code, out, err = await future + log( + LogLevel.TRACE, + f"hook_process_hashtable intermediary response ({future.id}): command: {command}", + ) + stdout.write(out) + stderr.write(err) + + out = stdout.getvalue() + err = stderr.getvalue() + log( + LogLevel.DEBUG, + f"hook_process_hashtable response ({future.id}): command: {command}, " + f"return_code: {return_code}, response length: {len(out)}" + + (f", error: {err}" if err else ""), + ) + + return command, return_code, out, err + + +async def http_request( + url: str, options: Dict[str, str], timeout: int, max_retries: int = 5 +) -> str: + options["header"] = "1" + _, return_code, out, err = await hook_process_hashtable( + f"url:{url}", options, timeout + ) + + if return_code != 0 or err: + if max_retries > 0: + log( + LogLevel.INFO, + f"HTTP error, retrying (max {max_retries} times): " + f"return_code: {return_code}, error: {err}, url: {url}", + ) + await sleep(1000) + return await http_request(url, options, timeout, max_retries - 1) + raise HttpError(url, return_code, 0, err) + + headers_end_index = out.index("\r\n\r\n") + headers = out[:headers_end_index].split("\r\n") + http_status = int(headers[0].split(" ")[1]) + + if http_status == 429: + for header in headers[1:]: + name, value = header.split(":", 1) + if name.lower() == "retry-after": + retry_after = int(value.strip()) + log( + LogLevel.INFO, + f"HTTP ratelimit, retrying in {retry_after} seconds, url: {url}", + ) + await sleep(retry_after * 1000) + return await http_request(url, options, timeout) + + body = out[headers_end_index + 4 :] + + if http_status >= 400: + raise HttpError(url, return_code, http_status, body) + + return body + + +### Slack Classes + + +class SlackConfig: + def __init__(self): + self.slack_timeout = 30000 + + +class SlackToken(NamedTuple): + token: str + cookie: Union[str, None] = None + + +class SlackApi: + def __init__(self, token: SlackToken): + self.token = token + + def get_request_options(self): + cookies = f"d={self.token.cookie}" if self.token.cookie else "" + return { + "useragent": f"wee_slack {SCRIPT_VERSION}", + "httpheader": f"Authorization: Bearer {self.token.token}", + "cookie": cookies, + } + + async def fetch(self, method: str, params: Dict[str, Union[str, int]] = {}): + url = f"https://api.slack.com/api/{method}?{urlencode(params)}" + response = await http_request( + url, + self.get_request_options(), + config.slack_timeout, + ) + return json.loads(response) + + async def fetch_list( + self, + method: str, + list_key: str, + params: Dict[str, Union[str, int]] = {}, + pages: int = 1, # negative or 0 means all pages + ): + response = await self.fetch(method, params) + next_cursor = response.get("response_metadata", {}).get("next_cursor") + if pages != 1 and next_cursor and response["ok"]: + params["cursor"] = next_cursor + next_pages = await self.fetch_list(method, list_key, params, pages - 1) + response[list_key].extend(next_pages[list_key]) + return response + return response + + +class SlackTeam: + def __init__(self, token: SlackToken): + self.api = SlackApi(token) + + +class SlackChannelCommonNew: + def __init__(self, team: SlackTeam, slack_info: SlackConversation): + self.team = team + self.api = team.api + self.id = slack_info["id"] + # self.fetch_info() + + async def fetch_info(self): + response = await self.api.fetch("conversations.info", {"channel": self.id}) + print(len(response)) + + +class SlackChannelNew(SlackChannelCommonNew): + def __init__(self, team: SlackTeam, slack_info: SlackConversationNotIm): + super().__init__(team, slack_info) + self.name = slack_info["name"] + + +class SlackIm(SlackChannelCommonNew): + def __init__(self, team: SlackTeam, slack_info: SlackConversationIm): + super().__init__(team, slack_info) + self.user = slack_info["user"] + + +async def init(): + token = SlackToken( + weechat.config_get_plugin("api_token"), weechat.config_get_plugin("api_cookie") + ) + team = SlackTeam(token) + print(team) + + +if __name__ == "__main__": + if weechat.register( + SCRIPT_NAME, + SCRIPT_AUTHOR, + SCRIPT_VERSION, + SCRIPT_LICENSE, + SCRIPT_DESC, + "shutdown_cb", + "", + ): + config = SlackConfig() + create_task(init(), final=True) |