Module geoengine.permissions

A wrapper for the GeoEngine permissions API.

Expand source code
'''
A wrapper for the GeoEngine permissions API.
'''

from __future__ import annotations
from enum import Enum

import ast
from typing import Dict, Literal, Any
from uuid import UUID

import geoengine_openapi_client

from geoengine.auth import get_session
from geoengine.datasets import DatasetName
from geoengine.error import GeoEngineException
from geoengine.layers import LayerCollectionId, LayerId


class RoleId:
    '''A wrapper for a role id'''

    def __init__(self, role_id: UUID) -> None:
        '''Wrap the given UUID as a role id'''
        self.__role_id = role_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> RoleId:
        '''
        Parse a http response to a `RoleId`

        Raises a `GeoEngineException` built from the response if it has no `id` entry.
        '''

        if 'id' not in response:
            raise GeoEngineException(response)

        # `cls` instead of the hard-coded class name, so a subclass gets its own type back
        return cls(UUID(response['id']))

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__role_id == other.__role_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        '''Hash by the wrapped UUID; defining `__eq__` alone would make instances unhashable'''
        return hash(self.__role_id)

    def __str__(self) -> str:
        '''The string form of the wrapped UUID'''
        return str(self.__role_id)

    def __repr__(self) -> str:
        '''The `repr` of the wrapped UUID'''
        return repr(self.__role_id)


class UserId:
    '''A wrapper for a user id'''

    def __init__(self, user_id: UUID) -> None:
        '''Wrap the given UUID as a user id'''
        self.__user_id = user_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> UserId:
        '''
        Parse a http response to a `UserId`

        Raises a `GeoEngineException` built from the response if it has no `id` entry.
        '''

        if 'id' not in response:
            raise GeoEngineException(response)

        # `cls` instead of the hard-coded class name, so a subclass gets its own type back
        return cls(UUID(response['id']))

    def __eq__(self, other) -> bool:
        '''Checks if two user ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__user_id == other.__user_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        '''Hash by the wrapped UUID; defining `__eq__` alone would make instances unhashable'''
        return hash(self.__user_id)

    def __str__(self) -> str:
        '''The string form of the wrapped UUID'''
        return str(self.__user_id)

    def __repr__(self) -> str:
        '''The `repr` of the wrapped UUID'''
        return repr(self.__user_id)


class Resource:
    '''A wrapper for a resource id'''

    def __init__(self, resource_type: Literal['dataset', 'layer', 'layerCollection', 'project'],
                 resource_id: str) -> None:
        '''
        Create a resource id

        `resource_type` selects the API resource model that `to_api_dict` builds,
        `resource_id` is passed to that model as its `id`.
        '''
        self.__type = resource_type
        self.__id = resource_id

    @classmethod
    def from_layer_id(cls, layer_id: LayerId) -> Resource:
        '''Create a resource id from a layer id'''
        return cls('layer', str(layer_id))

    @classmethod
    def from_layer_collection_id(cls, layer_collection_id: LayerCollectionId) -> Resource:
        '''Create a resource id from a layer collection id'''
        return cls('layerCollection', str(layer_collection_id))

    @classmethod
    def from_dataset_name(cls, dataset_name: DatasetName) -> Resource:
        '''Create a resource id from a dataset name'''
        return cls('dataset', str(dataset_name))

    def to_api_dict(self) -> geoengine_openapi_client.Resource:
        '''Convert to the API client's `Resource`, wrapping the model that matches the resource type'''
        inner: Any = None

        if self.__type == "layer":
            inner = geoengine_openapi_client.LayerResource(type="layer", id=self.__id)
        elif self.__type == "layerCollection":
            inner = geoengine_openapi_client.LayerCollectionResource(type="layerCollection", id=self.__id)
        elif self.__type == "project":
            inner = geoengine_openapi_client.ProjectResource(type="project", id=self.__id)
        elif self.__type == "dataset":
            inner = geoengine_openapi_client.DatasetResource(type="dataset", id=self.__id)

        # NOTE(review): an unrecognised type leaves `inner` as `None`; presumably the client
        # model rejects that - confirm.
        return geoengine_openapi_client.Resource(inner)


