Module geoengine.permissions

A wrapper for the GeoEngine permissions API.

Expand source code
'''
A wrapper for the GeoEngine permissions API.
'''

from __future__ import annotations
import ast
from uuid import UUID
from typing import Dict, List, Union
from enum import Enum

import geoengine_openapi_client.models.role
import geoengine_openapi_client.models
import geoengine_openapi_client.api
import geoengine_openapi_client

from geoengine.error import GeoEngineException
from geoengine.resource_identifier import Resource
from geoengine.auth import get_session


class RoleId:
    '''A wrapper for a role id'''
    __role_id: UUID

    def __init__(self, role_id: UUID) -> None:
        self.__role_id = role_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> RoleId:
        '''Parse a http response to an `RoleId`'''

        if 'id' not in response:
            raise GeoEngineException(response)

        role_id = response['id']

        return RoleId(UUID(role_id))

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__role_id == other.__role_id  # pylint: disable=protected-access

    def __str__(self) -> str:
        return str(self.__role_id)

    def __repr__(self) -> str:
        return repr(self.__role_id)


class Role:
    '''A wrapper for a role'''
    name: str
    id: RoleId

    def __init__(self, role_id: Union[UUID, RoleId, str], role_name: str):
        ''' Create a role with name and id '''

        if isinstance(role_id, UUID):
            real_id = RoleId(role_id)
        elif isinstance(role_id, str):
            real_id = RoleId(UUID(role_id))
        else:
            real_id = role_id

        self.id = real_id
        self.name = role_name

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.models.role.Role) -> Role:
        '''Parse a http response to an `RoleId`'''

        role_id = response.id
        role_name = response.name

        return Role(role_id, role_name)

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.id == other.id and self.name == other.name

    def role_id(self) -> RoleId:
        '''get the role id'''
        return self.id

    def __repr__(self) -> str:
        return 'id: ' + repr(self.id) + ', name: ' + repr(self.name)


class UserId:
    '''A wrapper for a role id'''

    def __init__(self, user_id: UUID) -> None:
        self.__user_id = user_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> UserId:
        '''Parse a http response to an `UserId`'''
        print(response)
        if 'id' not in response:
            raise GeoEngineException(response)

        user_id = response['id']

        return UserId(UUID(user_id))

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__user_id == other.__user_id  # pylint: disable=protected-access

    def __str__(self) -> str:
        return str(self.__user_id)

    def __repr__(self) -> str:
        return repr(self.__user_id)


class PermissionListing:
    """
    PermissionListing
    """
    permission: Permission
    resource: Resource
    role: Role

    def __init__(self, permission: Permission, resource: Resource, role: Role):
        ''' Create  a PermissionListing '''
        self.permission = permission
        self.resource = resource
        self.role = role

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.models.PermissionListing) -> PermissionListing:
        ''' Transforms a response PermissionListing to a PermissionListing '''
        return PermissionListing(
            permission=Permission.from_response(response.permission),
            resource=Resource.from_response(response.resource),
            role=Role.from_response(response.role)
        )

    def __eq__(self, other) -> bool:
        '''Checks if two listings are equal'''
        if not isinstance(other, self.__class__):
            return False
        return self.permission == other.permission and self.resource == other.resource and self.role == other.role

    def __repr__(self) -> str:
        return 'Role: ' + repr(self.role) + ', ' \
            + 'Resource: ' + repr(self.resource) + ', ' \
            + 'Permission: ' + repr(self.permission)


class Permission(str, Enum):
    '''A permission'''
    READ = 'Read'
    OWNER = 'Owner'

    def to_api_dict(self) -> geoengine_openapi_client.Permission:
        '''Convert to a dict for the API'''
        return geoengine_openapi_client.Permission(self.value)

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.Permission) -> Permission:
        return Permission(response)


ADMIN_ROLE_ID: RoleId = RoleId(UUID("d5328854-6190-4af9-ad69-4e74b0961ac9"))
REGISTERED_USER_ROLE_ID: RoleId = RoleId(UUID("4e8081b6-8aa6-4275-af0c-2fa2da557d28"))
ANONYMOUS_USER_ROLE_ID: RoleId = RoleId(UUID("fd8e87bf-515c-4f36-8da6-1a53702ff102"))


def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """Add a permission to a resource for a role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        permissions_api.add_permission_handler(geoengine_openapi_client.PermissionRequest(
            role_id=str(role),
            resource=resource.to_api_dict(),
            permission=permission.to_api_dict(),
            _request_timeout=timeout
        ))


def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """Removes a permission to a resource from a role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        permissions_api.remove_permission_handler(geoengine_openapi_client.PermissionRequest(
            role_id=str(role),
            resource=resource.to_api_dict(),
            permission=permission.to_api_dict(),
            _request_timeout=timeout
        ))


def list_permissions(resource: Resource, timeout: int = 60, offset=0, limit=20) -> List[PermissionListing]:
    '''Lists the roles and permissions assigned to a ressource'''

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permission_api = geoengine_openapi_client.PermissionsApi(api_client)
        res = permission_api.get_resource_permissions_handler(
            resource_id=resource.id,
            resource_type=resource.type,
            offset=offset,
            limit=limit,
            _request_timeout=timeout
        )

        return [PermissionListing.from_response(r) for r in res]


