from __future__ import annotations import json from typing import TYPE_CHECKING, Iterable, Mapping, Sequence, Union from urllib.parse import urlencode from slack.error import SlackApiError from slack.http import http_request from slack.shared import shared if TYPE_CHECKING: from slack_api.slack_bots_info import SlackBotInfoResponse, SlackBotsInfoResponse from slack_api.slack_conversations_history import SlackConversationsHistoryResponse from slack_api.slack_conversations_info import SlackConversationsInfoResponse from slack_api.slack_rtm_connect import SlackRtmConnectResponse from slack_api.slack_usergroups_info import SlackUsergroupsInfoResponse from slack_api.slack_users_conversations import SlackUsersConversationsResponse from slack_api.slack_users_info import SlackUserInfoResponse, SlackUsersInfoResponse from slack_edgeapi.slack_usergroups_info import SlackEdgeUsergroupsInfoResponse from slack_edgeapi.slack_users_search import SlackUsersSearchResponse from slack.slack_conversation import SlackConversation from slack.slack_workspace import SlackWorkspace Params = Mapping[str, Union[str, int, bool]] EdgeParams = Mapping[ str, Union[str, int, bool, Sequence[str], Sequence[int], Sequence[bool]] ] class SlackApi: def __init__(self, workspace: SlackWorkspace): self.workspace = workspace def _get_request_options(self): return { "useragent": f"wee_slack {shared.SCRIPT_VERSION}", "httpheader": f"Authorization: Bearer {self.workspace.config.api_token.value}", "cookie": self.workspace.config.api_cookies.value, # TODO: url_encode_if_not_encoded } async def _fetch(self, method: str, params: Params = {}): url = f"https://api.slack.com/api/{method}" options = self._get_request_options() options["postfields"] = urlencode(params) response = await http_request( url, options, self.workspace.config.network_timeout.value * 1000, ) return json.loads(response) async def _fetch_list( self, method: str, list_key: str, params: Params = {}, pages: int = -1, # negative or 0 means all pages ): response = await self._fetch(method, params) next_cursor = response.get("response_metadata", {}).get("next_cursor") if pages != 1 and next_cursor and response["ok"]: new_params = {**params, "cursor": next_cursor} next_pages = await self._fetch_list(method, list_key, new_params, pages - 1) response[list_key].extend(next_pages[list_key]) return response return response async def _fetch_edgeapi(self, method: str, params: EdgeParams = {}): enterprise_id_part = ( f"{self.workspace.enterprise_id}/" if self.workspace.enterprise_id else "" ) url = f"https://edgeapi.slack.com/cache/{enterprise_id_part}{self.workspace.id}/{method}" options = self._get_request_options() options["postfields"] = json.dumps(params) options["httpheader"] += "\nContent-Type: application/json" response = await http_request( url, options, self.workspace.config.network_timeout.value * 1000, ) return json.loads(response) async def fetch_rtm_connect(self): method = "rtm.connect" response: SlackRtmConnectResponse = await self._fetch(method) if response["ok"] is False: raise SlackApiError(self.workspace, method, response) return response async def fetch_conversations_history(self, conversation: SlackConversation): method = "conversations.history" params = {"channel": conversation.id} response: SlackConversationsHistoryResponse = await self._fetch(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_conversations_info(self, conversation: SlackConversation): method = "conversations.info" params = {"channel": conversation.id} response: SlackConversationsInfoResponse = await self._fetch(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_users_conversations( self, types: str, exclude_archived: bool = True, limit: int = 1000, pages: int = -1, ): method = "users.conversations" params = { "types": types, "exclude_archived": exclude_archived, "limit": limit, } response: SlackUsersConversationsResponse = await self._fetch_list( method, "channels", params, pages, ) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_user_info(self, user_id: str): method = "users.info" params = {"user": user_id} response: SlackUserInfoResponse = await self._fetch(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_users_info(self, user_ids: Iterable[str]): method = "users.info" params = {"users": ",".join(user_ids)} response: SlackUsersInfoResponse = await self._fetch(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_bot_info(self, bot_id: str): method = "bots.info" params = {"bot": bot_id} response: SlackBotInfoResponse = await self._fetch(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_bots_info(self, bot_ids: Iterable[str]): method = "bots.info" params = {"bots": ",".join(bot_ids)} response: SlackBotsInfoResponse = await self._fetch(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_usergroups_list(self): method = "usergroups.list" response: SlackUsergroupsInfoResponse = await self._fetch(method) if response["ok"] is False: raise SlackApiError(self.workspace, method, response) return response async def fetch_usergroups_info(self, usergroup_ids: Sequence[str]): method = "usergroups/info" params = {"ids": usergroup_ids} response: SlackEdgeUsergroupsInfoResponse = await self._fetch_edgeapi( method, params ) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response async def fetch_users_search(self, query: str): method = "users/search" params = { "include_profile_only_users": True, "query": query, "count": 25, "fuzz": 1, "uax29_tokenizer": False, "filter": "NOT deactivated", } response: SlackUsersSearchResponse = await self._fetch_edgeapi(method, params) if response["ok"] is False: raise SlackApiError(self.workspace, method, response, params) return response