Source code for sanskrit_data.schema.common

"""
A module containing some data container base classes.
"""

from __future__ import absolute_import

import json
import logging
import sys
from copy import deepcopy

from six import string_types
import jsonpickle
import jsonschema

logging.basicConfig(
  level=logging.DEBUG,
  format="%(levelname)s: %(asctime)s {%(filename)s:%(lineno)d}: %(message)s "
)

JSONPICKLE_TYPE_FIELD = "py/object"
TYPE_FIELD = "jsonClass"

#: Maps jsonClass values to the containing Python module object. Useful for (de)serialization. Updated using :func:`update_json_class_index` calls at the end of each module file (such as this one) whose classes may be serialized.
json_class_index = {}


def update_json_class_index(module_in):
  """Call this function to enable (de)serialization.

  Usage example: common.update_json_class_index(sys.modules[__name__]).
  """
  import inspect
  for name, obj in inspect.getmembers(module_in):
    if inspect.isclass(obj):
      json_class_index[name] = obj
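
# A hedged usage sketch: once a module registers itself as above, its classes become
# available for deserialization by name. For this module, for instance:
#   update_json_class_index(sys.modules[__name__])
#   assert json_class_index["JsonObject"] is JsonObject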

def check_class(obj, allowed_types):
  results = [isinstance(obj, some_type) for some_type in allowed_types]
  # logging.debug(results)
  return True in results

def check_list_item_types(some_list, allowed_types):
  check_class_results = [check_class(item, allowed_types=allowed_types) for item in some_list]
  # logging.debug(check_class_results)
  return not (False in check_class_results)

def recursively_merge(a, b):
  assert a.__class__ == b.__class__, str(a.__class__) + " vs " + str(b.__class__)
  if isinstance(b, dict) and isinstance(a, dict):
    a_and_b = set(a.keys()) & set(b.keys())
    every_key = set(a.keys()) | set(b.keys())
    return {k: recursively_merge(a[k], b[k]) if k in a_and_b
            else deepcopy(a[k] if k in a else b[k])
            for k in every_key}
  elif isinstance(b, list) and isinstance(a, list):
    return list(set(a + b))
  else:
    return deepcopy(b)
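
# An illustrative sketch of recursively_merge semantics (the dicts below are made up,
# echoing how the classes below extend JsonObject.schema):
#   a = {"properties": {"x": {"type": "string"}}, "required": ["x"]}
#   b = {"properties": {"y": {"type": "number"}}, "required": ["y"]}
#   merged = recursively_merge(a, b)
#   # Dicts are merged key-wise; lists via set union (order not preserved);
#   # for clashing leaf values, b's copy wins.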

class JsonObject(object):
  """The base class of all Json-serializable data container classes, with many utility methods."""

  schema = {
    "type": "object",
    "properties": {
      TYPE_FIELD: {
        "type": "string",
      },
    },
    "required": [TYPE_FIELD]
  }

  def __init__(self):
    self.set_type()

  @classmethod
  def make_from_dict(cls, input_dict):
    """Defines *our* canonical way of constructing a JSON object from a dict.

    All other deserialization methods should use this.

    Note that this assumes that json_class_index is populated properly!

    - ``from sanskrit_data.schema import *`` before using this should take care of it.

    :param input_dict:
    :return: A subclass of JsonObject
    """
    if input_dict is None:
      return None
    assert TYPE_FIELD in input_dict, "no type field: " + str(input_dict)
    dict_without_id = deepcopy(input_dict)
    _id = dict_without_id.pop("_id", None)

    def recursively_set_jsonpickle_type(some_dict):
      wire_type = some_dict.pop(TYPE_FIELD, None)
      if wire_type:
        some_dict[JSONPICKLE_TYPE_FIELD] = json_class_index[wire_type].__module__ + "." + wire_type
      for key, value in iter(some_dict.items()):
        if isinstance(value, dict):
          recursively_set_jsonpickle_type(value)
        elif isinstance(value, list):
          for item in value:
            if isinstance(item, dict):
              recursively_set_jsonpickle_type(item)

    recursively_set_jsonpickle_type(dict_without_id)
    new_obj = jsonpickle.decode(json.dumps(dict_without_id))
    # logging.debug(new_obj.__class__)
    if _id:
      new_obj._id = str(_id)
    new_obj.set_type_recursively()
    return new_obj
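
  # A hedged example of the expected wire format; "some_doc_id" is made up, and the
  # Target class (defined later in this module) must be in json_class_index:
  #   target = JsonObject.make_from_dict({"jsonClass": "Target", "container_id": "some_doc_id"})
  #   assert isinstance(target, Target)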

  @classmethod
  def make_from_dict_list(cls, input_dict_list):
    return [cls.make_from_dict(input_dict=input_dict) for input_dict in input_dict_list]

  @classmethod
  def make_from_pickledstring(cls, pickle):
    obj = cls.make_from_dict(jsonpickle.decode(pickle))
    return obj

  @classmethod
  def read_from_file(cls, filename):
    try:
      with open(filename) as fhandle:
        obj = cls.make_from_dict(jsonpickle.decode(fhandle.read()))
        return obj
    except Exception as e:
      logging.error("Error reading {}: {}".format(filename, e))
      raise e

  def dump_to_file(self, filename):
    try:
      with open(filename, "w") as f:
        f.write(str(self))
    except Exception as e:
      logging.error("Error writing {}: {}".format(filename, e))
      raise e

  @classmethod
  def get_wire_typeid(cls):
    return cls.__name__

  @classmethod
  def get_jsonpickle_typeid(cls):
    return cls.__module__ + "." + cls.__name__

  @classmethod
  def get_json_map_list(cls, some_list):
    return [item.to_json_map() for item in some_list]

  def set_type(self):
    # self.class_type = str(self.__class__.__name__)
    setattr(self, TYPE_FIELD, self.__class__.get_wire_typeid())
    # setattr(self, TYPE_FIELD, self.__class__.__name__)

  def set_type_recursively(self):
    self.set_type()
    for key, value in iter(self.__dict__.items()):
      if isinstance(value, JsonObject):
        value.set_type_recursively()
      elif isinstance(value, list):
        for item in value:
          if isinstance(item, JsonObject):
            item.set_type_recursively()

  def set_jsonpickle_type_recursively(self):
    self.set_type()
    for key, value in iter(self.__dict__.items()):
      if isinstance(value, JsonObject):
        value.set_jsonpickle_type_recursively()
      elif isinstance(value, list):
        for item in value:
          if isinstance(item, JsonObject):
            item.set_jsonpickle_type_recursively()

  def __str__(self):
    return json.dumps(self.to_json_map(), sort_keys=True, indent=2)

  def set_from_dict(self, input_dict):
    if input_dict:
      for key, value in iter(input_dict.items()):
        if isinstance(value, list):
          setattr(self, key,
                  [JsonObject.make_from_dict(item) if isinstance(item, dict) else item for item in value])
        elif isinstance(value, dict):
          setattr(self, key, JsonObject.make_from_dict(value))
        else:
          setattr(self, key, value)

  def set_from_id(self, db_interface, id):
    return self.set_from_dict(db_interface.find_by_id(id=id))

  def to_json_map(self):
    """One convenient way of 'serializing' the object.

    So, the type must be properly set.
    Many functions accept such json maps, just as they accept strings.
    """
    self.set_type_recursively()
    json_map = {}
    for key, value in iter(self.__dict__.items()):
      # logging.debug("%s %s", key, value)
      if isinstance(value, JsonObject):
        json_map[key] = value.to_json_map()
      elif isinstance(value, list):
        json_map[key] = [item.to_json_map() if isinstance(item, JsonObject) else item for item in value]
      else:
        json_map[key] = value
    return json_map
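
  # A round-trip sketch: to_json_map and make_from_dict are near-inverses, modulo
  # "_id" handling ("some_doc_id" is made up):
  #   target = Target.from_details(container_id="some_doc_id")
  #   json_map = target.to_json_map()  # {"jsonClass": "Target", "container_id": "some_doc_id"}
  #   assert target.equals_ignore_id(JsonObject.make_from_dict(json_map))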

  def equals_ignore_id(self, other):
    # Compares serialized forms, normalizing strings (encoding to utf-8) and ignoring _id.
    def to_unicode(input):
      if isinstance(input, dict):
        return {key: to_unicode(value) for key, value in iter(input.items())}
      elif isinstance(input, list):
        return [to_unicode(element) for element in input]
      elif isinstance(input, string_types):
        return input.encode('utf-8')
      else:
        return input

    dict1 = to_unicode(self.to_json_map())
    dict1.pop("_id", None)
    # logging.debug(self.__dict__)
    # logging.debug(dict1)
    dict2 = to_unicode(other.to_json_map())
    dict2.pop("_id", None)
    # logging.debug(other.__dict__)
    # logging.debug(dict2)
    return dict1 == dict2

  def update_collection(self, db_interface):
    """Do JSON validation and write to the database."""
    self.set_type_recursively()
    if hasattr(self, "schema"):
      self.validate(db_interface)
    updated_doc = db_interface.update_doc(self.to_json_map())
    updated_obj = JsonObject.make_from_dict(updated_doc)
    return updated_obj

  # To delete referent items also, use the appropriate method in JsonObjectNode.
  def delete_in_collection(self, db_interface):
    assert hasattr(self, "_id"), "_id not present!"
    return db_interface.delete_doc(self._id)

  def validate(self, db_interface=None):
    """Validate the JSON serialization of this object using the schema member. Called before database writes.

    Override and call this method to add extra validations.

    :param db_interface: Potentially useful in subclasses to perform validations (e.g. is the target_id valid?).
      This value may not be available: for example, when called from the from_details methods.
    :return: None; raises an exception on validation failure.
    """
    self.validate_schema()

  def validate_schema(self):
    json_map = self.to_json_map()
    json_map.pop("_id", None)
    # logging.debug(str(self))
    from jsonschema import ValidationError
    from jsonschema import SchemaError
    try:
      jsonschema.validate(json_map, self.schema)
      # Sub-objects could have specialized validation rules, specified using
      # validate_schema overrides. Hence we specially call those methods.
      for key, value in iter(self.__dict__.items()):
        # logging.debug("%s %s", key, value)
        if isinstance(value, JsonObject):
          value.validate_schema()
        elif isinstance(value, list):
          for item in value:
            if isinstance(item, JsonObject):
              item.validate_schema()
    except SchemaError as e:
      logging.error(jsonpickle.dumps(self.schema))
      raise e
    except ValidationError as e:
      logging.error(e.message)
      logging.error(self)
      logging.error(self.schema)
      logging.error(json_map)
      raise e

  @classmethod
  def from_id(cls, id, db_interface):
    """Returns None if nothing is found."""
    item_dict = db_interface.find_by_id(id=id)
    item = None
    if item_dict is not None:
      item = cls.make_from_dict(item_dict)
    return item
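
# A hedged sketch of the subclassing pattern used in the rest of this module: extend
# JsonObject.schema via recursively_merge and pin TYPE_FIELD to the wire name. The
# Comment class and its "text" property here are hypothetical:
#   class Comment(JsonObject):
#     schema = recursively_merge(JsonObject.schema, {
#       "properties": {
#         TYPE_FIELD: {"enum": ["Comment"]},
#         "text": {"type": "string"}
#       },
#       "required": ["text"]
#     })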

class TargetValidationError(Exception):
  def __init__(self, allowed_types, target_obj, targetting_obj):
    self.allowed_types = allowed_types
    self.target_obj = target_obj
    self.targetting_obj = targetting_obj

  def __str__(self):
    return "%s\n targets object \n" \
           "%s,\n" \
           "which does not belong to \n" \
           "%s" % (self.targetting_obj, self.target_obj, str(self.allowed_types))

# noinspection PyProtectedMember
class Target(JsonObject):
  schema = recursively_merge(JsonObject.schema, {
    "type": "object",
    "properties": {
      TYPE_FIELD: {
        "enum": ["Target"]
      },
      "container_id": {
        "type": "string"
      }
    },
    "required": ["container_id"]
  })

  def get_target_entity(self, db_interface):
    """Returns None if db_interface doesn't have any such entity."""
    return JsonObject.from_id(id=self.container_id, db_interface=db_interface)

  @classmethod
  def from_details(cls, container_id):
    target = Target()
    target.container_id = container_id
    target.validate()
    return target

  @classmethod
  def from_ids(cls, container_ids):
    return [Target.from_details(str(container_id)) for container_id in container_ids]

  @classmethod
  def from_containers(cls, containers):
    return Target.from_ids(container_ids=[container._id for container in containers])
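
  # A usage sketch (the container ids are made up):
  #   targets = Target.from_ids(container_ids=["container_1", "container_2"])
  #   # Each element is a validated Target whose container_id is set.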

class JsonObjectWithTarget(JsonObject):
  """A JsonObject with a target field."""

  schema = recursively_merge(JsonObject.schema, ({
    "type": "object",
    "description": "A JsonObject with a target field.",
    "properties": {
      "targets": {
        "type": "array",
        "items": Target.schema,
        "description": "This field lets us define a directed graph involving JsonObjects stored in a database."
      }
    }
  }))

  target_class = Target

  @classmethod
  def get_allowed_target_classes(cls):
    return []

  def validate_targets(self, targets, allowed_types, db_interface):
    if targets and len(targets) > 0 and db_interface is not None:
      for target in targets:
        target_entity = target.get_target_entity(db_interface=db_interface)
        if not check_class(target_entity, allowed_types):
          raise TargetValidationError(allowed_types=allowed_types, targetting_obj=self,
                                      target_obj=target_entity)

  def validate(self, db_interface=None):
    super(JsonObjectWithTarget, self).validate(db_interface=db_interface)
    if hasattr(self, "targets"):
      self.validate_targets(targets=self.targets, allowed_types=self.get_allowed_target_classes(),
                            db_interface=db_interface)

  def get_targetting_entities(self, db_interface, entity_type=None):
    # Alas, the below shows that no index is used:
    # curl -sg vedavaapi.org:5984/vedavaapi_ullekhanam_db/_explain -H content-type:application/json -d '{"selector": {"targets": {"$elemMatch": {"container_id": "4b9f454f5aa5414e82506525d015ac68"}}}}'|jq
    # TODO: Use an index.
    find_filter = {
      "targets": {
        "$elemMatch": {
          "container_id": str(self._id)
        }
      }
    }
    targetting_objs = [JsonObject.make_from_dict(item) for item in db_interface.find(find_filter)]
    if entity_type is not None:
      targetting_objs = list(filter(lambda obj: isinstance(obj, json_class_index[entity_type]), targetting_objs))
    return targetting_objs

# noinspection PyProtectedMember,PyAttributeOutsideInit,PyTypeChecker
class JsonObjectNode(JsonObject):
  """Represents a tree (not a general Directed Acyclic Graph) of JsonObjectWithTargets.

  `A video describing its use <https://youtu.be/neVeKcxzeQI>`_.
  """

  schema = recursively_merge(
    JsonObject.schema, {
      "id": "JsonObjectNode",
      "properties": {
        TYPE_FIELD: {
          "enum": ["JsonObjectNode"]
        },
        "content": JsonObject.schema,
        "children": {
          "type": "array",
          "items": {
            'type': 'object',
            '$ref': "JsonObjectNode"
          }
        }
      },
      "required": [TYPE_FIELD]
    }
  )

  def validate_children_types(self):
    """Recursively validate target types."""
    for child in self.children:
      if not check_class(self.content, child.content.get_allowed_target_classes()):
        raise TargetValidationError(targetting_obj=child,
                                    allowed_types=child.content.get_allowed_target_classes(),
                                    target_obj=self.content)
    for child in self.children:
      child.validate_children_types()

  def validate(self, db_interface=None):
    # Note that the below recursively validates ALL members (including content and children).
    super(JsonObjectNode, self).validate(db_interface=None)
    self.validate_children_types()

  @classmethod
  def from_details(cls, content, children=None):
    if children is None:
      children = []
    node = JsonObjectNode()
    # logging.debug(content)
    # Strangely, without the backend.data_containers, the below test failed on 20170501.
    node.content = content
    # logging.debug(check_list_item_types(children, [JsonObjectNode]))
    node.children = children
    node.validate(db_interface=None)
    return node
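
  # A hedged sketch of building a two-level tree; book_portion and page_annotation
  # stand for hypothetical JsonObjectWithTarget instances with compatible target types:
  #   leaf = JsonObjectNode.from_details(content=page_annotation)
  #   root = JsonObjectNode.from_details(content=book_portion, children=[leaf])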

  def update_collection(self, db_interface):
    # We don't call self.validate() here, since child.content.targets may not be set yet.
    self.validate_children_types()
    # The content is validated within the below call.
    self.content = self.content.update_collection(db_interface)
    for child in self.children:
      if (not hasattr(child.content, "targets")) or child.content.targets is None or len(child.content.targets) == 0:
        child.content.targets = [child.content.target_class()]
      assert len(child.content.targets) == 1
      child.content.targets[0].container_id = str(self.content._id)
      child.update_collection(db_interface)

  def delete_in_collection(self, db_interface):
    self.fill_descendents(db_interface=db_interface, depth=100)
    for child in self.children:
      child.delete_in_collection(db_interface)
    # Delete or disconnect children before deleting oneself.
    self.content.delete_in_collection(db_interface)

  def fill_descendents(self, db_interface, depth=10, entity_type=None):
    targetting_objs = self.content.get_targetting_entities(db_interface=db_interface, entity_type=entity_type)
    self.children = []
    if depth > 0:
      for targetting_obj in targetting_objs:
        child = JsonObjectNode.from_details(content=targetting_obj)
        child.fill_descendents(db_interface=db_interface, depth=depth - 1)
        self.children.append(child)
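
# A hedged end-to-end sketch; db stands for a hypothetical db_interface implementation,
# and book_portion for some JsonObjectWithTarget:
#   root = JsonObjectNode.from_details(content=book_portion)
#   root.update_collection(db)           # Writes the tree; each child gets a target pointing at its parent.
#   root.fill_descendents(db, depth=10)  # Rebuilds children by querying targetting entities.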

class ScriptRendering(JsonObject):
  schema = recursively_merge(JsonObject.schema, ({
    "type": "object",
    "properties": {
      TYPE_FIELD: {
        "enum": ["ScriptRendering"]
      },
      "text": {
        "type": "string",
      },
      "encoding_scheme": {
        "type": "string",
      },
    },
    "required": ["text"]
  }))

  @classmethod
  def from_details(cls, text, encoding_scheme=None):
    obj = ScriptRendering()
    obj.text = text
    if encoding_scheme is not None:
      obj.encoding_scheme = encoding_scheme
    obj.validate()
    return obj

class Text(JsonObject):
  schema = recursively_merge(JsonObject.schema, ({
    "type": "object",
    "properties": {
      TYPE_FIELD: {
        "enum": ["Text"]
      },
      "script_renderings": {
        "type": "array",
        "items": ScriptRendering.schema
      },
      "language_code": {
        "type": "string",
      },
    }
  }))

  @classmethod
  def from_details(cls, script_renderings, language_code=None):
    obj = Text()
    obj.script_renderings = script_renderings
    if language_code is not None:
      obj.language_code = language_code
    return obj
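
# A usage sketch tying the two classes above together (field values are illustrative):
#   rendering = ScriptRendering.from_details(text="dharma", encoding_scheme="ISO")
#   text = Text.from_details(script_renderings=[rendering], language_code="sa")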

def get_schemas(module_in):
  import inspect
  schemas = {}
  for name, obj in inspect.getmembers(module_in):
    if inspect.isclass(obj) and hasattr(obj, "schema"):
      schemas[name] = obj.schema
  return schemas
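
# A usage sketch: collect the schemas of all classes in this module, e.g. to serve
# schema documentation:
#   schemas = get_schemas(sys.modules[__name__])
#   # schemas["Target"] is Target.schema, etc.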

# Essential for depickling to work.
update_json_class_index(sys.modules[__name__])
logging.debug(json_class_index)