def add_role(name: str, timeout: int = 60) -> RoleId:
    """Add a new role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        response = user_api.add_role_handler(geoengine_openapi_client.AddRole(
            name=name,
            _request_timeout=timeout
        ))

    # TODO: find out why JSON string is faulty
    # parsed_response = json.loads(response)
    parsed_response: dict[str, str] = ast.literal_eval(response)

    return RoleId.from_response(parsed_response)


def remove_role(role: RoleId, timeout: int = 60):
    """Remove a role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        user_api.remove_role_handler(str(role), _request_timeout=timeout)


def assign_role(role: RoleId, user: UserId, timeout: int = 60):
    """Assign a role to a user. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        user_api.assign_role_handler(str(user), str(role), _request_timeout=timeout)


def revoke_role(role: RoleId, user: UserId, timeout: int = 60):
    """Assign a role to a user. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        user_api.revoke_role_handler(str(user), str(role), _request_timeout=timeout)

Functions

def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60)

Add a permission to a resource for a role. Requires admin role.

Expand source code
def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """Add a permission to a resource for a role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        permissions_api.add_permission_handler(geoengine_openapi_client.PermissionRequest(
            role_id=str(role),
            resource=resource.to_api_dict(),
            permission=permission.to_api_dict(),
            _request_timeout=timeout
        ))
def add_role(name: str, timeout: int = 60) ‑> RoleId

Add a new role. Requires admin role.

Expand source code
def add_role(name: str, timeout: int = 60) -> RoleId:
    """Add a new role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        response = user_api.add_role_handler(geoengine_openapi_client.AddRole(
            name=name,
            _request_timeout=timeout
        ))

    # TODO: find out why JSON string is faulty
    # parsed_response = json.loads(response)
    parsed_response: dict[str, str] = ast.literal_eval(response)

    return RoleId.from_response(parsed_response)
def assign_role(role: RoleId, user: UserId, timeout: int = 60)

Assign a role to a user. Requires admin role.

Expand source code
def assign_role(role: RoleId, user: UserId, timeout: int = 60):
    """Assign a role to a user. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        user_api.assign_role_handler(str(user), str(role), _request_timeout=timeout)
def list_permissions(resource: Resource, timeout: int = 60, offset=0, limit=20) ‑> List[PermissionListing]

Lists the roles and permissions assigned to a ressource

Expand source code
def list_permissions(resource: Resource, timeout: int = 60, offset=0, limit=20) -> List[PermissionListing]:
    '''Lists the roles and permissions assigned to a ressource'''

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permission_api = geoengine_openapi_client.PermissionsApi(api_client)
        res = permission_api.get_resource_permissions_handler(
            resource_id=resource.id,
            resource_type=resource.type,
            offset=offset,
            limit=limit,
            _request_timeout=timeout
        )

        return [PermissionListing.from_response(r) for r in res]
def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60)

Removes a permission to a resource from a role. Requires admin role.

Expand source code
def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """Removes a permission to a resource from a role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        permissions_api.remove_permission_handler(geoengine_openapi_client.PermissionRequest(
            role_id=str(role),
            resource=resource.to_api_dict(),
            permission=permission.to_api_dict(),
            _request_timeout=timeout
        ))
def remove_role(role: RoleId, timeout: int = 60)

Remove a role. Requires admin role.

