aboutsummaryrefslogblamecommitdiffstats
path: root/slack/slack_api.py
blob: 2f57e618c1985f5c71e297f74ab73aeb2da00f18 (plain) (tree)
1
2
3
4
5
6
7
8
9
10


                                  
                           
                                                                              

                                  
                                     

                                   
                                       
                             
                                           

                 
                                                                                     

                                                                           
                                                           
                                                                                       
                                                                                 
                                                                                       
                                                                                       
                                                            
                                                                   
                                                               
                                                                           
                                                                                   
                                                                                        
                                                                      
                                                                                   
                                                                         
                                                       

                                                          

                                                    
                                            


                                                                            
 
 
                     


                                                  
                                   


                                                                                           
                                                                           

         

                                   

                                   
                                                     
 
                                                                         
                                                                             








































                                                                                      
                                                             


                                                   

                                      
                    
                                                               


                                   
                          


                      
                            
                                    
      


                                                                            
                                                                              
                                                                                   
                                                          
                                                                                        


                                                           
 






                                                                   




                                                                     







                                                                                
                       
 

                                                                                 
                                                     



                                                                                       















                                                                             
 


                                                                         

                                        

                                       







                                                                             
                                                                   
                                     
                                                     



                                                                                    
 


                                          
                                    

                                        
                                                     
                                                                             
                                            




                                                                         



                                        
                                    

                                      
                          

                                                 


                                                                           
                       
                   
                  
         


                                                                         
 

                                                  
                                          



                                                                           
 
                                                                                 
                             
                                                      



                                                                            
 










                                                                           

                                                
                                        



                                                                          
 

                                                            
                                                    



                                                                           
 





                                                                         
 






                                                                    













                                                                         







                                                                          





















                                                                                     





                                            
                                









                                           
                                                 



                                                                          
 






























                                                                                      






















                                                                          
from __future__ import annotations

import json
from itertools import chain
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
from urllib.parse import urlencode

from slack.error import SlackApiError
from slack.http import http_request
from slack.shared import shared
from slack.slack_message import SlackTs
from slack.task import gather
from slack.util import chunked, get_cookies

if TYPE_CHECKING:
    from slack_api.slack_bots_info import SlackBotInfoResponse, SlackBotsInfoResponse
    from slack_api.slack_client_counts import SlackClientCountsResponse
    from slack_api.slack_client_userboot import SlackClientUserbootResponse
    from slack_api.slack_common import SlackGenericResponse
    from slack_api.slack_conversations_history import SlackConversationsHistoryResponse
    from slack_api.slack_conversations_info import SlackConversationsInfoResponse
    from slack_api.slack_conversations_members import SlackConversationsMembersResponse
    from slack_api.slack_conversations_replies import SlackConversationsRepliesResponse
    from slack_api.slack_emoji import SlackEmojiListResponse
    from slack_api.slack_rtm_connect import SlackRtmConnectResponse
    from slack_api.slack_team_info import SlackTeamInfoResponse
    from slack_api.slack_usergroups_info import SlackUsergroupsInfoResponse
    from slack_api.slack_users_conversations import SlackUsersConversationsResponse
    from slack_api.slack_users_info import SlackUserInfoResponse, SlackUsersInfoResponse
    from slack_api.slack_users_prefs import SlackUsersPrefsGetResponse
    from slack_edgeapi.slack_usergroups_info import SlackEdgeUsergroupsInfoResponse
    from slack_edgeapi.slack_users_search import SlackUsersSearchResponse
    from typing_extensions import Literal, assert_never

    from slack.slack_conversation import SlackConversation
    from slack.slack_workspace import SlackWorkspace

Params = Mapping[str, Union[str, int, bool]]
EdgeParams = Mapping[
    str, Union[str, int, bool, Sequence[str], Sequence[int], Sequence[bool]]
]


class SlackApiCommon:
    def __init__(self, workspace: SlackWorkspace):
        self.workspace = workspace

    def _get_request_options(self):
        return {
            "useragent": f"wee_slack {shared.SCRIPT_VERSION}",
            "httpheader": f"Authorization: Bearer {self.workspace.config.api_token.value}",
            "cookie": get_cookies(self.workspace.config.api_cookies.value),
        }