class Permission(str, Enum):
    '''A permission, identified by its string value'''
    READ = 'Read'
    OWNER = 'Owner'

    def to_api_dict(self) -> geoengine_openapi_client.Permission:
        '''Build the API client's `Permission` from this member's string value'''
        api_value = self.value
        return geoengine_openapi_client.Permission(api_value)


# Role ids with hard-coded UUIDs, exposed as module-level constants.
# NOTE(review): presumably these match the ids of the roles built into the Geo Engine backend - confirm.
ADMIN_ROLE_ID: RoleId = RoleId(UUID("d5328854-6190-4af9-ad69-4e74b0961ac9"))
REGISTERED_USER_ROLE_ID: RoleId = RoleId(UUID("4e8081b6-8aa6-4275-af0c-2fa2da557d28"))
ANONYMOUS_USER_ROLE_ID: RoleId = RoleId(UUID("fd8e87bf-515c-4f36-8da6-1a53702ff102"))


def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """
    Add a permission to a resource for a role. Requires admin role.

    `timeout` is handed to the API call as its `_request_timeout` (presumably seconds - confirm).
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        # The timeout is an option of the API call, not a field of the request body,
        # so it belongs to the handler call as in the role functions of this module.
        permissions_api.add_permission_handler(
            geoengine_openapi_client.PermissionRequest(
                role_id=str(role),
                resource=resource.to_api_dict(),
                permission=permission.to_api_dict()
            ),
            _request_timeout=timeout
        )


def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """
    Removes a permission to a resource from a role. Requires admin role.

    `timeout` is handed to the API call as its `_request_timeout` (presumably seconds - confirm).
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        # The timeout is an option of the API call, not a field of the request body,
        # so it belongs to the handler call as in the role functions of this module.
        permissions_api.remove_permission_handler(
            geoengine_openapi_client.PermissionRequest(
                role_id=str(role),
                resource=resource.to_api_dict(),
                permission=permission.to_api_dict()
            ),
            _request_timeout=timeout
        )


def add_role(name: str, timeout: int = 60) -> RoleId:
    """
    Add a new role. Requires admin role.

    Returns the id of the new role as parsed from the response.
    `timeout` is handed to the API call as its `_request_timeout` (presumably seconds - confirm).
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        # The timeout is an option of the API call, not a field of the request body.
        response = user_api.add_role_handler(
            geoengine_openapi_client.AddRole(name=name),
            _request_timeout=timeout
        )

    # TODO: find out why JSON string is faulty
    # Until then `json.loads(response)` cannot be used; the response is parsed as a
    # Python literal instead (`ast.literal_eval` only accepts literals, it runs no code).
    parsed_response: dict[str, str] = ast.literal_eval(response)

    return RoleId.from_response(parsed_response)


def remove_role(role: RoleId, timeout: int = 60):
    """
    Remove a role. Requires admin role.

    The role is sent in its string form; `timeout` is forwarded to the call as `_request_timeout`.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        api.remove_role_handler(str(role), _request_timeout=timeout)


def assign_role(role: RoleId, user: UserId, timeout: int = 60):
    """
    Assign a role to a user. Requires admin role.

    Both ids are sent in their string form; `timeout` is forwarded to the call as `_request_timeout`.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        # the handler is called with the user first, then the role
        api.assign_role_handler(str(user), str(role), _request_timeout=timeout)


def revoke_role(role: RoleId, user: UserId, timeout: int = 60):
    """
    Revoke a role from a user. Requires admin role.

    Both ids are sent in their string form; `timeout` is forwarded to the call as `_request_timeout`.
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        # the handler is called with the user first, then the role
        user_api.revoke_role_handler(str(user), str(role), _request_timeout=timeout)

Functions

