aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRyan Huber <rhuber@gmail.com>2014-10-25 14:39:14 -0700
committerRyan Huber <rhuber@gmail.com>2014-10-25 14:39:14 -0700
commitd06a7ccccc8224a5f745000e9a2bfedb80a712ec (patch)
tree2d427bf6bb889ad36f095f2cae57970d748371ba
parent34d242a23d8a34f1b4a6f7d0967b77aa0af27c31 (diff)
downloadwee-slack-d06a7ccccc8224a5f745000e9a2bfedb80a712ec.tar.gz
make history more reliable
-rw-r--r--wee_slack.py133
1 files changed, 87 insertions, 46 deletions
diff --git a/wee_slack.py b/wee_slack.py
index 862226e..6a48a80 100644
--- a/wee_slack.py
+++ b/wee_slack.py
@@ -5,6 +5,7 @@ import json
import sys
import re
import os
+import pickle
import random
import socket
import sha
@@ -812,14 +813,19 @@ def slack_mark_channel_read(channel_id):
channel.find(channel_id).mark_read()
#NOTE: switched to async/curl because sync slowed down the UI
-def async_slack_api_request(domain, token, request, data):
+def async_slack_api_request(domain, token, request, post_data, priority=False):
t = time.time()
+ post_elements = pickle.dumps([domain, token, request, post_data])
request += "?t=%s" % t
- data["token"] = token
- data = urllib.urlencode(data)
- post = {"post": "1", "postfields": data}
+ post_data["token"] = token
+ post_data = urllib.urlencode(post_data)
+ post = {"post": "1", "postfields": post_data}
url = 'https://%s/api/%s' % (domain, request)
- queue.append(['url:%s' % (url), post, 20000, 'url_processor_cb', str(data)])
+ queue_item = ['url:%s' % (url), post, 20000, 'url_processor_cb', post_elements]
+ if priority != True:
+ queue.append(queue_item)
+ else:
+ queue.insert(0, queue_item)
#def async_slack_api_request(request, data):
# t = time.time()
@@ -834,58 +840,93 @@ url_processor_lock=False
#funny, right?
big_data = {}
+class Backoff(object):
+ def __init__(self):
+ self.timer = time.time()
+ self.counter = 0
+ self.backoff = False
+ self.INTERVAL = 1
+ def check(self):
+ if self.backoff == False:
+ return True
+ else:
+ wait_until = (self.counter * self.INTERVAL) + self.timer
+ if time.time() > wait_until:
+ self.backoff = False
+ self.counter = 0
+ return False
+ def back_off(self):
+ if self.counter == 0:
+ self.timer = time.time()
+ self.backoff = True
+ self.counter += 1
+
+backoff = Backoff()
+
def async_queue_cb(data, remaining_calls):
- global url_processor_lock
- if url_processor_lock == False:
- url_processor_lock=True
- if len(queue) > 0:
- item = queue.pop(0)
- try:
- query = urlparse.parse_qs(item[-1])
- if query.has_key("channel") and item[0].find('history') > -1:
- channel = query["channel"][0]
- channel = channels.find(channel)
- channel.server.prnt("downloading channel history for %s" % (channel.name), backlog=True)
- except:
- pass
- if item.__class__ == list:
- #w.hook_process_hashtable(*item)
- command = 'curl --data "%s" %s' % (item[1]["postfields"], item[0][4:])
- w.hook_process(command, 10000, item[3], item[4])
+ if backoff.check():
+ global url_processor_lock
+ if url_processor_lock == False:
+ url_processor_lock=True
+ if len(queue) > 0:
+ item = queue.pop(0)
+ try:
+ query = urlparse.parse_qs(item[1]["postfields"])
+ if query.has_key("channel") and item[0].find('history') > -1:
+ channel = query["channel"][0]
+ channel = channels.find(channel)
+ channel.server.prnt("downloading channel history for %s" % (channel.name), backlog=True)
+ except:
+ pass
+ if item.__class__ == list:
+ #w.hook_process_hashtable(*item)
+ command = 'curl --data "%s" %s' % (item[1]["postfields"], item[0][4:])
+ w.hook_process(command, 10000, item[3], item[4])
+ else:
+ item.mark_read(False)
+ url_processor_lock=False
else:
- item.mark_read(False)
url_processor_lock=False
- else:
- url_processor_lock=False
return w.WEECHAT_RC_OK
def url_processor_cb(data, command, return_code, out, err):
+ data=pickle.loads(data)
global url_processor_lock, big_data
- url_processor_lock=False
- if return_code == 0:
- url_processor_lock=False
identifier = sha.sha(str(data) + command).hexdigest()
if not big_data.has_key(identifier):
big_data[identifier] = ''
big_data[identifier] += out
- try:
- my_json = json.loads(big_data[identifier])
- except:
- my_json = False
- if my_json:
- query = urlparse.parse_qs(data)
- if query.has_key("channel"):
- channel = query["channel"][0]
- if query.has_key("token"):
- token = query["token"][0]
- message_json = json.loads(big_data[identifier])
- del big_data[identifier]
- if message_json.has_key("messages"):
- messages = message_json["messages"].reverse()
- for message in message_json["messages"]:
- message["myserver"] = servers.find(token).domain
- message["channel"] = servers.find(token).channels.find(channel)
- process_message(message)
+ query = data[3]
+ if query.has_key("channel"):
+ channel = query["channel"]
+ if return_code == 0:
+ url_processor_lock=False
+ try:
+# big_data[identifier] = big_data[identifier].encode('ascii', 'ignore')
+ my_json = json.loads(big_data[identifier])
+ except:
+ url_processor_lock=False
+ backoff.back_off()
+ dbg("curl failed, doing again...\n%s" % (big_data[identifier]))
+ async_slack_api_request(*data, priority=True)
+ my_json = False
+ pass
+# my_json = False
+
+ if my_json:
+ query = data[3]
+ if query.has_key("channel"):
+ channel = query["channel"]
+ token = data[1]
+ message_json = json.loads(big_data[identifier])
+ del big_data[identifier]
+ if message_json.has_key("messages"):
+ messages = message_json["messages"].reverse()
+ for message in message_json["messages"]:
+ message["myserver"] = servers.find(token).domain
+ message["channel"] = servers.find(token).channels.find(channel)
+ process_message(message)
+
return w.WEECHAT_RC_OK
#def slack_api_request(request, data):