#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Script for comparing two objects
"""
import json
from optparse import OptionParser
import logging
logging.basicConfig(format='%(levelname)s:%(funcName)s:%(message)s', level=logging.INFO)
class Comparator(object):
"""
Main workhorse, the object itself
"""
def __init__(self, fn1=None, fn2=None, excluded_attrs=()):
if fn1:
self.obj1 = json.load(fn1)
if fn2:
self.obj2 = json.load(fn2)
self.excluded_attributes = excluded_attrs
if (fn1 and fn2):
logging.debug("self.obj1 = %s\nself.obj2 = %s\nself.excluded_attrs = %s", \
(self.obj1, self.obj2, self.excluded_attributes))
@staticmethod
def _get_keys(obj):
"""
Getter for the current object's keys.
"""
out = set()
for key in obj.keys():
out.add(key)
return out
@staticmethod
def _is_scalar(value):
"""
Primitive version, relying on the fact that JSON cannot
contain any more complicated data structures.
"""
return not isinstance(value, (list, tuple, dict))
def _compare_arrays(self, old_arr, new_arr):
inters = min(old_arr, new_arr)
result = {
u"append": {},
u"remove": {},
u"update": {}
}
for idx in range(len(inters)):
# changed objects, new value is new_arr
if (type(old_arr[idx]) != type(new_arr[idx])):
result['update'][idx] = new_arr[idx]
# another simple variant ... scalars
elif (self._is_scalar(old_arr)):
if old_arr[idx] != new_arr[idx]:
result['update'][idx] = new_arr[idx]
# recursive arrays
elif (isinstance(old_arr[idx], list)):
res_arr = self._compare_arrays(old_arr[idx], \
new_arr[idx])
if (len(res_arr) > 0):
result['update'][idx] = res_arr
# and now nested dicts
elif isinstance(old_arr[idx], dict):
res_dict = self.compare_dicts(old_arr[idx], new_arr[idx])
if (len(res_dict) > 0):
result['update'][idx] = res_dict
# Clear out unused inters in result
out_result = {}
for key in result:
if len(result[key]) > 0:
out_result[key] = result[key]
return out_result
def compare_dicts(self, old_obj=None, new_obj=None):
"""
The real workhorse
"""
if not old_obj and hasattr(self, "obj1"):
old_obj = self.obj1
if not new_obj and hasattr(self, "obj2"):
new_obj = self.obj2
old_keys = set()
new_keys = set()
if old_obj and len(old_obj) > 0:
old_keys = self._get_keys(old_obj)
if new_obj and len(new_obj) > 0:
new_keys = self._get_keys(new_obj)
keys = old_keys | new_keys
result = {
u"append": {},
u"remove": {},
u"update": {}
}
for name in keys:
# Explicitly excluded arguments
if (name in self.excluded_attributes):
continue
# old_obj is missing
if name not in old_obj:
result['append'][name] = new_obj[name]
# new_obj is missing
elif name not in new_obj:
result['remove'][name] = old_obj[name]
# changed objects, new value is new_obj
elif (type(old_obj[name]) != type(new_obj[name])):
result['update'][name] = new_obj[name]
# last simple variant ... scalars
elif (self._is_scalar(old_obj[name])):
if old_obj[name] != new_obj[name]:
result['update'][name] = new_obj[name]
# now arrays
elif (isinstance(old_obj[name], list)):
res_arr = self._compare_arrays(old_obj[name], \
new_obj[name])
if (len(res_arr) > 0):
result['update'][name] = res_arr
# and now nested dicts
elif isinstance(old_obj[name], dict):
res_dict = self.compare_dicts(old_obj[name], new_obj[name])
if (len(res_dict) > 0):
result['update'][name] = res_dict
# Clear out unused keys in result
out_result = {}
for key in result:
if len(result[key]) > 0:
out_result[key] = result[key]
return out_result
if __name__ == "__main__":
usage = "usage: %prog [options] old.json new.json"
parser = OptionParser(usage=usage)
parser.add_option("-x", "--exclude",
action="append", dest="exclude", metavar="ATTR", default=[],
help="attributes which should be ignored when comparing")
(options, args) = parser.parse_args()
logging.debug("options = %s", str(options))
logging.debug("args = %s", str(args))
if len(args) != 2:
parser.error("Script requires two positional arguments, names for old and new JSON file.")
diff = Comparator(file(args[0]), file(args[1]), options.exclude)
print json.dumps(diff.compare_dicts(), indent=4, ensure_ascii=False)