Source code for sanskrit_data.db.mongodb
""".. note:: For undocumented classes and methods, please see superclass documentation in :mod:`sanskrit_data.db`."""
import logging
from bson import ObjectId
from sanskrit_data.db import DbInterface, ClientInterface
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)s: %(asctime)s {%(filename)s:%(lineno)d}: %(message)s "
)
[docs]def get_db_collection_names(db_collection_string):
"""
:param db_collection_string: A string like someDb.someCollection or just someCollection, which is interpreted as someCollection.someCollection.
:return: An object with db and collection names.
"""
name_parts = db_collection_string.split(".")
assert len(name_parts) > 0
obj = {"db": name_parts[0],
"collection": name_parts[0],
}
if len(name_parts) == 2:
obj["collection"] = name_parts[1]
return obj
[docs]class Client(ClientInterface):
def __init__(self, url):
try:
from pymongo import MongoClient
self.client = MongoClient(host=url)
except Exception as e:
logging.error("Error initializing MongoDB database; aborting.")
raise e
[docs] def get_database(self, db_name):
db_details = get_db_collection_names(db_collection_string=db_name)
return self.client[db_details["db"]][db_details["collection"]]
[docs] def get_database_interface(self, db_name):
return Collection(some_collection=self.get_database(db_name=db_name))
[docs] def delete_database(self, db_name):
"""Deletes a collection, does not bother with the database."""
db_details = get_db_collection_names(db_collection_string=db_name)
self.client[db_details["db"]].drop_collection(db_details["collection"])
[docs]class Collection(DbInterface):
def __init__(self, some_collection):
logging.info("Initializing collection :" + str(some_collection))
self.mongo_collection = some_collection
[docs] def find_by_id(self, id):
return self.find_one(find_filter={"_id": id})
[docs] def find_one(self, find_filter):
_fix_id_filter(filter=find_filter)
result = self.mongo_collection.find_one(filter=find_filter)
_fix_id(doc=result)
return result
[docs] def find(self, find_filter):
results = self.mongo_collection.find(find_filter)
for result in results:
_fix_id(doc=result)
yield result
[docs] def update_doc(self, doc):
from pymongo import ReturnDocument
if "_id" in doc:
filter = {"_id": ObjectId(doc["_id"])}
doc.pop("_id", None)
else:
filter = doc
updated_doc = self.mongo_collection.find_one_and_update(filter, {"$set": doc}, upsert=True,
return_document=ReturnDocument.AFTER)
_fix_id(doc=updated_doc)
return updated_doc
[docs] def delete_doc(self, doc_id):
self.mongo_collection.delete_one({"_id": ObjectId(doc_id)})
[docs] def add_index(self, keys_dict, index_name):
self.mongo_collection.create_index(keys=list(keys_dict.items()), name=index_name, background=True)
def _fix_id(doc):
if doc is not None and "_id" in doc:
doc["_id"] = str(doc["_id"])
def _fix_id_filter(filter):
if "_id" in filter:
filter["_id"] = ObjectId(filter["_id"])