1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
from __future__ import annotations
from typing import (
Any,
Awaitable,
Coroutine,
Generator,
List,
Optional,
Tuple,
TypeVar,
Union,
)
from uuid import uuid4
import weechat
from slack.shared import shared
from slack.util import get_callback_name
T = TypeVar("T")


class Future(Awaitable[T]):
    """Awaitable placeholder for a value that is supplied later.

    Awaiting a future yields the future object itself to whoever is driving
    the coroutine. The value that the driver sends back in is stored as the
    result and also becomes the value of the ``await`` expression.
    """

    def __init__(self, future_id: Optional[str] = None):
        # Use the caller's id when given; otherwise generate a random UUID
        # string so the future can be looked up by id later.
        self.id = str(uuid4()) if future_id is None else future_id
        self._finished = False
        self._result: Optional[T] = None

    def __await__(self) -> Generator[Future[T], T, T]:
        # Hand ourselves to the driver and wait for it to send the value in.
        value = yield self
        self.set_result(value)
        return value

    @property
    def finished(self):
        """True once ``set_result`` has been called."""
        return self._finished

    @property
    def result(self):
        """The value passed to the latest ``set_result`` (initially None)."""
        return self._result

    def set_result(self, result: T):
        """Store *result* and mark this future as finished."""
        self._result = result
        self._finished = True
class FutureProcess(Future[Tuple[str, int, str, str]]):
    """Future whose result is typed as a ``(str, int, str, str)`` tuple.

    NOTE(review): presumably the argument tuple of a WeeChat ``hook_process``
    callback (command, return code, stdout, stderr) -- confirm against the
    code that creates these.
    """
class FutureTimer(Future[Tuple[int]]):
    """Future whose result is typed as a one-element ``(int,)`` tuple.

    ``sleep`` creates one of these and passes its id as the callback data of
    ``weechat.hook_timer``.

    NOTE(review): presumably the int is the timer's remaining-calls argument
    -- confirm against the WeeChat API.
    """
class Task(Future[T]):
    """A future paired with the coroutine that is run to produce its result.

    The task gets an auto-generated id from ``Future.__init__``; the coroutine
    is kept on ``self.coroutine`` so a runner can resume it with ``send``.
    """

    def __init__(self, coroutine: Coroutine[Future[Any], Any, T]):
        # No explicit id is passed, so the base class generates one.
        super().__init__()
        self.coroutine = coroutine
def weechat_task_cb(data: str, *args: Any) -> int:
    """Resolve the future registered under *data* and resume its waiters.

    Args:
        data: Id of the future; used as the key into both
            ``shared.active_futures`` and ``shared.active_tasks``.
        *args: Remaining callback arguments. The whole tuple becomes the
            future's result and is sent into every waiting task.

    Returns:
        ``weechat.WEECHAT_RC_OK``.

    Both registry entries are removed with ``pop`` and no default, so an id
    that is not registered propagates the lookup error.
    """
    resolved = shared.active_futures.pop(data)
    resolved.set_result(args)
    # The waiter list is detached from the registry before any task is resumed.
    for waiting_task in shared.active_tasks.pop(data):
        task_runner(waiting_task, args)
    return weechat.WEECHAT_RC_OK
def task_runner(task: Task[Any], response: Any):
    """Run *task*'s coroutine until it blocks on a pending future or returns.

    Args:
        task: The task whose coroutine is resumed.
        response: Value sent into the coroutine on the first resume.

    Each value the coroutine yields is treated as a future. A finished future
    has its result fed straight back in. An unfinished one causes the task to
    be parked in ``shared.active_tasks`` under that future's id, with the
    future itself recorded in ``shared.active_futures``, and the function
    returns. When the coroutine returns, its return value becomes the task's
    result, every task parked on this task's id is resumed with that value,
    and this task's own ``shared.active_futures`` entry is removed.
    """
    while True:
        try:
            awaited = task.coroutine.send(response)
            if awaited.finished:
                # Nothing to wait for: feed the stored result back right away.
                response = awaited.result
                continue
            # Park this task until something resolves the awaited future.
            shared.active_tasks[awaited.id].append(task)
            shared.active_futures[awaited.id] = awaited
        except StopIteration as stop:
            # The coroutine returned; its return value travels on the exception.
            outcome = stop.value
            task.set_result(outcome)
            for dependent in shared.active_tasks.pop(task.id, ()):
                task_runner(dependent, outcome)
            shared.active_futures.pop(task.id, None)
        return
def create_task(coroutine: Coroutine[Future[Any], Any, T]) -> Task[T]:
    """Wrap *coroutine* in a ``Task`` and start running it immediately.

    The coroutine is driven by ``task_runner`` before this function returns,
    so the returned task may already be finished.
    """
    started = Task(coroutine)
    # A coroutine that has not started yet only accepts None on its first send.
    task_runner(started, None)
    return started
async def gather(*requests: Union[Future[T], Coroutine[Any, Any, T]]) -> List[T]:
    """Await every request and return the results in argument order.

    Coroutine arguments are first turned into tasks with ``create_task``, so
    all of them are started before any result is awaited. Any other argument
    is awaited as given.
    """
    # TODO: Should probably propagate first exception
    awaitables = []
    for request in requests:
        if isinstance(request, Coroutine):
            awaitables.append(create_task(request))
        else:
            awaitables.append(request)

    collected = []
    for pending in awaitables:
        collected.append(await pending)
    return collected
async def sleep(milliseconds: int):
    """Wait on a WeeChat timer hooked with *milliseconds* and return its result.

    A ``FutureTimer`` is created and its id is passed as the callback data of
    ``weechat.hook_timer``, with ``weechat_task_cb`` as the callback. The
    value returned is whatever the future is resolved with.
    """
    timer_future = FutureTimer()
    callback_name = get_callback_name(weechat_task_cb)
    # NOTE(review): per the WeeChat API the positional arguments are
    # (interval, align_second, max_calls, callback, callback_data), so 0 / 1
    # would mean "no alignment, fire once" -- confirm against the docs.
    weechat.hook_timer(milliseconds, 0, 1, callback_name, timer_future.id)
    return await timer_future
|