diff options
Diffstat (limited to 'slack/slack_api.py')
-rw-r--r-- | slack/slack_api.py | 100 |
1 files changed, 69 insertions, 31 deletions
diff --git a/slack/slack_api.py b/slack/slack_api.py index 492aae4..f68abd1 100644 --- a/slack/slack_api.py +++ b/slack/slack_api.py @@ -1,9 +1,10 @@ from __future__ import annotations import json -from typing import TYPE_CHECKING, Dict, Iterable, Union +from typing import TYPE_CHECKING, Iterable, Mapping, Union from urllib.parse import urlencode +from slack.error import SlackApiError from slack.http import http_request from slack.shared import shared @@ -18,6 +19,8 @@ if TYPE_CHECKING: from slack.slack_conversation import SlackConversation from slack.slack_workspace import SlackWorkspace +Params = Mapping[str, Union[str, int, bool]] + class SlackApi: def __init__(self, workspace: SlackWorkspace): @@ -30,7 +33,7 @@ class SlackApi: "cookie": self.workspace.config.api_cookies.value, # TODO: url_encode_if_not_encoded } - async def _fetch(self, method: str, params: Dict[str, Union[str, int]] = {}): + async def _fetch(self, method: str, params: Params = {}): url = f"https://api.slack.com/api/{method}" options = self._get_request_options() options["postfields"] = urlencode(params) @@ -45,30 +48,40 @@ class SlackApi: self, method: str, list_key: str, - params: Dict[str, Union[str, int]] = {}, + params: Params = {}, pages: int = -1, # negative or 0 means all pages ): response = await self._fetch(method, params) next_cursor = response.get("response_metadata", {}).get("next_cursor") if pages != 1 and next_cursor and response["ok"]: - params["cursor"] = next_cursor - next_pages = await self._fetch_list(method, list_key, params, pages - 1) + new_params = {**params, "cursor": next_cursor} + next_pages = await self._fetch_list(method, list_key, new_params, pages - 1) response[list_key].extend(next_pages[list_key]) return response return response - async def fetch_rtm_connect(self) -> SlackRtmConnectResponse: - return await self._fetch("rtm.connect") + async def fetch_rtm_connect(self): + method = "rtm.connect" + response: SlackRtmConnectResponse = await self._fetch(method) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response) + return response - async def fetch_conversations_history( - self, conversation: SlackConversation - ) -> SlackConversationsHistoryResponse: - return await self._fetch("conversations.history", {"channel": conversation.id}) + async def fetch_conversations_history(self, conversation: SlackConversation): + method = "conversations.history" + params = {"channel": conversation.id} + response: SlackConversationsHistoryResponse = await self._fetch(method, params) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response - async def fetch_conversations_info( - self, conversation: SlackConversation - ) -> SlackConversationsInfoResponse: - return await self._fetch("conversations.info", {"channel": conversation.id}) + async def fetch_conversations_info(self, conversation: SlackConversation): + method = "conversations.info" + params = {"channel": conversation.id} + response: SlackConversationsInfoResponse = await self._fetch(method, params) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response async def fetch_users_conversations( self, @@ -76,26 +89,51 @@ class SlackApi: exclude_archived: bool = True, limit: int = 1000, pages: int = -1, - ) -> SlackUsersConversationsResponse: - return await self._fetch_list( - "users.conversations", + ): + method = "users.conversations" + params = { + "types": types, + "exclude_archived": exclude_archived, + "limit": limit, + } + response: SlackUsersConversationsResponse = await self._fetch_list( + method, "channels", - { - "types": types, - "exclude_archived": exclude_archived, - "limit": limit, - }, + params, pages, ) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response - async def fetch_user_info(self, user_id: str) -> SlackUserInfoResponse: - return await self._fetch("users.info", {"user": user_id}) + async def fetch_user_info(self, user_id: str): + method = "users.info" + params = {"user": user_id} + response: SlackUserInfoResponse = await self._fetch(method, params) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response - async def fetch_users_info(self, user_ids: Iterable[str]) -> SlackUsersInfoResponse: - return await self._fetch("users.info", {"users": ",".join(user_ids)}) + async def fetch_users_info(self, user_ids: Iterable[str]): + method = "users.info" + params = {"users": ",".join(user_ids)} + response: SlackUsersInfoResponse = await self._fetch(method, params) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response - async def fetch_bot_info(self, bot_id: str) -> SlackBotInfoResponse: - return await self._fetch("bots.info", {"bot": bot_id}) + async def fetch_bot_info(self, bot_id: str): + method = "bots.info" + params = {"bot": bot_id} + response: SlackBotInfoResponse = await self._fetch(method, params) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response - async def fetch_bots_info(self, bot_ids: Iterable[str]) -> SlackBotsInfoResponse: - return await self._fetch("bots.info", {"bots": ",".join(bot_ids)}) + async def fetch_bots_info(self, bot_ids: Iterable[str]): + method = "bots.info" + params = {"bots": ",".join(bot_ids)} + response: SlackBotsInfoResponse = await self._fetch(method, params) + if response["ok"] is False: + raise SlackApiError(self.workspace, method, response, params) + return response |