"""
Intro
-----
- For general context and class diagram, refer to :mod:`~sanskrit_data.schema`.
"""
import logging
import sys
from sanskrit_data.schema import common
from sanskrit_data.schema.common import JsonObject, recursively_merge, TYPE_FIELD, update_json_class_index
# Root logging is configured at import time: DEBUG threshold, and every record
# carries its level, timestamp and the emitting file name / line number.
logging.basicConfig(
    format="%(levelname)s: %(asctime)s {%(filename)s:%(lineno)d}: %(message)s ",
    level=logging.DEBUG,
)
class UserPermission(JsonObject):
    """A grant pairing a service pattern with the actions allowed on it.

    The schema restricts ``service`` to the listed regular-expression strings
    and ``actions`` to a list drawn from ``read`` / ``write`` / ``admin``.
    """

    schema = recursively_merge(
        JsonObject.schema,
        {
            "properties": {
                TYPE_FIELD: {"enum": ["UserPermission"]},
                "service": {
                    "type": "string",
                    "enum": [".*", "ullekhanam"],
                    "description": "Allowable values should be predetermined regular expressions.",
                },
                "actions": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": ["read", "write", "admin"],
                    },
                    "description": "Should be an enum in the future.",
                },
            },
        },
    )

    @classmethod
    def from_details(cls, service, actions):
        """Build a permission object from its two fields.

        :param service: service pattern string stored as ``service``.
        :param actions: list of action strings stored as ``actions``.
        :return: the populated :class:`UserPermission`.
        """
        permission = UserPermission()
        permission.service = service
        permission.actions = actions
        return permission
def hash_password(plain_password):
    """Hash ``plain_password`` with bcrypt and return the hash as text.

    :param plain_password: the clear-text password (a string).
    :return: the bcrypt hash, decoded from UTF-8 bytes into a string.
    """
    import bcrypt

    password_bytes = plain_password.encode(encoding='utf8')
    # A fresh salt is generated on every call; bcrypt stores the salt inside
    # the resulting hash, so nothing else needs to be persisted.
    hashed_bytes = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    return hashed_bytes.decode(encoding='utf8')
class AuthenticationInfo(JsonObject):
    """Identity of a user with one authentication provider, plus its secret.

    The schema stores the bcrypt hash under ``auth_secret_bcrypt``. Older code
    in this class wrote the hash to a non-schema attribute named
    ``auth_secret_hashed``; that attribute is still read and validated here so
    that objects created that way keep working.
    """

    schema = recursively_merge(
        JsonObject.schema, {
            "properties": {
                TYPE_FIELD: {
                    "enum": ["AuthenticationInfo"]
                },
                "auth_user_id": {
                    "type": "string"
                },
                "auth_provider": {
                    "type": "string",
                    "enum": ["google", "vedavaapi"]
                },
                "auth_secret_bcrypt": {
                    "type": "string",
                    "description": "This should be hashed, and merits being stored in a database."
                },
                "auth_secret_plain": {
                    "type": "string",
                    "description": "This should NEVER be set when stored in a database; but is good for client-server transmission purposes."
                }
            }
        }
    )

    VEDAVAAPI_AUTH = "vedavaapi"

    def __str__(self):
        """Return ``<auth_provider>____<auth_user_id>``."""
        return self.auth_provider + "____" + self.auth_user_id

    def check_password(self, plain_password):
        """Check ``plain_password`` against the stored bcrypt hash.

        :param plain_password: the clear-text password to verify (a string).
        :return: the result of ``bcrypt.checkpw``; ``False`` when this object
            carries no stored hash at all.
        """
        # Using bcrypt, the salt is saved into the hash itself.
        import bcrypt

        # Prefer the schema field; fall back to the legacy attribute that
        # from_details() used to populate.
        stored_hash = getattr(self, "auth_secret_bcrypt", None) or getattr(self, "auth_secret_hashed", None)
        if not stored_hash:
            # No secret is stored, so no password can match.
            return False
        return bcrypt.checkpw(plain_password.encode(encoding='utf8'), stored_hash.encode(encoding='utf8'))

    @classmethod
    def from_details(cls, auth_user_id, auth_provider, auth_secret_hashed=None):
        """Build an :class:`AuthenticationInfo` from its fields.

        :param auth_user_id: identifier of the user with the provider.
        :param auth_provider: provider name (the schema allows ``google`` and
            ``vedavaapi``).
        :param auth_secret_hashed: optional, already-hashed secret. When
            truthy it is stored in the schema field ``auth_secret_bcrypt``,
            which is the field :meth:`check_password` reads.
        :return: the populated object.
        """
        obj = AuthenticationInfo()
        obj.auth_user_id = auth_user_id
        obj.auth_provider = auth_provider
        if auth_secret_hashed:
            obj.auth_secret_bcrypt = auth_secret_hashed
        return obj

    def set_bcrypt_password(self):
        """Replace a non-empty ``auth_secret_plain`` with its bcrypt hash.

        When ``auth_secret_plain`` is present, not ``None`` and not empty, its
        hash is stored in ``auth_secret_bcrypt`` and the plain attribute is
        deleted. Otherwise nothing is changed.
        """
        plain_secret = getattr(self, "auth_secret_plain", None)
        if plain_secret is None or plain_secret == "":
            return
        self.auth_secret_bcrypt = hash_password(plain_password=plain_secret)
        delattr(self, "auth_secret_plain")

    def validate_schema(self):
        """Validate against the schema, then hash any plain-text secret.

        :raises jsonschema.ValidationError: when a stored hash attribute
            (``auth_secret_hashed`` or ``auth_secret_bcrypt``) is present but
            empty or ``None``. The parent validation is called first.
        """
        super(AuthenticationInfo, self).validate_schema()
        from jsonschema import ValidationError
        self.set_bcrypt_password()
        # The legacy attribute is checked first so its original error message
        # is unchanged; the schema field gets the same non-empty requirement.
        for field_name in ("auth_secret_hashed", "auth_secret_bcrypt"):
            if hasattr(self, field_name) and getattr(self, field_name) in ("", None):
                raise ValidationError(message=field_name + " should be non-empty if present.")