class SlackEdgeApi(SlackApiCommon):
    @property
    def is_available(self) -> bool:
        return self.workspace.token_type == "session"

    async def _fetch_edgeapi(self, method: str, params: EdgeParams = {}):
        url = f"https://edgeapi.slack.com/cache/{self.workspace.id}/{method}"
        options = self._get_request_options()
        options["postfields"] = json.dumps(params)
        options["httpheader"] += "\nContent-Type: application/json"
        response = await http_request(
            url,
            options,
            self.workspace.config.network_timeout.value * 1000,
        )
        return json.loads(response)

    async def fetch_usergroups_info(self, usergroup_ids: Sequence[str]):
        method = "usergroups/info"
        params: EdgeParams = {"ids": usergroup_ids}
        response: SlackEdgeUsergroupsInfoResponse = await self._fetch_edgeapi(
            method, params
        )
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_users_search(self, query: str):
        method = "users/search"
        params: EdgeParams = {
            "include_profile_only_users": True,
            "query": query,
            "count": 25,
            "fuzz": 1,
            "uax29_tokenizer": False,
            "filter": "NOT deactivated",
        }
        response: SlackUsersSearchResponse = await self._fetch_edgeapi(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response


class SlackApi(SlackApiCommon):
    def __init__(self, workspace: SlackWorkspace):
        super().__init__(workspace)
        self.edgeapi = SlackEdgeApi(workspace)

    async def _fetch(self, method: str, params: Params = {}):
        url = f"https://api.slack.com/api/{method}"
        options = self._get_request_options()
        options["postfields"] = urlencode(params)
        response = await http_request(
            url,
            options,
            self.workspace.config.network_timeout.value * 1000,
        )
        return json.loads(response)

    async def _fetch_list(
        self,
        method: str,
        list_key: str,
        params: Params = {},
        limit: Optional[int] = None,
    ):
        cur_limit = 1000 if limit is None or limit > 1000 else limit
        response = await self._fetch(method, {**params, "limit": cur_limit})
        remaining = limit - cur_limit if limit is not None else None
        next_cursor = response.get("response_metadata", {}).get("next_cursor")
        if (remaining is None or remaining > 0) and next_cursor and response["ok"]:
            new_params = {**params, "cursor": next_cursor}
            next_pages = await self._fetch_list(method, list_key, new_params, remaining)
            response[list_key].extend(next_pages[list_key])
            return response
        return response

    async def fetch_team_info(self):
        method = "team.info"
        response: SlackTeamInfoResponse = await self._fetch(method)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def fetch_rtm_connect(self):
        method = "rtm.connect"
        response: SlackRtmConnectResponse = await self._fetch(method)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def fetch_users_get_prefs(self, prefs: Optional[str] = None):
        method = "users.prefs.get"
        params: Params = {"prefs": prefs} if prefs else {}
        response: SlackUsersPrefsGetResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def fetch_conversations_history(self, conversation: SlackConversation):
        method = "conversations.history"
        params: Params = {"channel": conversation.id}
        response: SlackConversationsHistoryResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_conversations_history_after(
        self, conversation: SlackConversation, after: SlackTs
    ):
        method = "conversations.history"
        params: Params = {
            "channel": conversation.id,
            "oldest": after,
            "inclusive": False,
        }
        response: SlackConversationsHistoryResponse = await self._fetch_list(
            method, "messages", params
        )
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_conversations_replies(
        self, conversation: SlackConversation, parent_message_ts: SlackTs
    ):
        method = "conversations.replies"
        params: Params = {
            "channel": conversation.id,
            "ts": parent_message_ts,
        }
        response: SlackConversationsRepliesResponse = await self._fetch_list(
            method, "messages", params
        )
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_conversations_info(self, conversation_id: str):
        method = "conversations.info"
        params: Params = {"channel": conversation_id}
        response: SlackConversationsInfoResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_conversations_members(
        self,
        conversation: SlackConversation,
        limit: Optional[int] = None,
    ):
        method = "conversations.members"
        params: Params = {"channel": conversation.id}
        response: SlackConversationsMembersResponse = await self._fetch_list(
            method, "members", params, limit
        )
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_users_conversations(
        self,
        types: str,
        exclude_archived: bool = True,
        limit: Optional[int] = None,
    ):
        method = "users.conversations"
        params: Params = {
            "types": types,
            "exclude_archived": exclude_archived,
        }
        response: SlackUsersConversationsResponse = await self._fetch_list(
            method,
            "channels",
            params,
            limit,
        )
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_user_info(self, user_id: str):
        method = "users.info"
        params: Params = {"user": user_id}
        response: SlackUserInfoResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def _fetch_users_info_without_splitting(self, user_ids: Iterable[str]):
        method = "users.info"
        params: Params = {"users": ",".join(user_ids)}
        response: SlackUsersInfoResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_users_info(self, user_ids: Iterable[str]):
        responses = await gather(
            *(
                self._fetch_users_info_without_splitting(user_ids_batch)
                for user_ids_batch in chunked(user_ids, 1000)
            )
        )
        users = list(chain(*(response["users"] for response in responses)))
        response: SlackUsersInfoResponse = {"ok": True, "users": users}
        return response

    async def fetch_bot_info(self, bot_id: str):
        method = "bots.info"
        params: Params = {"bot": bot_id}
        response: SlackBotInfoResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_bots_info(self, bot_ids: Iterable[str]):
        method = "bots.info"
        params: Params = {"bots": ",".join(bot_ids)}
        response: SlackBotsInfoResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def fetch_usergroups_list(self):
        method = "usergroups.list"
        response: SlackUsergroupsInfoResponse = await self._fetch(method)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def fetch_emoji_list(self):
        method = "emoji.list"
        response: SlackEmojiListResponse = await self._fetch(method)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def fetch_client_userboot(self):
        method = "client.userBoot"
        response: SlackClientUserbootResponse = await self._fetch(method)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def fetch_client_counts(self):
        method = "client.counts"
        response: SlackClientCountsResponse = await self._fetch(method)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response)
        return response

    async def conversations_close(self, conversation: SlackConversation):
        method = "conversations.close"
        params: Params = {"channel": conversation.id}
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def conversations_mark(self, conversation: SlackConversation, ts: SlackTs):
        method = "conversations.mark"
        params: Params = {"channel": conversation.id, "ts": ts}
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def subscriptions_thread_mark(
        self, conversation: SlackConversation, thread_ts: SlackTs, ts: SlackTs
    ):
        method = "subscriptions.thread.mark"
        params: Params = {
            "channel": conversation.id,
            "thread_ts": thread_ts,
            "ts": ts,
            "read": 1,
        }
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def chat_post_message(
        self,
        conversation: SlackConversation,
        text: str,
        thread_ts: Optional[SlackTs] = None,
        broadcast: bool = False,
    ):
        method = "chat.postMessage"
        params: Params = {
            "channel": conversation.id,
            "text": text,
            "as_user": True,
            "link_names": True,
        }
        if thread_ts is not None:
            params["thread_ts"] = thread_ts
            params["reply_broadcast"] = broadcast
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def chat_update_message(
        self,
        conversation: SlackConversation,
        ts: SlackTs,
        text: str,
    ):
        method = "chat.update"
        params: Params = {
            "channel": conversation.id,
            "ts": ts,
            "text": text,
            "as_user": True,
            "link_names": True,
        }
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def chat_delete_message(self, conversation: SlackConversation, ts: SlackTs):
        method = "chat.delete"
        params: Params = {
            "channel": conversation.id,
            "ts": ts,
            "as_user": True,
        }
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response

    async def reactions_change(
        self,
        conversation: SlackConversation,
        ts: SlackTs,
        name: str,
        change_type: Literal["+", "-"],
    ):
        method = (
            "reactions.add"
            if change_type == "+"
            else "reactions.remove"
            if change_type == "-"
            else assert_never(change_type)
        )
        params: Params = {
            "channel": conversation.id,
            "timestamp": ts,
            "name": name,
        }
        response: SlackGenericResponse = await self._fetch(method, params)
        if response["ok"] is False:
            raise SlackApiError(self.workspace, method, response, params)
        return response