From d06a7ccccc8224a5f745000e9a2bfedb80a712ec Mon Sep 17 00:00:00 2001 From: Ryan Huber Date: Sat, 25 Oct 2014 14:39:14 -0700 Subject: make history more reliable --- wee_slack.py | 133 ++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 87 insertions(+), 46 deletions(-) diff --git a/wee_slack.py b/wee_slack.py index 862226e..6a48a80 100644 --- a/wee_slack.py +++ b/wee_slack.py @@ -5,6 +5,7 @@ import json import sys import re import os +import pickle import random import socket import sha @@ -812,14 +813,19 @@ def slack_mark_channel_read(channel_id): channel.find(channel_id).mark_read() #NOTE: switched to async/curl because sync slowed down the UI -def async_slack_api_request(domain, token, request, data): +def async_slack_api_request(domain, token, request, post_data, priority=False): t = time.time() + post_elements = pickle.dumps([domain, token, request, post_data]) request += "?t=%s" % t - data["token"] = token - data = urllib.urlencode(data) - post = {"post": "1", "postfields": data} + post_data["token"] = token + post_data = urllib.urlencode(post_data) + post = {"post": "1", "postfields": post_data} url = 'https://%s/api/%s' % (domain, request) - queue.append(['url:%s' % (url), post, 20000, 'url_processor_cb', str(data)]) + queue_item = ['url:%s' % (url), post, 20000, 'url_processor_cb', post_elements] + if priority != True: + queue.append(queue_item) + else: + queue.insert(0, queue_item) #def async_slack_api_request(request, data): # t = time.time() @@ -834,58 +840,93 @@ url_processor_lock=False #funny, right? big_data = {} +class Backoff(object): + def __init__(self): + self.timer = time.time() + self.counter = 0 + self.backoff = False + self.INTERVAL = 1 + def check(self): + if self.backoff == False: + return True + else: + wait_until = (self.counter * self.INTERVAL) + self.timer + if time.time() > wait_until: + self.backoff = False + self.counter = 0 + return False + def back_off(self): + if self.counter == 0: + self.timer = time.time() + self.backoff = True + self.counter += 1 + +backoff = Backoff() + def async_queue_cb(data, remaining_calls): - global url_processor_lock - if url_processor_lock == False: - url_processor_lock=True - if len(queue) > 0: - item = queue.pop(0) - try: - query = urlparse.parse_qs(item[-1]) - if query.has_key("channel") and item[0].find('history') > -1: - channel = query["channel"][0] - channel = channels.find(channel) - channel.server.prnt("downloading channel history for %s" % (channel.name), backlog=True) - except: - pass - if item.__class__ == list: - #w.hook_process_hashtable(*item) - command = 'curl --data "%s" %s' % (item[1]["postfields"], item[0][4:]) - w.hook_process(command, 10000, item[3], item[4]) + if backoff.check(): + global url_processor_lock + if url_processor_lock == False: + url_processor_lock=True + if len(queue) > 0: + item = queue.pop(0) + try: + query = urlparse.parse_qs(item[1]["postfields"]) + if query.has_key("channel") and item[0].find('history') > -1: + channel = query["channel"][0] + channel = channels.find(channel) + channel.server.prnt("downloading channel history for %s" % (channel.name), backlog=True) + except: + pass + if item.__class__ == list: + #w.hook_process_hashtable(*item) + command = 'curl --data "%s" %s' % (item[1]["postfields"], item[0][4:]) + w.hook_process(command, 10000, item[3], item[4]) + else: + item.mark_read(False) + url_processor_lock=False else: - item.mark_read(False) url_processor_lock=False - else: - url_processor_lock=False return w.WEECHAT_RC_OK def url_processor_cb(data, command, return_code, out, err): + data=pickle.loads(data) global url_processor_lock, big_data - url_processor_lock=False - if return_code == 0: - url_processor_lock=False identifier = sha.sha(str(data) + command).hexdigest() if not big_data.has_key(identifier): big_data[identifier] = '' big_data[identifier] += out - try: - my_json = json.loads(big_data[identifier]) - except: - my_json = False - if my_json: - query = urlparse.parse_qs(data) - if query.has_key("channel"): - channel = query["channel"][0] - if query.has_key("token"): - token = query["token"][0] - message_json = json.loads(big_data[identifier]) - del big_data[identifier] - if message_json.has_key("messages"): - messages = message_json["messages"].reverse() - for message in message_json["messages"]: - message["myserver"] = servers.find(token).domain - message["channel"] = servers.find(token).channels.find(channel) - process_message(message) + query = data[3] + if query.has_key("channel"): + channel = query["channel"] + if return_code == 0: + url_processor_lock=False + try: +# big_data[identifier] = big_data[identifier].encode('ascii', 'ignore') + my_json = json.loads(big_data[identifier]) + except: + url_processor_lock=False + backoff.back_off() + dbg("curl failed, doing again...\n%s" % (big_data[identifier])) + async_slack_api_request(*data, priority=True) + my_json = False + pass +# my_json = False + + if my_json: + query = data[3] + if query.has_key("channel"): + channel = query["channel"] + token = data[1] + message_json = json.loads(big_data[identifier]) + del big_data[identifier] + if message_json.has_key("messages"): + messages = message_json["messages"].reverse() + for message in message_json["messages"]: + message["myserver"] = servers.find(token).domain + message["channel"] = servers.find(token).channels.find(channel) + process_message(message) + return w.WEECHAT_RC_OK #def slack_api_request(request, data): -- cgit