def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60)

Add a permission to a resource for a role. Requires admin role.

Expand source code
def add_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """
    Add a permission to a resource for a role. Requires admin role.

    `timeout` is handed to the API call as its `_request_timeout` (presumably seconds - confirm).
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        # The timeout is an option of the API call, not a field of the request body.
        permissions_api.add_permission_handler(
            geoengine_openapi_client.PermissionRequest(
                role_id=str(role),
                resource=resource.to_api_dict(),
                permission=permission.to_api_dict()
            ),
            _request_timeout=timeout
        )
def add_role(name: str, timeout: int = 60) ‑> RoleId

Add a new role. Requires admin role.

Expand source code
def add_role(name: str, timeout: int = 60) -> RoleId:
    """
    Add a new role. Requires admin role.

    Returns the id of the new role as parsed from the response.
    `timeout` is handed to the API call as its `_request_timeout` (presumably seconds - confirm).
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        # The timeout is an option of the API call, not a field of the request body.
        response = user_api.add_role_handler(
            geoengine_openapi_client.AddRole(name=name),
            _request_timeout=timeout
        )

    # TODO: find out why JSON string is faulty
    # Until then `json.loads(response)` cannot be used; the response is parsed as a
    # Python literal instead (`ast.literal_eval` only accepts literals, it runs no code).
    parsed_response: dict[str, str] = ast.literal_eval(response)

    return RoleId.from_response(parsed_response)
def assign_role(role: RoleId, user: UserId, timeout: int = 60)

Assign a role to a user. Requires admin role.

Expand source code
def assign_role(role: RoleId, user: UserId, timeout: int = 60):
    """
    Assign a role to a user. Requires admin role.

    Both ids are sent in their string form; `timeout` is forwarded to the call as `_request_timeout`.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        # the handler is called with the user first, then the role
        api.assign_role_handler(str(user), str(role), _request_timeout=timeout)
def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60)

Removes a permission to a resource from a role. Requires admin role.

Expand source code
def remove_permission(role: RoleId, resource: Resource, permission: Permission, timeout: int = 60):
    """
    Removes a permission to a resource from a role. Requires admin role.

    `timeout` is handed to the API call as its `_request_timeout` (presumably seconds - confirm).
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        permissions_api = geoengine_openapi_client.PermissionsApi(api_client)
        # The timeout is an option of the API call, not a field of the request body.
        permissions_api.remove_permission_handler(
            geoengine_openapi_client.PermissionRequest(
                role_id=str(role),
                resource=resource.to_api_dict(),
                permission=permission.to_api_dict()
            ),
            _request_timeout=timeout
        )
def remove_role(role: RoleId, timeout: int = 60)

Remove a role. Requires admin role.

Expand source code
def remove_role(role: RoleId, timeout: int = 60):
    """
    Remove a role. Requires admin role.

    The role is sent in its string form; `timeout` is forwarded to the call as `_request_timeout`.
    """

    configuration = get_session().configuration

    with geoengine_openapi_client.ApiClient(configuration) as client:
        api = geoengine_openapi_client.UserApi(client)
        api.remove_role_handler(str(role), _request_timeout=timeout)
def revoke_role(role: RoleId, user: UserId, timeout: int = 60)

Revoke a role from a user. Requires admin role.

Expand source code
def revoke_role(role: RoleId, user: UserId, timeout: int = 60):
    """
    Revoke a role from a user. Requires admin role.

    Both ids are sent in their string form; `timeout` is forwarded to the call as `_request_timeout`.
    """

    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        user_api = geoengine_openapi_client.UserApi(api_client)
        # the handler is called with the user first, then the role
        user_api.revoke_role_handler(str(user), str(role), _request_timeout=timeout)

Classes

class Permission (value, names=None, *, module=None, qualname=None, type=None, start=1)

A permission

