aboutsummaryrefslogtreecommitdiffstats
path: root/slack/slack_api.py
blob: ef522d0b24bfa53b2bac9fa1e2d86951a9237cf6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from urllib.parse import urlencode

from slack.http import http_request
from slack.shared import shared

if TYPE_CHECKING:
    from slack_api.slack_conversations_info import SlackConversationsInfoResponse
    from slack_api.slack_users_conversations import SlackUsersConversationsResponse
    from slack_api.slack_users_info import SlackUsersInfoResponse

    from slack.slack_conversation import SlackConversation
    from slack.slack_user import SlackUser
    from slack.slack_workspace import SlackWorkspace


class SlackApi:
    """Async helper for calling Slack Web API methods for one workspace.

    Requests are issued through ``http_request`` against
    ``https://api.slack.com/api/<method>`` and the response body is parsed
    as JSON. Credentials and the timeout are read from the workspace config.
    """

    def __init__(self, workspace: SlackWorkspace):
        """Bind the API helper to *workspace*, whose config supplies credentials."""
        self.workspace = workspace

    def _get_request_options(self):
        """Return the options dict handed to ``http_request`` for every call.

        Contains the user agent, the bearer-token authorization header and
        the cookie value taken from the workspace config.
        """
        return {
            "useragent": f"wee_slack {shared.SCRIPT_VERSION}",
            "httpheader": f"Authorization: Bearer {self.workspace.config.api_token.value}",
            "cookie": self.workspace.config.api_cookies.value,
        }

    async def _fetch(
        self, method: str, params: Optional[Dict[str, Union[str, int]]] = None
    ):
        """Call one Slack API *method* and return the decoded JSON response.

        Args:
            method: Slack API method name, e.g. ``"users.info"``.
            params: Query parameters, urlencoded into the request URL.
                ``None`` (the default) sends no parameters.

        Returns:
            Whatever ``json.loads`` produces for the response body.
        """
        # `None` instead of a shared `{}` default: a mutable default is one
        # object reused by every call.
        query = urlencode(params or {})
        url = f"https://api.slack.com/api/{method}?{query}"
        response = await http_request(
            url,
            self._get_request_options(),
            # presumably converts a timeout in seconds to milliseconds — TODO confirm
            self.workspace.config.slack_timeout.value * 1000,
        )
        return json.loads(response)

    async def _fetch_list(
        self,
        method: str,
        list_key: str,
        params: Optional[Dict[str, Union[str, int]]] = None,
        pages: int = -1,  # negative or 0 means all pages
    ):
        """Fetch a cursor-paginated list and merge the pages into one response.

        Follows ``response_metadata.next_cursor`` until it is empty, a
        response is not ``ok``, or *pages* pages have been fetched.

        Args:
            method: Slack API method name.
            list_key: Key in each response that holds the list to merge.
            params: Query parameters for the request. The dict is copied,
                so the caller's dict is never modified.
            pages: Maximum number of pages to fetch; zero or a negative
                value fetches all pages.

        Returns:
            The first page's response with the items of all following pages
            appended to ``response[list_key]``. If a follow-up page is not
            ``ok``, that failed response is returned instead so the caller
            can see the error.
        """
        # Work on a private copy: the cursor is written into this dict, and
        # it must leak neither into the caller's dict nor into later calls.
        request_params: Dict[str, Union[str, int]] = dict(params) if params else {}

        first_page = await self._fetch(method, request_params)
        current_page = first_page
        remaining = pages
        while True:
            next_cursor = current_page.get("response_metadata", {}).get("next_cursor")
            if remaining == 1 or not next_cursor or not current_page["ok"]:
                return first_page

            request_params["cursor"] = next_cursor
            remaining -= 1
            current_page = await self._fetch(method, request_params)
            if not current_page.get("ok", True):
                # A failed page has no list to merge; surface the error
                # response rather than failing with a KeyError below.
                return current_page
            first_page[list_key].extend(current_page[list_key])

    async def fetch_conversations_history(self, conversation: SlackConversation) -> Any:
        """Call ``conversations.history`` for *conversation* (single request)."""
        return await self._fetch("conversations.history", {"channel": conversation.id})

    async def fetch_conversations_info(
        self, conversation: SlackConversation
    ) -> SlackConversationsInfoResponse:
        """Call ``conversations.info`` for *conversation*."""
        return await self._fetch("conversations.info", {"channel": conversation.id})

    async def fetch_users_conversations(
        self,
        types: str,
        exclude_archived: bool = True,
        limit: int = 1000,
        pages: int = -1,
    ) -> SlackUsersConversationsResponse:
        """Call ``users.conversations`` and merge the paginated ``channels`` list.

        Args:
            types: Passed through as the ``types`` query parameter.
            exclude_archived: Passed through as ``exclude_archived``.
            limit: Passed through as ``limit`` (sent with every page request).
            pages: Maximum number of pages to fetch; zero or a negative
                value fetches all pages.
        """
        return await self._fetch_list(
            "users.conversations",
            "channels",
            {
                "types": types,
                # NOTE(review): urlencode renders this bool as "True"/"False";
                # confirm the API accepts that casing.
                "exclude_archived": exclude_archived,
                "limit": limit,
            },
            pages,
        )

    async def fetch_users_info(self, user: SlackUser) -> SlackUsersInfoResponse:
        """Call ``users.info`` for *user*."""
        return await self._fetch("users.info", {"user": user.id})