Source code for sqlalchemy_authorize.oso.oso_permissions_mixin

from dataclasses import dataclass
from contextlib import contextmanager

from sqlalchemy_authorize.permissions_mixin import BasePermissionsMixin


[docs]class OsoPermissionsMixin(BasePermissionsMixin): """Authorize your fields using Oso_. E.g. (using the ``User`` model defined in :ref:`conftest.py` and the polar policy provided in ``sqlalchemy_authorize.oso.rbac.polar``): >>> admin = User(id="1", username="root", is_admin=True) >>> john_doe = User(username="john_doe", check_create=True) Traceback (most recent call last): oso.exceptions.ForbiddenError: ... >>> with user_set(app, admin): # A context to set `flask.g.user` ... john_doe = User(username="john_doe", check_create=True) ... john_doe.id = "2" >>> john_doe.username, john_doe.id ('john_doe', '2') >>> with user_set(app, john_doe): ... john_doe.username = "doe_john" ... john_doe.id = "3" Traceback (most recent call last): oso.exceptions.ForbiddenError: ... >>> john_doe.username, john_doe.id ('doe_john', '2') .. _Oso: <https://www.osohq.com/> """
[docs] @staticmethod def get_oso(): """Function to get the current oso instance. By default assumes you've attached ``oso`` to the ``app`` during setup. """ from flask import current_app return current_app.oso
[docs] @staticmethod def get_anonymous_user(): """Returns a mock anonymous user. You'll probably want to overload this with a method that creates an anonymous instance of your `User` model. (You need to call ``oso.register_classes``). But if all you're checking in your polar policies is your ``user.id``, then this may suffice. """ return UserMock(id="anon")
[docs] def get_user(self): """Function to get the current user (which will get passed as the actor to ``oso.authorize_fields`` ). By default assumes a user in ``g.user``. """ from flask import g return getattr(g, "user", self.get_anonymous_user())
[docs] def error(self, action: str): """Returns an appropriate exception for the action. :returns: - :exec:`ForbiddenError` for create/update/delete, or a - :exec:`NotFoundError` for reads. """ if action == "read": return self.get_oso().not_found_error() return self.get_oso().forbidden_error
[docs] def authorize_field(self, action, key): # To avoid self-referential death spiral if oso needs to read actor # attributes. user = self.get_user() if user is None: self.get_oso().authorize_field(user, action, self, key) else: with user.exposed(): self.get_oso().authorize_field(user, action, self, key)
[docs]@dataclass class UserMock: id: str
[docs] @staticmethod @contextmanager def exposed(): yield