Expand source code
def remove_role(role: RoleId, timeout: int = 60):
    """Remove a role. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        user_api.remove_role_handler(str(role), _request_timeout=timeout)
def revoke_role(role: RoleId, user: UserId, timeout: int = 60)

Assign a role to a user. Requires admin role.

Expand source code
def revoke_role(role: RoleId, user: UserId, timeout: int = 60):
    """Assign a role to a user. Requires admin role."""

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        user_api.revoke_role_handler(str(user), str(role), _request_timeout=timeout)

Classes

class Permission (value, names=None, *, module=None, qualname=None, type=None, start=1)

A permission

Expand source code
class Permission(str, Enum):
    '''A permission'''
    READ = 'Read'
    OWNER = 'Owner'

    def to_api_dict(self) -> geoengine_openapi_client.Permission:
        '''Convert to a dict for the API'''
        return geoengine_openapi_client.Permission(self.value)

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.Permission) -> Permission:
        return Permission(response)

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var OWNER
var READ

Static methods

def from_response(response: geoengine_openapi_client.Permission) ‑> Permission
Expand source code
@classmethod
def from_response(cls, response: geoengine_openapi_client.Permission) -> Permission:
    return Permission(response)

Methods

def to_api_dict(self) ‑> geoengine_openapi_client.models.permission.Permission

Convert to a dict for the API

Expand source code
def to_api_dict(self) -> geoengine_openapi_client.Permission:
    '''Convert to a dict for the API'''
    return geoengine_openapi_client.Permission(self.value)
class PermissionListing (permission: Permission, resource: Resource, role: Role)

PermissionListing

Create a PermissionListing

Expand source code
class PermissionListing:
    """
    PermissionListing
    """
    permission: Permission
    resource: Resource
    role: Role

    def __init__(self, permission: Permission, resource: Resource, role: Role):
        ''' Create  a PermissionListing '''
        self.permission = permission
        self.resource = resource
        self.role = role

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.models.PermissionListing) -> PermissionListing:
        ''' Transforms a response PermissionListing to a PermissionListing '''
        return PermissionListing(
            permission=Permission.from_response(response.permission),
            resource=Resource.from_response(response.resource),
            role=Role.from_response(response.role)
        )

    def __eq__(self, other) -> bool:
        '''Checks if two listings are equal'''
        if not isinstance(other, self.__class__):
            return False
        return self.permission == other.permission and self.resource == other.resource and self.role == other.role

    def __repr__(self) -> str:
        return 'Role: ' + repr(self.role) + ', ' \
            + 'Resource: ' + repr(self.resource) + ', ' \
            + 'Permission: ' + repr(self.permission)

Class variables

var permissionPermission
var resourceResource
var roleRole

Static methods

def from_response(response: geoengine_openapi_client.models.PermissionListing) ‑> PermissionListing

Transforms a response PermissionListing to a PermissionListing

Expand source code
@classmethod
def from_response(cls, response: geoengine_openapi_client.models.PermissionListing) -> PermissionListing:
    ''' Transforms a response PermissionListing to a PermissionListing '''
    return PermissionListing(
        permission=Permission.from_response(response.permission),
        resource=Resource.from_response(response.resource),
        role=Role.from_response(response.role)
    )
class Role (role_id: Union[UUID, RoleId, str], role_name: str)

A wrapper for a role

Create a role with name and id

Expand source code
class Role:
    '''A wrapper for a role'''
    name: str
    id: RoleId

    def __init__(self, role_id: Union[UUID, RoleId, str], role_name: str):
        ''' Create a role with name and id '''

        if isinstance(role_id, UUID):
            real_id = RoleId(role_id)
        elif isinstance(role_id, str):
            real_id = RoleId(UUID(role_id))
        else:
            real_id = role_id

        self.id = real_id
        self.name = role_name

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.models.role.Role) -> Role:
        '''Parse a http response to an `RoleId`'''

        role_id = response.id
        role_name = response.name

        return Role(role_id, role_name)

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.id == other.id and self.name == other.name

    def role_id(self) -> RoleId:
        '''get the role id'''
        return self.id

    def __repr__(self) -> str:
        return 'id: ' + repr(self.id) + ', name: ' + repr(self.name)

Class variables

var idRoleId
var name : str

Static methods

def from_response(response: geoengine_openapi_client.models.role.Role) ‑> Role

Parse a http response to an RoleId

Expand source code
@classmethod
def from_response(cls, response: geoengine_openapi_client.models.role.Role) -> Role:
    '''Parse a http response to an `RoleId`'''

    role_id = response.id
    role_name = response.name

    return Role(role_id, role_name)

Methods

def role_id(self) ‑> RoleId

get the role id

Expand source code
def role_id(self) -> RoleId:
    '''get the role id'''
    return self.id
class RoleId (role_id: UUID)

A wrapper for a role id

Expand source code
class RoleId:
    '''A wrapper for a role id'''
    __role_id: UUID

    def __init__(self, role_id: UUID) -> None:
        self.__role_id = role_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> RoleId:
        '''Parse a http response to an `RoleId`'''

        if 'id' not in response:
            raise GeoEngineException(response)

        role_id = response['id']

        return RoleId(UUID(role_id))

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__role_id == other.__role_id  # pylint: disable=protected-access

    def __str__(self) -> str:
        return str(self.__role_id)

    def __repr__(self) -> str:
        return repr(self.__role_id)

Static methods

def from_response(response: Dict[str, str]) ‑> RoleId

Parse a http response to an RoleId

Expand source code
@classmethod
def from_response(cls, response: Dict[str, str]) -> RoleId:
    '''Parse a http response to an `RoleId`'''

    if 'id' not in response:
        raise GeoEngineException(response)

    role_id = response['id']

    return RoleId(UUID(role_id))
class UserId (user_id: UUID)

A wrapper for a role id

Expand source code
class UserId:
    '''A wrapper for a role id'''

    def __init__(self, user_id: UUID) -> None:
        self.__user_id = user_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> UserId:
        '''Parse a http response to an `UserId`'''
        print(response)
        if 'id' not in response:
            raise GeoEngineException(response)

        user_id = response['id']

        return UserId(UUID(user_id))

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__user_id == other.__user_id  # pylint: disable=protected-access

    def __str__(self) -> str:
        return str(self.__user_id)

    def __repr__(self) -> str:
        return repr(self.__user_id)

Static methods

def from_response(response: Dict[str, str]) ‑> UserId

Parse a http response to an UserId

Expand source code
@classmethod
def from_response(cls, response: Dict[str, str]) -> UserId:
    '''Parse a http response to an `UserId`'''
    print(response)
    if 'id' not in response:
        raise GeoEngineException(response)

    user_id = response['id']

    return UserId(UUID(user_id))