1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
from __future__ import annotations
import os
import resource
from io import StringIO
from typing import Dict, Tuple
import weechat
from slack.error import HttpError
from slack.log import DebugMessageType, LogLevel, log
from slack.task import FutureProcess, FutureUrl, sleep, weechat_task_cb
from slack.util import get_callback_name
def available_file_descriptors():
num_current_file_descriptors = len(os.listdir("/proc/self/fd/"))
max_file_descriptors = min(resource.getrlimit(resource.RLIMIT_NOFILE))
return max_file_descriptors - num_current_file_descriptors
async def hook_process_hashtable(
command: str, options: Dict[str, str], timeout: int
) -> Tuple[str, int, str, str]:
future = FutureProcess()
log(
LogLevel.DEBUG,
DebugMessageType.LOG,
f"hook_process_hashtable calling ({future.id}): command: {command}",
)
while available_file_descriptors() < 10:
await sleep(100)
weechat.hook_process_hashtable(
command, options, timeout, get_callback_name(weechat_task_cb), future.id
)
stdout = StringIO()
stderr = StringIO()
return_code = -1
while return_code == -1:
next_future = FutureProcess(future.id)
_, return_code, out, err = await next_future
log(
LogLevel.TRACE,
DebugMessageType.LOG,
f"hook_process_hashtable intermediary response ({next_future.id}): command: {command}",
)
stdout.write(out)
stderr.write(err)
out = stdout.getvalue()
err = stderr.getvalue().strip()
log(
LogLevel.DEBUG,
DebugMessageType.LOG,
f"hook_process_hashtable response ({future.id}): command: {command}, "
f"return_code: {return_code}, response length: {len(out)}"
+ (f", error: {err}" if err else ""),
)
return command, return_code, out, err
async def hook_url(
url: str, options: Dict[str, str], timeout: int
) -> Tuple[str, Dict[str, str], Dict[str, str]]:
future = FutureUrl()
weechat.hook_url(
url, options, timeout, get_callback_name(weechat_task_cb), future.id
)
return await future
async def http_request_process(
url: str, options: Dict[str, str], timeout: int
) -> Tuple[int, str, str]:
options["header"] = "1"
_, return_code, out, err = await hook_process_hashtable(
f"url:{url}", options, timeout
)
if return_code != 0 or err:
raise HttpError(url, options, return_code, None, err)
parts = out.split("\r\n\r\nHTTP/")
headers, body = parts[-1].split("\r\n\r\n", 1)
http_status = int(headers.split(None, 2)[1])
return http_status, headers, body
async def http_request_url(
url: str, options: Dict[str, str], timeout: int
) -> Tuple[int, str, str]:
_, _, output = await hook_url(url, options, timeout)
if "error" in output:
raise HttpError(url, options, None, None, output["error"])
if "response_code" not in output:
raise HttpError(
url,
options,
None,
None,
f"Unexpectedly missing response_code, output: {output}",
)
http_status = int(output["response_code"])
header_parts = output["headers"].split("\r\n\r\nHTTP/")
return http_status, header_parts[-1], output["output"]
async def http_request(
url: str, options: Dict[str, str], timeout: int, max_retries: int = 5
) -> str:
log(
LogLevel.DEBUG,
DebugMessageType.HTTP_REQUEST,
f"requesting: {url}, {options.get('postfields')}",
)
try:
if hasattr(weechat, "hook_url"):
http_status, headers, body = await http_request_url(url, options, timeout)
else:
http_status, headers, body = await http_request_process(
url, options, timeout
)
except HttpError as e:
if max_retries > 0:
log(
LogLevel.INFO,
DebugMessageType.LOG,
f"HTTP error, retrying (max {max_retries} times): "
f"return_code: {e.return_code}, error: {e.error}, url: {url}",
)
await sleep(1000)
return await http_request(url, options, timeout, max_retries - 1)
raise
if http_status == 429:
header_lines = headers.split("\r\n")
for header in header_lines[1:]:
name, value = header.split(":", 1)
if name.lower() == "retry-after":
retry_after = int(value.strip())
log(
LogLevel.INFO,
DebugMessageType.LOG,
f"HTTP ratelimit, retrying in {retry_after} seconds, url: {url}",
)
await sleep(retry_after * 1000)
return await http_request(url, options, timeout)
if http_status >= 400:
raise HttpError(url, options, None, http_status, body)
return body
|