diff options
author | Trygve Aaberge <trygveaa@gmail.com> | 2023-01-21 19:32:16 +0100 |
---|---|---|
committer | Trygve Aaberge <trygveaa@gmail.com> | 2024-02-18 11:32:53 +0100 |
commit | 73cf96863d6529e98cfa9541da4473029479eae0 (patch) | |
tree | 9037a92950f42d615e8e01302c72a7251a6df647 | |
parent | ec5e23bd7a6a928a1b0aa28fadc9b6554c7b2641 (diff) | |
download | wee-slack-73cf96863d6529e98cfa9541da4473029479eae0.tar.gz |
Make sure tasks raising exceptions are finished
When a task raises an execution we have to make sure to continue the
tasks that are waiting on it, and to re-raise this exception in the
__await__ so it propagates up.
-rw-r--r-- | slack/task.py | 48 |
1 files changed, 27 insertions, 21 deletions
diff --git a/slack/task.py b/slack/task.py index ac3677b..c35e42f 100644 --- a/slack/task.py +++ b/slack/task.py @@ -34,6 +34,8 @@ class Future(Awaitable[T]): def __await__(self) -> Generator[Future[T], T, T]: result = yield self + if isinstance(result, Exception): + raise result self.set_result(result) return result @@ -73,38 +75,42 @@ def weechat_task_cb(data: str, *args: Any) -> int: return weechat.WEECHAT_RC_OK +def process_ended_task(task: Task[Any], response: Any): + task.set_result(response) + if task.id in shared.active_tasks: + tasks = shared.active_tasks.pop(task.id) + for active_task in tasks: + task_runner(active_task, response) + if task.id in shared.active_futures: + del shared.active_futures[task.id] + + def task_runner(task: Task[Any], response: Any): while True: try: - try: - future = task.coroutine.send(response) - except HttpError as e: + future = task.coroutine.send(response) + except Exception as e: + result = e.value if isinstance(e, StopIteration) else e + process_ended_task(task, result) + if isinstance(e, HttpError): print_error( f"Error calling URL {e.url}: return code: {e.return_code}, " f"http status code: {e.http_status_code}, error: {e.error}" ) - return - except SlackApiError as e: + elif isinstance(e, SlackApiError): print_error( f"Error from Slack API method {e.method} for workspace " f"{e.workspace.name}: {e.response}" ) - return - - if future.finished: - response = future.result - else: - shared.active_tasks[future.id].append(task) - shared.active_futures[future.id] = future - break - except StopIteration as e: - task.set_result(e.value) - if task.id in shared.active_tasks: - tasks = shared.active_tasks.pop(task.id) - for active_task in tasks: - task_runner(active_task, e.value) - if task.id in shared.active_futures: - del shared.active_futures[task.id] + elif not isinstance(e, StopIteration): + raise e + return + + if future.finished: + response = future.result + else: + shared.active_tasks[future.id].append(task) + shared.active_futures[future.id] = future break |