Expand source code
class Permission(str, Enum):
    '''A permission, identified by its string value'''
    READ = 'Read'
    OWNER = 'Owner'

    def to_api_dict(self) -> geoengine_openapi_client.Permission:
        '''Build the API client's `Permission` from this member's string value'''
        api_value = self.value
        return geoengine_openapi_client.Permission(api_value)

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var OWNER
var READ

Methods

def to_api_dict(self) ‑> geoengine_openapi_client.models.permission.Permission

Convert to a dict for the API

Expand source code
def to_api_dict(self) -> geoengine_openapi_client.Permission:
    '''Build the API client's `Permission` from this member's string value'''
    api_value = self.value
    return geoengine_openapi_client.Permission(api_value)
class Resource (resource_type: "Literal['dataset', 'layer', 'layerCollection']", resource_id: str)

A wrapper for a resource id

Create a resource id

Expand source code
class Resource:
    '''A wrapper for a resource id'''

    def __init__(self, resource_type: Literal['dataset', 'layer', 'layerCollection', 'project'],
                 resource_id: str) -> None:
        '''
        Create a resource id

        `resource_type` selects the API resource model that `to_api_dict` builds,
        `resource_id` is passed to that model as its `id`.
        '''
        self.__type = resource_type
        self.__id = resource_id

    @classmethod
    def from_layer_id(cls, layer_id: LayerId) -> Resource:
        '''Create a resource id from a layer id'''
        return cls('layer', str(layer_id))

    @classmethod
    def from_layer_collection_id(cls, layer_collection_id: LayerCollectionId) -> Resource:
        '''Create a resource id from a layer collection id'''
        return cls('layerCollection', str(layer_collection_id))

    @classmethod
    def from_dataset_name(cls, dataset_name: DatasetName) -> Resource:
        '''Create a resource id from a dataset name'''
        return cls('dataset', str(dataset_name))

    def to_api_dict(self) -> geoengine_openapi_client.Resource:
        '''Convert to the API client's `Resource`, wrapping the model that matches the resource type'''
        inner: Any = None

        if self.__type == "layer":
            inner = geoengine_openapi_client.LayerResource(type="layer", id=self.__id)
        elif self.__type == "layerCollection":
            inner = geoengine_openapi_client.LayerCollectionResource(type="layerCollection", id=self.__id)
        elif self.__type == "project":
            inner = geoengine_openapi_client.ProjectResource(type="project", id=self.__id)
        elif self.__type == "dataset":
            inner = geoengine_openapi_client.DatasetResource(type="dataset", id=self.__id)

        # NOTE(review): an unrecognised type leaves `inner` as `None`; presumably the client
        # model rejects that - confirm.
        return geoengine_openapi_client.Resource(inner)

Static methods

def from_dataset_name(dataset_name: DatasetName) ‑> Resource

Create a resource id from a dataset name

Expand source code
@classmethod
def from_dataset_name(cls, dataset_name: DatasetName) -> Resource:
    '''Create a resource id from a dataset name'''
    # construct through `cls` so that a subclass gets an instance of its own type
    return cls('dataset', str(dataset_name))
def from_layer_collection_id(layer_collection_id: LayerCollectionId) ‑> Resource

Create a resource id from a layer collection id

Expand source code
@classmethod
def from_layer_collection_id(cls, layer_collection_id: LayerCollectionId) -> Resource:
    '''Create a resource id from a layer collection id'''
    # construct through `cls` so that a subclass gets an instance of its own type
    return cls('layerCollection', str(layer_collection_id))
def from_layer_id(layer_id: LayerId) ‑> Resource

Create a resource id from a layer id

Expand source code
@classmethod
def from_layer_id(cls, layer_id: LayerId) -> Resource:
    '''Create a resource id from a layer id'''
    # construct through `cls` so that a subclass gets an instance of its own type
    return cls('layer', str(layer_id))

Methods

def to_api_dict(self) ‑> geoengine_openapi_client.models.resource.Resource

Convert to a dict for the API

