from unittest.mock import MagicMock, patch import pytest import weechat import slack.http from slack.http import hook_process_hashtable from slack.task import FutureProcess, FutureTimer, weechat_task_cb from slack.util import get_callback_name @patch.object(weechat, "hook_process_hashtable") def test_hook_process_hashtable(mock_method: MagicMock): command = "command" options = {"option": "1"} timeout = 123 coroutine = hook_process_hashtable(command, options, timeout) future = coroutine.send(None) assert isinstance(future, FutureProcess) mock_method.assert_called_once_with( command, options, timeout, get_callback_name(weechat_task_cb), future.id ) with pytest.raises(StopIteration) as excinfo: coroutine.send((command, 0, "out", "err")) assert excinfo.value.value == (command, 0, "out", "err") @patch.object(weechat, "hook_process_hashtable") def test_hook_process_hashtable_chunked(mock_method: MagicMock): command = "command" options = {"option": "1"} timeout = 123 coroutine = hook_process_hashtable(command, options, timeout) future = coroutine.send(None) assert isinstance(future, FutureProcess) mock_method.assert_called_once_with( command, options, timeout, get_callback_name(weechat_task_cb), future.id ) assert isinstance(coroutine.send((command, -1, "o1", "e1")), FutureProcess) assert isinstance(coroutine.send((command, -1, "o2", "e2")), FutureProcess) with pytest.raises(StopIteration) as excinfo: coroutine.send((command, 0, "o3", "e3")) assert excinfo.value.value == (command, 0, "o1o2o3", "e1e2e3") @patch.object(slack.http, "available_file_descriptors") def test_hook_process_hashtable_wait_on_max_file_descriptors( mock_available_file_descriptors: MagicMock, ): mock_available_file_descriptors.return_value = 0 coroutine = hook_process_hashtable("", {}, 0) future = coroutine.send(None) assert isinstance(future, FutureTimer) mock_available_file_descriptors.return_value = 9 assert isinstance(coroutine.send((0,)), FutureTimer) mock_available_file_descriptors.return_value = 10 assert isinstance(coroutine.send((0,)), FutureProcess)