aboutsummaryrefslogtreecommitdiffstats
path: root/slack
diff options
context:
space:
mode:
Diffstat (limited to 'slack')
-rw-r--r--slack/commands.py2
-rw-r--r--slack/slack_api.py90
-rw-r--r--slack/slack_user.py2
-rw-r--r--slack/slack_workspace.py4
4 files changed, 54 insertions, 44 deletions
diff --git a/slack/commands.py b/slack/commands.py
index 69183a2..57a257f 100644
--- a/slack/commands.py
+++ b/slack/commands.py
@@ -456,7 +456,7 @@ async def complete_user_next(
):
if conversation.completion_context == "NO_COMPLETION":
conversation.completion_context = "PENDING_COMPLETION"
- search = await conversation.workspace.api.fetch_users_search(query)
+ search = await conversation.workspace.api.edgeapi.fetch_users_search(query)
if conversation.completion_context != "PENDING_COMPLETION":
return
conversation.completion_context = "ACTIVE_COMPLETION"
diff --git a/slack/slack_api.py b/slack/slack_api.py
index 1404e81..890d4d9 100644
--- a/slack/slack_api.py
+++ b/slack/slack_api.py
@@ -29,7 +29,7 @@ EdgeParams = Mapping[
]
-class SlackApi:
+class SlackApiCommon:
def __init__(self, workspace: SlackWorkspace):
self.workspace = workspace
@@ -40,6 +40,54 @@ class SlackApi:
"cookie": self.workspace.config.api_cookies.value, # TODO: url_encode_if_not_encoded
}
+
+class SlackEdgeApi(SlackApiCommon):
+ async def _fetch_edgeapi(self, method: str, params: EdgeParams = {}):
+ enterprise_id_part = (
+ f"{self.workspace.enterprise_id}/" if self.workspace.enterprise_id else ""
+ )
+ url = f"https://edgeapi.slack.com/cache/{enterprise_id_part}{self.workspace.id}/{method}"
+ options = self._get_request_options()
+ options["postfields"] = json.dumps(params)
+ options["httpheader"] += "\nContent-Type: application/json"
+ response = await http_request(
+ url,
+ options,
+ self.workspace.config.network_timeout.value * 1000,
+ )
+ return json.loads(response)
+
+ async def fetch_usergroups_info(self, usergroup_ids: Sequence[str]):
+ method = "usergroups/info"
+ params: EdgeParams = {"ids": usergroup_ids}
+ response: SlackEdgeUsergroupsInfoResponse = await self._fetch_edgeapi(
+ method, params
+ )
+ if response["ok"] is False:
+ raise SlackApiError(self.workspace, method, response, params)
+ return response
+
+ async def fetch_users_search(self, query: str):
+ method = "users/search"
+ params: EdgeParams = {
+ "include_profile_only_users": True,
+ "query": query,
+ "count": 25,
+ "fuzz": 1,
+ "uax29_tokenizer": False,
+ "filter": "NOT deactivated",
+ }
+ response: SlackUsersSearchResponse = await self._fetch_edgeapi(method, params)
+ if response["ok"] is False:
+ raise SlackApiError(self.workspace, method, response, params)
+ return response
+
+
+class SlackApi(SlackApiCommon):
+ def __init__(self, workspace: SlackWorkspace):
+ super().__init__(workspace)
+ self.edgeapi = SlackEdgeApi(workspace)
+
async def _fetch(self, method: str, params: Params = {}):
url = f"https://api.slack.com/api/{method}"
options = self._get_request_options()
@@ -67,21 +115,6 @@ class SlackApi:
return response
return response
- async def _fetch_edgeapi(self, method: str, params: EdgeParams = {}):
- enterprise_id_part = (
- f"{self.workspace.enterprise_id}/" if self.workspace.enterprise_id else ""
- )
- url = f"https://edgeapi.slack.com/cache/{enterprise_id_part}{self.workspace.id}/{method}"
- options = self._get_request_options()
- options["postfields"] = json.dumps(params)
- options["httpheader"] += "\nContent-Type: application/json"
- response = await http_request(
- url,
- options,
- self.workspace.config.network_timeout.value * 1000,
- )
- return json.loads(response)
-
async def fetch_rtm_connect(self):
method = "rtm.connect"
response: SlackRtmConnectResponse = await self._fetch(method)
@@ -181,28 +214,3 @@ class SlackApi:
if response["ok"] is False:
raise SlackApiError(self.workspace, method, response)
return response
-
- async def fetch_usergroups_info(self, usergroup_ids: Sequence[str]):
- method = "usergroups/info"
- params: EdgeParams = {"ids": usergroup_ids}
- response: SlackEdgeUsergroupsInfoResponse = await self._fetch_edgeapi(
- method, params
- )
- if response["ok"] is False:
- raise SlackApiError(self.workspace, method, response, params)
- return response
-
- async def fetch_users_search(self, query: str):
- method = "users/search"
- params: EdgeParams = {
- "include_profile_only_users": True,
- "query": query,
- "count": 25,
- "fuzz": 1,
- "uax29_tokenizer": False,
- "filter": "NOT deactivated",
- }
- response: SlackUsersSearchResponse = await self._fetch_edgeapi(method, params)
- if response["ok"] is False:
- raise SlackApiError(self.workspace, method, response, params)
- return response
diff --git a/slack/slack_user.py b/slack/slack_user.py
index c7ad0ae..c41f3aa 100644
--- a/slack/slack_user.py
+++ b/slack/slack_user.py
@@ -114,7 +114,7 @@ class SlackUsergroup:
@classmethod
async def create(cls, workspace: SlackWorkspace, id: str):
- info_response = await workspace.api.fetch_usergroups_info([id])
+ info_response = await workspace.api.edgeapi.fetch_usergroups_info([id])
if not info_response["results"] or info_response["results"][0]["id"] != id:
raise SlackError(workspace, f"Couldn't find user group {id}")
return cls(workspace, info_response["results"][0])
diff --git a/slack/slack_workspace.py b/slack/slack_workspace.py
index 675809d..d659317 100644
--- a/slack/slack_workspace.py
+++ b/slack/slack_workspace.py
@@ -154,7 +154,9 @@ class SlackUsergroups(SlackItem[SlackUsergroup, SlackUsergroupInfo]):
async def _fetch_items_info(
self, item_ids: Iterable[str]
) -> Dict[str, SlackUsergroupInfo]:
- response = await self.workspace.api.fetch_usergroups_info(list(item_ids))
+ response = await self.workspace.api.edgeapi.fetch_usergroups_info(
+ list(item_ids)
+ )
return {info["id"]: info for info in response["results"]}
def _create_item_from_info(self, item_info: SlackUsergroupInfo) -> SlackUsergroup: