from unittest.mock import MagicMock, patch
import pytest
import weechat
from slack.http import HttpError, http_request
from slack.task import FutureProcess, FutureTimer, weechat_task_cb
@patch.object(weechat, "hook_process_hashtable")
def test_http_request_success(mock_method: MagicMock):
url = "http://example.com"
options = {"option": "1"}
timeout = 123
coroutine = http_request(url, options, timeout)
future = coroutine.send(None)
assert isinstance(future, FutureProcess)
mock_method.assert_called_once_with(
f"url:{url}",
{**options, "header": "1"},
timeout,
weechat_task_cb.__name__,
future.id,
)
response = "response"
body = f"HTTP/2 200\r\n\r\n{response}"
with pytest.raises(StopIteration) as excinfo:
coroutine.send(("", 0, body, ""))
assert excinfo.value.value == response
def test_http_request_error_process_return_code():
url = "http://example.com"
coroutine = http_request(url, {}, 0, max_retries=0)
assert isinstance(coroutine.send(None), FutureProcess)
with pytest.raises(HttpError) as excinfo:
coroutine.send(("", -2, "", ""))
assert excinfo.value.url == url
assert excinfo.value.return_code == -2
assert excinfo.value.http_status == 0
assert excinfo.value.error == ""
def test_http_request_error_process_stderr():
url = "http://example.com"
coroutine = http_request(url, {}, 0, max_retries=0)
assert isinstance(coroutine.send(None), FutureProcess)
with pytest.raises(HttpError) as excinfo:
coroutine.send(("", 0, "", "err"))
assert excinfo.value.url == url
assert excinfo.value.return_code == 0
assert excinfo.value.http_status == 0
assert excinfo.value.error == "err"
def test_http_request_error_process_http():
url = "http://example.com"
coroutine = http_request(url, {}, 0, max_retries=0)
assert isinstance(coroutine.send(None), FutureProcess)
response = "response"
body = f"HTTP/2 400\r\n\r\n{response}"
with pytest.raises(HttpError) as excinfo:
coroutine.send(("", 0, body, ""))
assert excinfo.value.url == url
assert excinfo.value.return_code == 0
assert excinfo.value.http_status == 400
assert excinfo.value.error == response
def test_http_request_error_retry_success():
url = "http://example.com"
coroutine = http_request(url, {}, 0, max_retries=2)
assert isinstance(coroutine.send(None), FutureProcess)
assert isinstance(coroutine.send(("", -2, "", "")), FutureTimer)
assert isinstance(coroutine.send((0,)), FutureProcess)
response = "response"
body = f"HTTP/2 200\r\n\r\n{response}"
with pytest.raises(StopIteration) as excinfo:
coroutine.send(("", 0, body, ""))
assert excinfo.value.value == response
def test_http_request_error_retry_error():
url = "http://example.com"
coroutine = http_request(url, {}, 0, max_retries=2)
assert isinstance(coroutine.send(None), FutureProcess)
assert isinstance(coroutine.send(("", -2, "", "")), FutureTimer)
assert isinstance(coroutine.send((0,)), FutureProcess)
assert isinstance(coroutine.send(("", -2, "", "")), FutureTimer)
assert isinstance(coroutine.send((0,)), FutureProcess)
with pytest.raises(HttpError) as excinfo:
coroutine.send(("", -2, "", ""))
assert excinfo.value.url == url
assert excinfo.value.return_code == -2
assert excinfo.value.http_status == 0
assert excinfo.value.error == ""
@patch.object(weechat, "hook_timer")
def test_http_request_ratelimit(mock_method: MagicMock):
url = "http://example.com"
coroutine = http_request(url, {}, 0)
assert isinstance(coroutine.send(None), FutureProcess)
body = "HTTP/2 429\r\nRetry-After: 12\r\n\r\n"
timer = coroutine.send(("", 0, body, ""))
assert isinstance(timer, FutureTimer)
mock_method.assert_called_once_with(12000, 0, 1, weechat_task_cb.__name__, timer.id)
assert isinstance(coroutine.send((0,)), FutureProcess)
with pytest.raises(StopIteration):
coroutine.send(("", 0, "HTTP/2 200\r\n\r\n", ""))