Expand source code
def to_api_dict(self) -> geoengine_openapi_client.Resource:
    '''Wrap the API model that matches this resource's type in the API client's `Resource`'''
    kind = self.__type
    client = geoengine_openapi_client

    # The branches are mutually exclusive string comparisons; any other type wraps `None`.
    if kind == "dataset":
        wrapped: Any = client.DatasetResource(type="dataset", id=self.__id)
    elif kind == "project":
        wrapped = client.ProjectResource(type="project", id=self.__id)
    elif kind == "layerCollection":
        wrapped = client.LayerCollectionResource(type="layerCollection", id=self.__id)
    elif kind == "layer":
        wrapped = client.LayerResource(type="layer", id=self.__id)
    else:
        wrapped = None

    return client.Resource(wrapped)
class RoleId (role_id: UUID)

A wrapper for a role id

Expand source code
class RoleId:
    '''A wrapper for a role id'''

    def __init__(self, role_id: UUID) -> None:
        '''Wrap the given UUID as a role id'''
        self.__role_id = role_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> RoleId:
        '''
        Parse a http response to a `RoleId`

        Raises a `GeoEngineException` built from the response if it has no `id` entry.
        '''

        if 'id' not in response:
            raise GeoEngineException(response)

        # `cls` instead of the hard-coded class name, so a subclass gets its own type back
        return cls(UUID(response['id']))

    def __eq__(self, other) -> bool:
        '''Checks if two role ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__role_id == other.__role_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        '''Hash by the wrapped UUID; defining `__eq__` alone would make instances unhashable'''
        return hash(self.__role_id)

    def __str__(self) -> str:
        '''The string form of the wrapped UUID'''
        return str(self.__role_id)

    def __repr__(self) -> str:
        '''The `repr` of the wrapped UUID'''
        return repr(self.__role_id)

Static methods

def from_response(response: Dict[str, str]) ‑> RoleId

Parse an HTTP response into a RoleId

Expand source code
@classmethod
def from_response(cls, response: Dict[str, str]) -> RoleId:
    '''
    Parse a http response to a `RoleId`

    Raises a `GeoEngineException` built from the response if it has no `id` entry.
    '''

    if 'id' not in response:
        raise GeoEngineException(response)

    # `cls` instead of the hard-coded class name, so a subclass gets its own type back
    return cls(UUID(response['id']))
class UserId (user_id: UUID)

A wrapper for a user id

Expand source code
class UserId:
    '''A wrapper for a user id'''

    def __init__(self, user_id: UUID) -> None:
        '''Wrap the given UUID as a user id'''
        self.__user_id = user_id

    @classmethod
    def from_response(cls, response: Dict[str, str]) -> UserId:
        '''
        Parse a http response to a `UserId`

        Raises a `GeoEngineException` built from the response if it has no `id` entry.
        '''

        if 'id' not in response:
            raise GeoEngineException(response)

        # `cls` instead of the hard-coded class name, so a subclass gets its own type back
        return cls(UUID(response['id']))

    def __eq__(self, other) -> bool:
        '''Checks if two user ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__user_id == other.__user_id  # pylint: disable=protected-access

    def __hash__(self) -> int:
        '''Hash by the wrapped UUID; defining `__eq__` alone would make instances unhashable'''
        return hash(self.__user_id)

    def __str__(self) -> str:
        '''The string form of the wrapped UUID'''
        return str(self.__user_id)

    def __repr__(self) -> str:
        '''The `repr` of the wrapped UUID'''
        return repr(self.__user_id)

Static methods

def from_response(response: Dict[str, str]) ‑> UserId

Parse an HTTP response into a UserId

Expand source code
@classmethod
def from_response(cls, response: Dict[str, str]) -> UserId:
    '''
    Parse a http response to a `UserId`

    Raises a `GeoEngineException` built from the response if it has no `id` entry.
    '''

    if 'id' not in response:
        raise GeoEngineException(response)

    # `cls` instead of the hard-coded class name, so a subclass gets its own type back
    return cls(UUID(response['id']))