1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import pytest
from wee_slack import EventRouter, ProcessNotImplemented, SlackRequest
def test_EventRouter(mock_weechat):
# Sending valid json adds to the queue.
e = EventRouter()
e.receive_json('{}')
assert len(e.queue) == 1
# Handling an event removes from the queue.
e = EventRouter()
# Create a function to test we are called
e.proc['testfunc'] = lambda x, y: x
e.receive_json('{"type": "testfunc"}')
e.handle_next()
assert len(e.queue) == 0
# Handling a local event removes from the queue.
e = EventRouter()
# Create a function to test we are called
e.proc['local_testfunc'] = lambda x, y: x
e.receive_json('{"type": "local_testfunc"}')
e.handle_next()
assert len(e.queue) == 0
# Handling an event without an associated processor
# raises an exception.
e = EventRouter()
# Create a function to test we are called
e.receive_json('{"type": "testfunc"}')
with pytest.raises(ProcessNotImplemented):
e.handle_next()
def test_EventRouterReceivedata(mock_weechat):
e = EventRouter()
context = e.store_context(SlackRequest('xoxoxoxox', "rtm.startold", {"meh": "blah"}))
print context
e.receive_httprequest_callback(context, 1, -1, ' {"JSON": "MEH", ', 4)
#print len(e.reply_buffer)
context = e.store_context(SlackRequest('xoxoxoxox', "rtm.startold", {"meh": "blah"}))
print context
e.receive_httprequest_callback(context, 1, -1, ' "JSON2": "MEH", ', 4)
#print len(e.reply_buffer)
context = e.store_context(SlackRequest('xoxoxoxox', "rtm.startold", {"meh": "blah"}))
print context
e.receive_httprequest_callback(context, 1, 0, ' "JSON3": "MEH"}', 4)
#print len(e.reply_buffer)
try:
e.handle_next()
e.handle_next()
e.handle_next()
e.handle_next()
except:
pass
print e.context
#assert False
context = e.store_context(SlackRequest('xoxoxoxox', "rtm.start", {"meh": "blah"}))
rtmstartdata = open('_pytest/data/http/rtm.start.json', 'r').read()
e.receive_httprequest_callback(context, 1, -1, rtmstartdata[:5000], 4)
e.receive_httprequest_callback(context, 1, 0, rtmstartdata[5000:], 4)
e.handle_next()
#print len(e.reply_buffer)
#print e.teams
for t in e.teams:
#print vars(e.teams[t])
for c in e.teams[t].channels:
pass
#print c
for u in e.teams[t].users:
pass
#print vars(u)
# e = EventRouter()
# # Create a function to test we are called
# e.receive_json('{"type": "message"}')
# e.handle_next()
# assert False
#assert False
|