#!/usr/bin/env python import os, sys, time, re, pdb from threading import Thread, Lock from helpers import * from operator import itemgetter import traceback class cluster_class: def __init__(self): self.hosts = {} self.index = {} self.daemon_log_counter = [] self.parsers = [] def host_names(self): return ksort(self.hosts) def register_parser(self, parser_class): self.parsers.append(parser_class) def get_parser(self, parser_name): for parser in self.parsers: if parser.__class__.__name__ == parser_name: return parser def get_host(self, host): return self.hosts[host] def tell(self): toret = {} for host in self.hosts: toret[host] = self.hosts[host].tell() return toret def tell_sum(self): toret = 0 for host in self.hosts: toret += self.hosts[host].tell() return toret def size(self): toret = 0 for host in self.hosts: toret += self.hosts[host].size() return toret def seek(self, positions): # make sure positions in argument are valid for host in self.hosts: if host not in positions.keys(): print "cannot find", positions raise "Invalid_Positions" # seek each host to saved position for host in positions: self.hosts[host].seek(positions[host]) return True def seek_beginning(self): for host in self.hosts: self.hosts[host].seek(0) return True def add_log(self, logname): log = logfile_class(logname) hostname = log.hostname() sys.stderr.write("""adding log "%s" for host %s\n""" % (logname, hostname)) if not self.hosts.has_key(hostname): self.hosts[hostname] = host_class() self.hosts[hostname].add_log(log) def get_position_by_date(self, goto_date): try: return self.index[goto_date]["position"] except KeyError: # can't find position in cache, calculate on the fly # for cmp_date in ksort(self.index): if goto_date <= cmp_date: return self.index[cmp_date]["position"] return None def parse(self, threaded = False): if threaded and (not hasattr(self,"parse_t") or self.parse_t == None): self.parse_t = Thread(target=self.parse, name='parse-thread', args = [True] ) self.parse_t.start() return self.parse_t print "parsing begins" daemon_log_counter = {} self.seek_beginning() for date in self: self.index[date.date] = { "position":date.position, "log_counter":{} } for host in self.hosts: self.index[date.date]["log_counter"][host]=0 try: for log in date[host]: self.index[date.date]["log_counter"][host]+=1 for parser_class in self.parsers: parser_class.parse_line(date, log) # count how many logs per daemon try: daemon_log_counter[log.daemon()]+=1 except KeyError: daemon_log_counter[log.daemon()]=1 except "Eof": # no more logs for this host pass self.daemon_log_counter = sorted(daemon_log_counter.items(), key=itemgetter(1), reverse=True) print "parsing ends." def eof(self): for host in self.hosts: if not self.hosts[host].eof(): # print "All logs are not EOF yet", host return False print "All logs are EOF!" return True def __iter__(self): return self def next(self): if self.eof(): raise StopIteration return log_date_class(cluster = self) def instance(self): toret = cluster_class() for host in self.hosts: toret.hosts[host] = host_class() for log in self.hosts[host].logs: toret.hosts[host].logs.append(logfile_class(log.fname)) toret.index = self.index toret.daemon_log_counter = self.daemon_log_counter toret.parsers = self.parsers return toret class log_date_class: def __init__(self, cluster): self.cluster = cluster self.date = None self.hosts = cluster.hosts.keys() self.position = cluster.tell() newtime = None # 1st run, must find out what is the oldest date for each host for host in self.hosts: while True: try: newtime = time.strptime("2007 " + cluster.hosts[host].readline()[0:15], "%Y %b %d %H:%M:%S") except "Eof": break except ValueError: print "parsing error in line", cluster.hosts[host].tell() else: break if newtime: if not self.date or newtime < self.date: self.date = newtime if not cluster.hosts[host].eof(): cluster.hosts[host].backline() # this should almost never happen, but just in case. if not self.date: raise "Strange_Eof" def __str__(self): return time.strftime("%b %d %H:%M:%S", self.date) def __getitem__(self, host): return log_date_host(self.cluster, self.cluster.hosts[host], self.date) def __iter__(self): return self class log_date_host: def __init__(self, cluster, host, date): self.cluster = cluster self.host = host self.date = date self.parent_date = date def __iter__(self): return self def next(self): position = self.host.tell() self.host.readline() try: if time.strptime("2007 " + self.host.cur_line[0:15], "%Y %b %d %H:%M:%S") <= self.date: return log_line_class(self.parent_date, self.host, position, self.host.cur_line) except: return log_line_class(self.parent_date, self.host, position, self.host.cur_line) self.host.backline() raise StopIteration class log_line_class: def __init__(self, date, host, position, line): self.host = host self.position = position self.line = line self.parse = Memoize(self.parse_uncached) self.parent_date = date self.parent_host = host def parse_uncached(self): try: return re.findall(r"""^(... .. ..:..:..) %s ([-_0-9a-zA-Z \.\/\(\)]+)(\[[0-9]+\])?(:)? (.*)$""" % self.hostname(), self.line)[0] except: return [ None, None, None, None, None ] def __str__(self): return self.line def date(self): try: return time.strptime("2007 " + self.line[0:15], "%Y %b %d %H:%M:%S") except: return False def hostname(self): return self.line[16:].split(" ", 1)[0] def daemon(self): return self.parse()[1] def message(self): return self.parse()[4] class host_class: def __init__(self): self.logs = [] self.log_idx = 0 # first log self.log_ptr = 0 # first char self.cur_line = None def __str__(self): return self.hostname() def add_log(self, logfile): for inc in range(0,len(self.logs)): if logfile.time_end() < self.logs[inc].time_begin(): self.logs.insert(inc, logfile) break else: self.logs.append(logfile) def hostname(self): return self.logs[0].hostname() # try: return self.logs[0].hostname() # except: return None def tell(self): sumsize = 0 if self.log_idx > 0: for inc in range(0, self.log_idx): sumsize += self.logs[inc].size() try: sumsize += self.fp().tell() except TypeError: pass return sumsize def size(self): sumsize = 0 for inc in range(0, len(self.logs)): sumsize += self.logs[inc].size() return sumsize def eof(self): if self.tell() >= self.size(): return True return False def seek(self, offset, whence = 0): if whence == 1: offset = self.tell() + offset elif whence == 2: offset = self.size() + offset sumsize = 0 for inc in range(0, len(self.logs)): if offset <= sumsize + self.logs[inc].size(): offset -= sumsize self.log_idx = inc self.log_ptr = offset self.logs[inc].seek(offset) return True sumsize += self.logs[inc].size() raise "Off_Boundaries" def seek_and_read(self, offset, whence = 0): self.seek(offset, whence) return self.readline() def time(self): return time.strptime("2007 " + self.cur_line[0:15], "%Y %b %d %H:%M:%S") def fp(self): return self.logs[self.log_idx] def backline(self): self.seek(-len(self.cur_line), 1) def readline(self): if self.eof(): raise "Eof" while True: position = self.fp().tell() fromfile = self.fp().fname toret = self.fp().readline() if len(toret) == 0: if self.log_idx < len(self.logs): self.log_idx += 1 self.fp().seek(0) continue else: return "" if len(toret) > 0 or toret == "": self.cur_line = toret self.cur_file = fromfile self.cur_pos = position return toret else: print "invalid line", toret class logfile_class: def __init__(self,fname): self.fname = fname self.fp = open(fname) def hostname(self): pos = self.fp.tell() self.seek(0) toret = self.fp.readline()[16:].split(" ")[0] self.fp.seek(pos) return toret def time_begin(self): pos = self.fp.tell() self.fp.seek(0) toret = time.strptime(self.fp.readline()[0:15], "%b %d %H:%M:%S") self.fp.seek(pos) return toret def time_end(self): pos = self.fp.tell() bs = 1024 if self.size() < bs: bs = self.size() self.fp.seek(-bs, 2) buf = self.fp.read(bs) bufsplit = buf.split("\n") bufsplit.reverse() for line in bufsplit: if len(line) == 0: continue try: toret = time.strptime(line[0:15], "%b %d %H:%M:%S") except ValueError: print "Error in conversion"; continue else: break self.fp.seek(pos) return toret def size(self): return os.path.getsize(self.fname) def eof(self): return self.fp.tell() > self.size() def readline(self): return self.fp.readline() def seek(self,pos): # if cmdline["verbose"]: # print "seeking to position %d for file %s" % (pos, self.fname) # traceback.print_stack() self.fp.seek(pos) def tell(self): return self.fp.tell()