class User(JsonObject):
    """A user of the service: a type, authentication records and permissions."""

    schema = recursively_merge(
        JsonObject.schema,
        {
            "properties": {
                TYPE_FIELD: {"enum": ["User"]},
                "user_type": {
                    "type": "string",
                    "enum": ["human", "bot"],
                },
                "authentication_infos": {
                    "type": "array",
                    "items": AuthenticationInfo.schema,
                },
                "permissions": {
                    "type": "array",
                    "items": UserPermission.schema,
                },
            },
        },
    )

    @classmethod
    def from_details(cls, user_type, auth_infos, permissions=None):
        """Build a user from its fields.

        :param user_type: stored as ``user_type`` (schema: ``human`` or ``bot``).
        :param auth_infos: stored as ``authentication_infos``.
        :param permissions: stored as ``permissions`` only when truthy.
        :return: the populated :class:`User`.
        """
        user = User()
        user.authentication_infos = auth_infos
        user.user_type = user_type
        if permissions:
            user.permissions = permissions
        return user

    def validate_schema(self):
        """Delegate validation to the parent class."""
        super(User, self).validate_schema()

    def check_permission(self, service, action):
        """Tell whether this user may perform ``action`` on ``service``.

        Each permission's ``service`` and each of its ``actions`` is treated
        as a regular expression that must match the whole given string.

        :return: ``True`` on the first permission whose service pattern and
            one of whose action patterns both match; ``False`` otherwise,
            including when the user has no ``permissions`` attribute.
        """
        import re

        def matches_entirely(pattern, string, flags=0):
            """Emulate python-3.4 re.fullmatch()."""
            return re.match("(?:" + pattern + r")\Z", string, flags=flags)

        if not hasattr(self, "permissions"):
            return False
        for granted in self.permissions:
            if not matches_entirely(pattern=granted.service, string=service):
                continue
            for allowed_action in granted.actions:
                if matches_entirely(pattern=allowed_action, string=action):
                    return True
        return False

    def get_user_ids(self):
        """Return the string form of every entry in ``authentication_infos``."""
        return list(map(str, self.authentication_infos))
# Essential for depickling to work: hand this module object to
# update_json_class_index (presumably so the classes defined above can be
# found by their TYPE_FIELD value - confirm in sanskrit_data.schema.common),
# then log the resulting index at DEBUG level.
update_json_class_index(sys.modules[__name__])
logging.debug(common.json_class_index)