"""
A module containing some data container base classes.
"""
from __future__ import absolute_import
import json
import logging
import sys
from copy import deepcopy
from six import string_types
import jsonpickle
import jsonschema
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)s: %(asctime)s {%(filename)s:%(lineno)d}: %(message)s "
)
JSONPICKLE_TYPE_FIELD = "py/object"
TYPE_FIELD = "jsonClass"
#: Maps jsonClass values to the corresponding Python class object. Useful for (de)serialization.
#: Updated via :func:`update_json_class_index` calls at the end of each module file (such as this one) whose classes may be serialized.
json_class_index = {}
def update_json_class_index(module_in):
  """Register every class defined in module_in in json_class_index, thereby enabling (de)serialization of those classes.

  Usage example: common.update_json_class_index(sys.modules[__name__]).
  """
import inspect
for name, obj in inspect.getmembers(module_in):
if inspect.isclass(obj):
json_class_index[name] = obj
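# A minimal usage sketch (illustrative): a hypothetical module defining serializable
# JsonObject subclasses would register itself at the bottom of its file, so that
# make_from_dict() can map its "jsonClass" values back to Python classes.
#
#   # at the end of a hypothetical my_schema_module.py:
#   # update_json_class_index(sys.modules[__name__])
#   # json_class_index now maps each class name defined there to the class object.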
def check_class(obj, allowed_types):
  """Check whether obj is an instance of one of the allowed_types."""
  return any(isinstance(obj, some_type) for some_type in allowed_types)
def check_list_item_types(some_list, allowed_types):
  """Check whether every item in some_list is an instance of one of the allowed_types."""
  return all(check_class(item, allowed_types=allowed_types) for item in some_list)
def recursively_merge(a, b):
  """Merge b into a (non-destructively): dicts are merged key by key, lists are unioned (as sets), and any other value is overridden by b's."""
  assert a.__class__ == b.__class__, str(a.__class__) + " vs " + str(b.__class__)
if isinstance(b, dict) and isinstance(a, dict):
a_and_b = set(a.keys()) & set(b.keys())
every_key = set(a.keys()) | set(b.keys())
return {
k: recursively_merge(a[k], b[k]) if k in a_and_b
else deepcopy(a[k] if k in a else b[k]) for k in every_key}
elif isinstance(b, list) and isinstance(a, list):
return list(set(a + b))
else:
return deepcopy(b)
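# Illustrative sketch of how recursively_merge composes the schemas below (the values
# here are made up for the example): dict keys are united recursively, lists are
# unioned without preserving order, and for scalars the second argument wins.
#
#   # a = {"type": "object", "properties": {"x": {"type": "string"}}, "required": ["x"]}
#   # b = {"type": "object", "properties": {"y": {"type": "number"}}, "required": ["y"]}
#   # merged = recursively_merge(a, b)
#   # merged["properties"] has both "x" and "y"; merged["required"] contains "x" and "y".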
class JsonObject(object):
"""The base class of all Json-serializable data container classes, with many utility methods."""
schema = {
"type": "object",
"properties": {
TYPE_FIELD: {
"type": "string",
},
},
"required": [TYPE_FIELD]
}
def __init__(self):
self.set_type()
  @classmethod
def make_from_dict(cls, input_dict):
"""Defines *our* canonical way of constructing a JSON object from a dict.
All other deserialization methods should use this.
Note that this assumes that json_class_index is populated properly!
- ``from sanskrit_data.schema import *`` before using this should take care of it.
:param input_dict:
    :return: An instance of the appropriate JsonObject subclass.
"""
if input_dict is None:
return None
assert TYPE_FIELD in input_dict, "no type field: " + str(input_dict)
dict_without_id = deepcopy(input_dict)
_id = dict_without_id.pop("_id", None)
def recursively_set_jsonpickle_type(some_dict):
wire_type = some_dict.pop(TYPE_FIELD, None)
if wire_type:
some_dict[JSONPICKLE_TYPE_FIELD] = json_class_index[wire_type].__module__ + "." + wire_type
for key, value in iter(some_dict.items()):
if isinstance(value, dict):
recursively_set_jsonpickle_type(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
recursively_set_jsonpickle_type(item)
recursively_set_jsonpickle_type(dict_without_id)
new_obj = jsonpickle.decode(json.dumps(dict_without_id))
# logging.debug(new_obj.__class__)
if _id:
new_obj._id = str(_id)
new_obj.set_type_recursively()
return new_obj
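  # A minimal deserialization sketch (hypothetical id value; assumes the named class has
  # been registered in json_class_index via update_json_class_index):
  #
  #   # input_dict = {"jsonClass": "Target", "container_id": "some_doc_id"}
  #   # target = JsonObject.make_from_dict(input_dict)
  #   # target is now a Target instance with target.container_id == "some_doc_id".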
  @classmethod
def make_from_dict_list(cls, input_dict_list):
return [cls.make_from_dict(input_dict=input_dict) for input_dict in input_dict_list]
  @classmethod
def make_from_pickledstring(cls, pickle):
obj = cls.make_from_dict(jsonpickle.decode(pickle))
return obj
  @classmethod
def read_from_file(cls, filename):
try:
with open(filename) as fhandle:
obj = cls.make_from_dict(jsonpickle.decode(fhandle.read()))
return obj
except Exception as e:
logging.error("Error reading " + filename + " : ".format(e))
raise e
  def dump_to_file(self, filename):
try:
with open(filename, "w") as f:
f.write(str(self))
except Exception as e:
logging.error("Error writing " + filename + " : ".format(e))
raise e
  @classmethod
def get_wire_typeid(cls):
return cls.__name__
  @classmethod
def get_jsonpickle_typeid(cls):
return cls.__module__ + "." + cls.__name__
  @classmethod
def get_json_map_list(cls, some_list):
return [item.to_json_map() for item in some_list]
  def set_type(self):
# self.class_type = str(self.__class__.__name__)
setattr(self, TYPE_FIELD, self.__class__.get_wire_typeid())
# setattr(self, TYPE_FIELD, self.__class__.__name__)
  def set_type_recursively(self):
self.set_type()
for key, value in iter(self.__dict__.items()):
if isinstance(value, JsonObject):
value.set_type_recursively()
elif isinstance(value, list):
for item in value:
if isinstance(item, JsonObject):
item.set_type_recursively()
  def set_jsonpickle_type_recursively(self):
self.set_type()
for key, value in iter(self.__dict__.items()):
if isinstance(value, JsonObject):
        value.set_jsonpickle_type_recursively()
elif isinstance(value, list):
for item in value:
if isinstance(item, JsonObject):
item.set_jsonpickle_type_recursively()
def __str__(self):
return json.dumps(self.to_json_map(), sort_keys=True, indent=2)
  def set_from_dict(self, input_dict):
if input_dict:
for key, value in iter(input_dict.items()):
if isinstance(value, list):
setattr(self, key,
[JsonObject.make_from_dict(item) if isinstance(item, dict) else item for item in value])
elif isinstance(value, dict):
setattr(self, key, JsonObject.make_from_dict(value))
else:
setattr(self, key, value)
  def set_from_id(self, db_interface, id):
return self.set_from_dict(db_interface.find_by_id(id=id))
  def to_json_map(self):
    """One convenient way of 'serializing' the object to a JSON-compatible dict.

    The type field must be set properly for the result to be deserializable; hence the set_type_recursively() call.
    Many functions accept such json maps, just as they accept strings.
    """
self.set_type_recursively()
json_map = {}
for key, value in iter(self.__dict__.items()):
# logging.debug("%s %s", key, value)
if isinstance(value, JsonObject):
json_map[key] = value.to_json_map()
elif isinstance(value, list):
json_map[key] = [item.to_json_map() if isinstance(item, JsonObject) else item for item in value]
else:
json_map[key] = value
return json_map
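  # Round-trip sketch (illustrative id value): to_json_map() and make_from_dict() are
  # intended to be inverses of each other, modulo the "_id" field.
  #
  #   # target = Target.from_details(container_id="some_doc_id")
  #   # json_map = target.to_json_map()
  #   # json_map == {"jsonClass": "Target", "container_id": "some_doc_id"}
  #   # target.equals_ignore_id(JsonObject.make_from_dict(json_map)) is True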
  def equals_ignore_id(self, other):
    # Compare the two json maps (sans _id), after normalizing all strings to their UTF-8 encoded form.
    def to_unicode(input):
if isinstance(input, dict):
return {key: to_unicode(value) for key, value in iter(input.items())}
elif isinstance(input, list):
return [to_unicode(element) for element in input]
elif isinstance(input, string_types):
return input.encode('utf-8')
else:
return input
dict1 = to_unicode(self.to_json_map())
dict1.pop("_id", None)
# logging.debug(self.__dict__)
# logging.debug(dict1)
dict2 = to_unicode(other.to_json_map())
dict2.pop("_id", None)
# logging.debug(other.__dict__)
# logging.debug(dict2)
return dict1 == dict2
  def update_collection(self, db_interface):
"""Do JSON validation and write to database."""
self.set_type_recursively()
if hasattr(self, "schema"):
self.validate(db_interface)
updated_doc = db_interface.update_doc(self.to_json_map())
updated_obj = JsonObject.make_from_dict(updated_doc)
return updated_obj
  # To also delete referent items, use the appropriate method in JsonObjectNode.
  def delete_in_collection(self, db_interface):
assert hasattr(self, "_id"), "_id not present!"
return db_interface.delete_doc(self._id)
  def validate(self, db_interface=None):
    """Validate the JSON serialization of this object using the schema member. Called before database writes.

    Subclasses should override (and call) this method to add extra validations.

    :param db_interface: Potentially useful in subclasses to perform validations (eg. is the target_id valid).
      This value may not be available: for example, when called from the from_details methods.
    :return: None. Raises an exception (eg. jsonschema.ValidationError) on failure.
    """
    self.validate_schema()
  def validate_schema(self):
json_map = self.to_json_map()
json_map.pop("_id", None)
# logging.debug(str(self))
from jsonschema import ValidationError
from jsonschema import SchemaError
try:
jsonschema.validate(json_map, self.schema)
# Subobjects could have specialized validation rules, specified using validate_schema overrides. Hence we specially call those methods.
for key, value in iter(self.__dict__.items()):
# logging.debug("%s %s", key, value)
if isinstance(value, JsonObject):
value.validate_schema()
        elif isinstance(value, list):
          for item in value:
            if isinstance(item, JsonObject):
              item.validate_schema()
except SchemaError as e:
logging.error(jsonpickle.dumps(self.schema))
raise e
except ValidationError as e:
logging.error(e.message)
logging.error(self)
logging.error(self.schema)
logging.error(json_map)
raise e
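  # Validation sketch (illustrative): validate_schema() raises jsonschema.ValidationError
  # when the serialized object does not satisfy the class schema.
  #
  #   # t = Target()           # no container_id set, though Target.schema requires one
  #   # t.validate_schema()    # would raise jsonschema.ValidationError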
  @classmethod
def from_id(cls, id, db_interface):
"""Returns None if nothing is found."""
item_dict = db_interface.find_by_id(id=id)
item = None
if item_dict is not None:
item = cls.make_from_dict(item_dict)
return item
class TargetValidationError(Exception):
def __init__(self, allowed_types, target_obj, targetting_obj):
self.allowed_types = allowed_types
self.target_obj = target_obj
self.targetting_obj = targetting_obj
def __str__(self):
return "%s\n targets object \n" \
"%s,\n" \
"which does not belong to \n" \
"%s" % (self.targetting_obj, self.target_obj, str(self.allowed_types))
# noinspection PyProtectedMember
class Target(JsonObject):
schema = recursively_merge(JsonObject.schema, {
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["Target"]
},
"container_id": {
"type": "string"
}
},
"required": ["container_id"]
})
  def get_target_entity(self, db_interface):
    """Returns None if db_interface does not have any such entity."""
    return JsonObject.from_id(id=self.container_id, db_interface=db_interface)
  @classmethod
def from_details(cls, container_id):
target = Target()
target.container_id = container_id
target.validate()
return target
  @classmethod
def from_ids(cls, container_ids):
return [Target.from_details(str(container_id)) for container_id in container_ids]
  @classmethod
def from_containers(cls, containers):
return Target.from_ids(container_ids=[container._id for container in containers])
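  # Usage sketch (hypothetical id values): a Target points at another stored JsonObject
  # via that object's _id.
  #
  #   # targets = Target.from_ids(container_ids=["id_1", "id_2"])
  #   # each element is a validated Target whose container_id is the given id string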
class JsonObjectWithTarget(JsonObject):
"""A JsonObject with a target field."""
schema = recursively_merge(JsonObject.schema, ({
"type": "object",
"description": "A JsonObject with a target field.",
"properties": {
"targets": {
"type": "array",
"items": Target.schema,
"description": "This field lets us define a directed graph involving JsonObjects stored in a database."
}
}
}))
target_class = Target
  @classmethod
def get_allowed_target_classes(cls):
return []
  def validate_targets(self, targets, allowed_types, db_interface):
    if targets and db_interface is not None:
for target in targets:
target_entity = target.get_target_entity(db_interface=db_interface)
if not check_class(target_entity, allowed_types):
raise TargetValidationError(allowed_types=allowed_types, targetting_obj=self,
target_obj=target_entity)
  def validate(self, db_interface=None):
super(JsonObjectWithTarget, self).validate(db_interface=db_interface)
if hasattr(self, "targets"):
self.validate_targets(targets=self.targets, allowed_types=self.get_allowed_target_classes(),
db_interface=db_interface)
  def get_targetting_entities(self, db_interface, entity_type=None):
# Alas, the below shows that no index is used:
# curl -sg vedavaapi.org:5984/vedavaapi_ullekhanam_db/_explain -H content-type:application/json -d '{"selector": {"targets": {"$elemMatch": {"container_id": "4b9f454f5aa5414e82506525d015ac68"}}}}'|jq
# TODO: Use index.
find_filter = {
"targets": {
"$elemMatch": {
"container_id": str(self._id)
}
}
}
targetting_objs = [JsonObject.make_from_dict(item) for item in db_interface.find(find_filter)]
if entity_type is not None:
targetting_objs = list(filter(lambda obj: isinstance(obj, json_class_index[entity_type]), targetting_objs))
return targetting_objs
# noinspection PyProtectedMember,PyAttributeOutsideInit,PyAttributeOutsideInit,PyTypeChecker
class JsonObjectNode(JsonObject):
"""Represents a tree (not a general Directed Acyclic Graph) of JsonObjectWithTargets.
`A video describing its use <https://youtu.be/neVeKcxzeQI>`_.
"""
schema = recursively_merge(
JsonObject.schema, {
"id": "JsonObjectNode",
"properties": {
TYPE_FIELD: {
"enum": ["JsonObjectNode"]
},
"content": JsonObject.schema,
"children": {
"type": "array",
"items": {
'type': 'object',
'$ref': "JsonObjectNode"
}
}
},
"required": [TYPE_FIELD]
}
)
"""Recursively valdiate target-types."""
[docs] def validate_children_types(self):
for child in self.children:
if not check_class(self.content, child.content.get_allowed_target_classes()):
raise TargetValidationError(targetting_obj=child, allowed_types=child.content.get_allowed_target_classes(),
target_obj=self.content)
for child in self.children:
child.validate_children_types()
  def validate(self, db_interface=None):
# Note that the below recursively validates ALL members (including content and children).
super(JsonObjectNode, self).validate(db_interface=None)
self.validate_children_types()
  @classmethod
def from_details(cls, content, children=None):
if children is None:
children = []
node = JsonObjectNode()
# logging.debug(content)
# Strangely, without the backend.data_containers, the below test failed on 20170501
node.content = content
# logging.debug(check_list_item_types(children, [JsonObjectNode]))
node.children = children
node.validate(db_interface=None)
return node
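  # Tree-construction sketch (hypothetical subclasses and database interface): a parent
  # node with one child, where the child's content is expected to target the parent's content.
  #
  #   # book = BookPortion(...)   # hypothetical JsonObjectWithTarget subclasses
  #   # page = BookPortion(...)
  #   # node = JsonObjectNode.from_details(
  #   #     content=book, children=[JsonObjectNode.from_details(content=page)])
  #   # node.update_collection(db_interface)  # writes book, then page with a target pointing at book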
  def update_collection(self, db_interface):
    # Note: we don't call self.validate() here, since child.content.targets may not be set yet.
self.validate_children_types()
# The content is validated within the below call.
self.content = self.content.update_collection(db_interface)
for child in self.children:
if (not hasattr(child.content, "targets")) or child.content.targets is None or len(
child.content.targets) == 0:
child.content.targets = [child.content.target_class()]
assert len(child.content.targets) == 1
child.content.targets[0].container_id = str(self.content._id)
child.update_collection(db_interface)
  def delete_in_collection(self, db_interface):
self.fill_descendents(db_interface=db_interface, depth=100)
for child in self.children:
child.delete_in_collection(db_interface)
# Delete or disconnect children before deleting oneself.
self.content.delete_in_collection(db_interface)
  def fill_descendents(self, db_interface, depth=10, entity_type=None):
targetting_objs = self.content.get_targetting_entities(db_interface=db_interface, entity_type=entity_type)
self.children = []
if depth > 0:
for targetting_obj in targetting_objs:
child = JsonObjectNode.from_details(content=targetting_obj)
child.fill_descendents(db_interface=db_interface, depth=depth - 1)
self.children.append(child)
class ScriptRendering(JsonObject):
schema = recursively_merge(JsonObject.schema, ({
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["ScriptRendering"]
},
"text": {
"type": "string",
},
"encoding_scheme": {
"type": "string",
},
},
"required": ["text"]
}))
  @classmethod
def from_details(cls, text, encoding_scheme=None):
obj = ScriptRendering()
obj.text = text
if encoding_scheme is not None:
obj.encoding_scheme = encoding_scheme
obj.validate()
return obj
class Text(JsonObject):
schema = recursively_merge(JsonObject.schema, ({
"type": "object",
"properties": {
TYPE_FIELD: {
"enum": ["Text"]
},
"script_renderings": {
"type": "array",
"items": {
"type": ScriptRendering.schema
}
},
"language_code": {
"type": "string",
},
}
}))
  @classmethod
def from_details(cls, script_renderings, language_code=None):
obj = Text()
obj.script_renderings = script_renderings
if language_code is not None:
obj.language_code = language_code
return obj
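  # Construction sketch (illustrative values): a Text wraps one or more ScriptRenderings.
  #
  #   # text = Text.from_details(
  #   #     script_renderings=[ScriptRendering.from_details(text="rAmaH", encoding_scheme="HK")],
  #   #     language_code="sa")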
def get_schemas(module_in):
import inspect
schemas = {}
for name, obj in inspect.getmembers(module_in):
if inspect.isclass(obj) and hasattr(obj, "schema"):
schemas[name] = obj.schema
return schemas
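# Usage sketch: collect all class schemas defined in this module, e.g. to expose them via an API.
#
#   # schemas = get_schemas(sys.modules[__name__])
#   # json.dumps(schemas, indent=2)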
# Essential for deserialization (depickling) to work.
update_json_class_index(sys.modules[__name__])
logging.debug(json_class_index)