from contextlib import contextmanager
from typing import List, Union, Optional
from sqlalchemy_authorize.constants import CRUD
from sqlalchemy_authorize.utils import classproperty, is_dunder
[docs]class BasePermissionsMixin:
"""BaseClass to add a field-level authorization policy to a ``db.Model``
E.g.::
class BaseUser(BasePermissionsMixin, db.Model):
__permissions__ = BasePermissionsMixin.load_permissions(
# Public permissions
read=["id", "username"],
# Role-based permissions
self=[
# The user can provide ``username`` and ``fullname``
# to ``__init__`` (as keyword args) and to ``__setattr__``.
(["create", "update"], ["username", "fullname"]),
# The user can read/delete the entire model.
"read",
"delete"
],
friend=[("read", ["fullname"])], # (in addition to public "read")
admin="*" # i.e., all actions on all fields
)
id = db.Column(db.String(128), primary_key=True)
username = db.Column(db.String(128), nullable=False)
fullname = db.Column(db.String(128), nullable=False)
ssn = db.Column(db.String(10), nullable=True)
Permission is denied by default (unlike other flask-authorization
libraries, where you typically have to wrap functions for
authorization to be checked). Also unlike other libraries,
permissions are at the field level. That's key to interoperability
with libraries like :mod:`graphene_sqlalchemy`.
Field-level permissions imply row-level permissions. If you are
allowed to update ``BaseUser.username``, then you are assumed to
have the update permission on ``BaseUser``.
This assumes CRUD actions by default ("create", "read", "update",
"delete"), where:
- "read" fields: columns, composites, properties, relationships, *and methods/functions*.
- "create" & "update" fields: columns or settable properties.
- "delete" usually isn't concerned with individual fields.
TODO: This will still currently fail if the method requires additional permissions.
Using :class:`BaseUser`.
>>> user = BaseUser(id="123")
>>> sorted(user.roles)
['admin', 'public', 'self']
>>> sorted(user.actions)
['create', 'delete', 'read', 'update']
>>> user._protected
True
.. field-level authorization policy: <https://docs.osohq.com/guides/enforcement/field.html>`_.
"""
__permissions__ = None
DEFAULT_ACTIONS = [e.value for e in CRUD]
PUBLIC_ROLE = "public" # The name of the "public" / fallback role.
def __init__(self, *args, protected=True, check_create=False, **kwargs):
"""Checks create permissions on each of the kwargs
before initializing the model if ``check_create``.
>>> BaseUser(id="123", check_create=True) # the current user doesn't have create permissions
Traceback (most recent call last):
PermissionError: ...
>>> BaseUser(id="123", check_create=False)
<BaseUser 123>
The reason we can't check create permissions by default is that
it disrupts the relational part of the ORM.
"""
# ``self.__setattr__`` requires ``_protected`` to resolve.
# This gets around that circular dependency.
super().__setattr__("_protected", False)
super().__setattr__("actions", self.__class__.actions)
# For use with :meth:`Wrapper.allow` and :meth:`Wrapper.deny`
self._allowed_fields = {action: [] for action in self.actions}
self._forbidden_fields = {action: [] for action in self.actions}
if check_create:
with self.protected():
for key in kwargs.keys():
self.authorize("create", key)
# This requires this mixin to be included before SQLAlchemy's declarative base.
# TODO: Filter kwargs for those with ``create`` permissions.
super().__init__(*args, **kwargs)
self._protected = protected
[docs] @classmethod
def load_permissions(cls, *, actions=None, **kwargs):
r"""Convenience method for creating a ``__permissions__``
dict. (You can also just pass a completed dictionary directly.)
Final permissions dictionary is of the shape::
{"<role>": {"<action>": ["field_1", "field_2"], ...}}
where passing an empty list of field is equivalent to
denying that action on the entire model.
E.g.:
>>> from pprint import pprint
>>> pprint(BasePermissionsMixin.load_permissions(
... read=["id", "username"],
... self=[
... (["create", "update"], ["username", "fullname"]),
... "read",
... "delete",
... "custom_action"
... ],
... friend=[("read", ["fullname"])],
... admin="*"
... ))
{'admin': {'create': ['*'],
'custom_action': ['*'],
'delete': ['*'],
'read': ['*'],
'update': ['*']},
'friend': {'read': ['fullname', 'id', 'username']},
'public': {'read': ['id', 'username']},
'self': {'create': ['username', 'fullname'],
'custom_action': ['*'],
'delete': ['*'],
'read': ['*'],
'update': ['username', 'fullname']}}
:param actions: a list of actions to include (needed to
expand wildcards like ``admin="*"``.)
Defaults to CRUD + any custom actions found in
the role fields.
:param read: "public" read permissions. Defaults to None,
i.e., not included / forbidden at row & field levels.
If provided, this is copied into all other "read"
permissions and to a new "public" role.
:param create: "public" create permissions. Should typically
not be used.
:param update: "public" update permissions. Should typically
not be used.
:param delete: "public" delete permissions. Should typically
not be used.
:param kwargs: pairs of roles and permissions. E.g.::
- ``"<role>": { "read": ["field_1", ...]}`` (leave as is)
- ``"<role>": "*"`` (grant all permissions)
- ``"<role>": [ "read", ("update", "id") ]`` (expand and fill
defaults).
:return:
"""
permissions = {}
for action_name in cls.DEFAULT_ACTIONS:
action = kwargs.pop(action_name, None)
if type(action) is list:
permissions["public"] = {action_name: action}
# Get a list of all available actions
# Do this before reading permissions in order to expand wildcards.
if type(actions) is not list:
actions = set(cls.DEFAULT_ACTIONS)
for (role, permission) in kwargs.items():
if type(permission) is str: # "*"
continue
for rule in permission:
if type(rule) is str:
# "<action>"
actions.add(rule)
elif type(rule) is tuple:
# (("<action>" | ["<action_1>", ...]), ["<field_1>", ...])
rule_actions, _ = rule
if type(rule_actions) is str:
# ("<action>", ["<field_1>", ...])
rule_actions = [rule_actions]
# (["<action_1>", ...], ["<field_1>", ...])
actions |= set(rule_actions)
actions = list(actions)
for (role, permission) in kwargs.items():
permissions[role] = {}
if type(permission) is dict:
# Skip expansion & leave as is.
# (``"friend": {"read": ["id", "username", "fullname"]}"``)
permissions[role] = permission
elif permission == "*":
# Allow all actions. (``{"admin": "*"}``)
permissions[role] = {action: ["*"] for action in actions}
else:
# Expand list rule into dict.
for rule in permission:
if type(rule) is str:
# Allow this action on all fields.
# (``"self": ["read"]``)
permissions[role][rule] = ["*"]
else:
assert (
type(rule) is tuple and len(rule) == 2
), "Invalid permission shorthand."
role_actions, fields = rule
if type(role_actions) is str:
role_actions = [role_actions]
# Our ``actions, fields`` tuple now has a form like:
# ``(["create", "update"], ["username", "fullname"])``
for action in role_actions:
# Copy over default read permissions from public (if there are any)
fields += permissions.get("public", {}).get(action, [])
permissions[role][action] = fields
return permissions
@property
def permissions(self) -> dict:
"""Proxy for interacting with permissions dictionary."""
return self.__permissions__
@permissions.setter
def permissions(self, value: dict) -> dict:
"""Setter for permissions dictionary proxy."""
self.__permissions__ = self.load_permissions(**value)
@property
def exempted_fields(self):
return (
list(BasePermissionsMixin.__dict__.keys()) +
[
# Attributes that are set in ``__init__``.
"_protected",
"_allowed_fields",
"_forbidden_fields",
# SQL Alchemy generics.
"_sa_instance_state",
"_sa_class_manager",
"_sa_registry",
"_decl_class_registry",
"permissions",
"metadata",
"registry"
# You'll have to use another solution (like ``oso``)
# if you want row-level authorization in queries.
"query",
"query_class",
]
)
# noinspection PyMethodParameters
@classproperty
def roles(cls) -> List[str]:
"""The roles that are included in ``self.__permissions__``."""
return list(cls.__permissions__.keys())
# noinspection PyMethodParameters
@classproperty
def actions(cls) -> List[str]:
"""The actions that are included in ``self.__permissions__``."""
return list(
set(
[
action
for permission in cls.__permissions__.values()
for action in permission.keys()
]
)
)
@property
def always_allowed_fields(self) -> List[str]:
"""Attributes that do not require authorization.
This is the opposite of :meth:``authorizable_fields``.
"""
return [key for key in dir(self) if not self.requires_authorization(key)]
@property
def authorizable_fields(self) -> List[str]:
"""Attributes that should be checked for authorization
This is the opposite of :meth:``always_authorized_fields``.
For attributes that have been check for authorization,
see :meth:`authorized_fields` or :meth:`authorized_fields_for`.
"""
return [key for key in dir(self) if self.requires_authorization(key)]
[docs] def authorized_fields_for(self, role: str, action: str) -> List[str]:
"""The fields that an actor with ``role`` is allowed to perform
``action`` on.
This is a subset of ``authorizable_fields`` and won't include
fields that are in ``always_allowed_fields``.
"""
fields = self.permissions.get(role, {}).get(action, [])
return fields
[docs] def authorized_fields(self, action):
"""Returns all the ``authorizable_fields`` that the
current user is allowed to perform ``action`` on."""
fields = []
for field in self.authorizable_fields:
# noinspection PyBroadException
try:
self.authorize(action, field)
except:
# The exceptions will vary depending on how you implement
# :meth:`authorize_field`.
continue
fields.append(field)
return field
[docs] def requires_authorization(self, key):
"""Checks whether an attribute ``key`` should be authorized.
Exempted attributes include:
- Methods/attributes on this mixin (``authorizable_fields``,
``always_allowed_fields``, etc.).
- Dunder methods/attributes (i.e., ``__some_method__``).
- SQLAlchemy generics (``_sa_class_manager``, ``_sa_instance_manager``).
.. DANGER::
You can still expose sensitive information through these
unauthorized fields! This project bears no liability if
you forget to be careful.
"""
return (
not is_dunder(key) # E.g.: "__dict__"
and key != "exempted_fields"
and key not in self.exempted_fields
and self._protected
)
[docs] def protect(self):
"""Turns on authorization."""
self._protected = True
return self
[docs] def expose(self):
"""Turns off authorization.
.. NOTE::
It's recommended that you keep on authorization by default,
and turn it off selectively with a context manager like
:meth:`exposed` or (even better) :meth:`allowed`.
"""
self._protected = False
return self
[docs] @contextmanager
def protected(self):
"""Turns on authorization during the current context.
When the context exits, returns to the prior ``_protected``.
>>> user = BaseUser(id="123", protected=False)
>>> user.id
'123'
>>> user.id = "456"
>>> user.id
'456'
>>> with user.protected():
... user.id = "123"
Traceback (most recent call last):
PermissionError: ...
>>> user.id
'456'
.. NOTE::
It's recommended that you keep on authorization by default,
and turn it off selectively with a context manager like
:meth:`exposed` or (even better) :meth:`allowed`.
"""
was_protected = self._protected
self.protect()
try:
yield
finally:
self._protected = was_protected
[docs] @contextmanager
def exposed(self):
"""Turns off authentication during the current context.
.. NOTE::
When possible, consider more granularly relaxing permissions
via :meth:`allowed` to relax particular actions on
particular fields.
"""
was_protected = self._protected
self.expose()
try:
yield
finally:
self._protected = was_protected
[docs] def allow(
self,
action: Union[str, List[str]],
field: Optional[Union[str, List[str]]] = None,
) -> List[str]:
"""Allow ``action`` (s) on ``field`` (s).
.. NOTE::
Consider using :meth:`allowed` instead, to restrict
additional permissions to a ``with`` statement.
:param action: The action(s) to allow.
:param field: Which field(s) to allow ``action``(s) on,
defaults to allow ``action`` on all fields.
:return: ``action`` as list.
"""
if field is None:
field = self.authorizable_fields
elif type(field) is str:
field = [field]
if type(action) is not list:
action = [action]
for a in action:
self._allowed_fields[a] = field
return action
[docs] def deny(
self,
action: Union[str, List[str]],
field: Optional[Union[str, List[str]]] = None,
) -> List[str]:
"""Deny ``action`` (s) on ``field`` (s).
:param action: The action(s) to deny.
:param field: Which field(s) to deny ``action``(s) on,
defaults to deny ``action`` on all fields.
:return: ``action`` as list.
"""
if field is None:
# Allow action on all fields
field = self.authorizable_fields
elif type(field) is str:
field = [field]
if type(action) is not list:
action = [action]
for a in action:
self._forbidden_fields[a] = field
return action
[docs] @contextmanager
def allowed(
self,
action: Union[str, List[str]],
field: Optional[Union[str, List[str]]] = None,
):
"""Allow ``action`` (s) on ``field`` (s) during the
current context.
>>> user = BaseUser(id="123")
>>> user.id = "456"
Traceback (most recent call last):
PermissionError: ...
>>> user.id
'123'
>>> with user.allowed(CRUD.UPDATE.value):
... user.id = "456"
>>> user.id
'456'
>>> with user.allowed(CRUD.UPDATE, "fullname"):
... user.fullname = "John Doe"
... user.username = "jdoe"
Traceback (most recent call last):
PermissionError: ...
>>> with user.allowed(CRUD.READ, ["fullname", "username"]):
... print(user.fullname)
... print(user.username)
John Doe
None
:param action: The action(s) to allow.
:param field: Which field(s) to allow ``action``(s) on,
defaults to allow ``action`` on all fields.
"""
actions = self.allow(action, field)
try:
yield
finally:
for a in actions:
self._allowed_fields[a] = []
[docs] @contextmanager
def denied(
self,
action: Union[str, List[str]],
field: Optional[Union[str, List[str]]] = None,
):
"""Temporarily deny ``action``(s) on ``field``(s)
(optionally restricted to ``field``).
>>> user = BaseUser(id="123")
>>> user.id
'123'
>>> with user.denied(CRUD.READ, "id"):
... user.id
Traceback (most recent call last):
PermissionError: ...
:param action: The action(s) to deny.
:param field: Which field(s) to deny ``action``(s) on,
defaults to deny ``action`` on all fields.
"""
actions = self.deny(action, field)
try:
yield
finally:
for a in actions:
self._forbidden_fields[a] = []
# noinspection PyMethodMayBeStatic
[docs] def error(self, action: str):
"""Returns an appropriate exception for the action.
This method expects to be overloaded. E.g.:
:class:`OsoPermissionsMixin` will raise a
:exec:`ForbiddenError` for create/update/delete, but a
:exec:`NotFoundError` for reads.
:returns: Permission Error (does not raise this error!)
"""
return PermissionError(f"Current user is not allowed to perform '{action}'.")
[docs] def authorize_field(self, action: str, key: str):
"""This is where you actually implement the check.
For an example, see :class:`OsoPermissionsMixin`.
Usually, you can rely on this being called indirectly
(when setting/getting/deleting attributes).
This is meant as a placeholder method, not a working
example, that authorizes only public actions. In practice,
you'll want to implement your role-based / relation-based
/ attribute-based access control here (or use a solution
like :mod:`oso`).
:param action: One of CRUD or a custom action.
:param key: The attribute/field to authorize.
:returns: ``None`` if the action is allowed.
:raises: :exec:`PermissionError` (or some custom error like
:exec:`oso.ForbiddenError`) if not allowed.
"""
if key in self.authorized_fields_for("public", action):
return
raise self.error(action)
[docs] def authorize(self, action, key):
"""Check whether the current user is allowed to perform
``action`` on ``self.model.<key>``.
First checks for exceptions to normal ``oso`` rules due
to :meth:`allow` or :meth:`deny`, otherwise passes
the authorization request on to ``oso``.
"""
if key == "requires_authorization" or not self.requires_authorization(key):
return
if key in self._forbidden_fields.get(action, []):
if action == "read":
raise self.error("read")
# If we're not even allowed to read the currently model,
# throw a not found error, otherwise fallback to a
# forbidden error.
self.authorize("read", key)
raise self.error(action)
elif key in self.authorizable_fields and key not in self._allowed_fields.get(
action, []
):
# Required to avoid sending oso into a self-referential death spiral.
with self.exposed():
self.authorize_field(action, key)
return None
def __setattr__(self, key, value):
"""Checks whether the current user is allowed to
set the current attribute before setting."""
self.authorize(CRUD.UPDATE, key)
return super().__setattr__(key, value)
def __getattr__(self, item):
# Pre-initialized, these fields haven't yet been defined
if item in ["_allowed_fields", "_forbidden_fields"]:
return {}
elif item == "_protected":
return False
elif item == "__name__":
return type(self).__name__
elif item in dir(self):
# TODO: Don't know why we sometimes end up here.
self.authorize(CRUD.READ, item)
return self.__dict__[item]
raise AttributeError(f"'{self.__name__}' has no attribute '{item}'")
def __getattribute__(self, item):
"""Checks with authorizer whether the current user is allowed
to read the current attribute before returning the value."""
if item != "authorize":
self.authorize(CRUD.READ, item)
return object.__getattribute__(self, item)
def __delattr__(self, item):
"""Checks with authorizer whether the current user
is allowed to delete the current attribute before returning
the value.
.. NOTE::
This shouldn't be necessary very often, as we're typically
more interested in protecting rows from being deleted than
pseudocolumns in the ORM super().
"""
self.authorize(CRUD.DELETE, item)
super().__delattr__(item)