Source code for sanskrit_data.db.couchdb

""".. note:: For undocumented classes and methods, please see superclass documentation in :mod:`sanskrit_data.db`.
"""

from __future__ import absolute_import

import copy
import logging

from sanskrit_data.db import DbInterface, ClientInterface

logging.basicConfig(
  level=logging.DEBUG,
  format="%(levelname)s: %(asctime)s {%(filename)s:%(lineno)d}: %(message)s "
)


[docs]def strip_revision_in_copy(doc_map): """ Strip the _rev field in a deep copy of doc_map and return it. :param dict doc_map: A dict representation of a JSON document. :return: doc_map itself without _rev """ new_doc = copy.deepcopy(doc_map) new_doc.pop("_rev", None) return new_doc
[docs]class CloudantApiClient(ClientInterface): def __init__(self, url): # logging.debug(url) import yurl parse_result = yurl.URL(url=url) import re url_without_credentials = re.sub(parse_result.username + ":" + parse_result.authorization, "", url) from cloudant.client import CouchDB self.client = CouchDB(user=parse_result.username, auth_token=parse_result.authorization, url=url_without_credentials, connect=True, auto_renew=True) # logging.debug(self.client) assert self.client is not None, logging.error(self.client)
[docs] def get_database(self, db_name): # noinspection PyTypeChecker db = self.client.get(db_name, default=None) if db is not None: return db else: return self.client.create_database(db_name)
[docs] def get_database_interface(self, db_name): return CloudantApiDatabase(db=self.get_database(db_name=db_name))
[docs] def delete_database(self, db_name): self.client.delete_database(db_name)
[docs]class CloudantApiDatabase(DbInterface): def __init__(self, db): logging.info("Initializing db :" + str(db)) self.db = db
[docs] def update_doc(self, doc): super(CloudantApiDatabase, self).update_doc(doc=doc) if "_id" not in doc: from uuid import uuid4 doc["_id"] = uuid4().hex if self.exists(doc_id=doc["_id"]): db_doc = self.db[doc["_id"]] db_doc.fetch() new_doc = doc new_doc["_rev"] = db_doc["_rev"] db_doc.clear() db_doc.update(new_doc) db_doc.save() return copy.deepcopy(strip_revision_in_copy(db_doc)) else: new_doc = self.db.create_document(data=doc, throw_on_exists=False) return strip_revision_in_copy(new_doc)
[docs] def exists(self, doc_id): try: db_doc = self.db[doc_id] return db_doc.exists() except KeyError: return False
[docs] def delete_doc(self, doc_id): """Beware: This leaves the document in the local cache! But other methods in this class should compensate.""" if self.exists(doc_id=doc_id): db_doc = self.db[doc_id] db_doc.fetch() db_doc.delete() else: logging.warn("Trying to delete non-existant doc " + doc_id) pass
[docs] def find_by_id(self, id): try: db_doc = self.db[id] if db_doc.exists(): db_doc.fetch() return strip_revision_in_copy(doc_map=db_doc) else: return None except KeyError: return None
[docs] def find(self, find_filter): from cloudant.query import Query query = Query(self.db, selector=find_filter) for doc in query.result: yield strip_revision_in_copy(doc_map=doc)
[docs] def find_by_indexed_key(self, index_name, key): raise Exception("Not implemented")
[docs] @staticmethod def get_index_doc_name(name): return '_design/' + name
[docs] def update_index(self, name, fields, upsert=False): self.db.create_query_index(design_document_id=self.get_index_doc_name(name=name), index_name=name, fields=fields) self.db.get_query_indexes() # TODO: This only exists in cloudant database. raise Exception("Not implemented: need to check if index is created.")
[docs]class CouchdbApiClient(ClientInterface): """.. note:: Prefer :class:`CloudantApiClient`.""" def __init__(self, url): from couchdb import Server self.server = Server(url=url) # noinspection PyBroadException
[docs] def get_database(self, db_name): try: return self.server[db_name] except: return self.server.create(db_name)
[docs] def get_database_interface(self, db_name): return CouchdbApiDatabase(db=self.get_database(db_name=db_name))
[docs] def delete_database(self, db_name): self.server.delete(db_name)
[docs]class CouchdbApiDatabase(DbInterface): """.. note:: Prefer :class:`CloudantApiDatabase`.""" def __init__(self, db): logging.info("Initializing db :" + str(db)) self.db = db
[docs] def set_revision(self, doc_map): from couchdb import ResourceNotFound try: doc_map["_rev"] = self.db[doc_map["_id"]]["_rev"] except ResourceNotFound: pass
[docs] def update_doc(self, doc): super(CouchdbApiDatabase, self).update_doc(doc=doc) if not hasattr(doc, "_id"): from uuid import uuid4 doc._id = uuid4().hex self.set_revision(doc_map=doc) # logging.debug(doc) result_tuple = self.db.save(doc) assert result_tuple[0] == doc._id, logging.error(str(result_tuple[0]) + " vs " + doc._id) return doc
[docs] def delete_doc(self, doc_id): from couchdb import ResourceNotFound try: map_to_delete = self.db[doc_id] self.db.delete(map_to_delete) except ResourceNotFound: pass
[docs] def find_by_id(self, id): from couchdb import ResourceNotFound try: return strip_revision_in_copy(self.db[id]) except ResourceNotFound: return None
[docs] def find_by_indexed_key(self, index_name, key): raise Exception("not implemented")
[docs] def find(self, find_filter): for row in self.db.find(query=find_filter): yield strip_revision_in_copy(row.doc)