Module geoengine.auth

Module for encapsulating Geo Engine authentication

Expand source code
'''
Module for encapsulating Geo Engine authentication
'''

from __future__ import annotations
from typing import ClassVar, Dict, Optional, Tuple
from uuid import UUID

import os
from dotenv import load_dotenv
import requests as req
from requests.auth import AuthBase

from geoengine.error import GeoEngineException, UninitializedException, NoAdminSessionException


class BearerAuth(AuthBase):  # pylint: disable=too-few-public-methods
    '''A bearer token authentication for `requests`'''

    __token: str

    def __init__(self, token: str):
        self.__token = token

    def __call__(self, r):
        r.headers['Authorization'] = f'Bearer {self.__token}'
        return r


class Session:
    '''
    A Geo Engine session
    '''

    __id: UUID
    __valid_until: Optional[str] = None
    __server_url: str
    __timeout: int = 60

    __admin_token: Optional[UUID] = None

    session: ClassVar[Optional[Session]] = None

    def __init__(self,
                 server_url: str,
                 credentials: Optional[Tuple[str, str]] = None,
                 token: Optional[str] = None,
                 admin_token: Optional[str] = None) -> None:
        '''
        Initialize communication between this library and a Geo Engine instance

        If credentials or a token are provided, the session will be authenticated.
        Credentials and token must not be provided at the same time.

        optional arguments:
         - `(email, password)` as tuple
         - `token` as a string
         - `admin_token` as a string

        optional environment variables:
         - `GEOENGINE_EMAIL`
         - `GEOENGINE_PASSWORD`
         - `GEOENGINE_TOKEN`
         - `GEOENGINE_ADMIN_TOKEN`
        '''

        session = None

        if credentials is not None and token is not None:
            raise GeoEngineException({'message': 'Cannot provide both credentials and token'})

        if credentials is not None:
            session = req.post(f'{server_url}/login', json={"email": credentials[0],
                                                            "password": credentials[1]}, timeout=self.__timeout).json()
        elif "GEOENGINE_EMAIL" in os.environ and "GEOENGINE_PASSWORD" in os.environ:
            session = req.post(f'{server_url}/login',
                               json={"email": os.environ.get("GEOENGINE_EMAIL"),
                                     "password": os.environ.get("GEOENGINE_PASSWORD")},
                               timeout=self.__timeout).json()
        elif token is not None:
            session = req.get(f'{server_url}/session', headers={'Authorization': f'Bearer {token}'},
                              timeout=self.__timeout).json()
        elif "GEOENGINE_TOKEN" in os.environ:
            session = req.get(f'{server_url}/session',
                              headers={'Authorization': f'Bearer {os.environ.get("GEOENGINE_TOKEN")}'},
                              timeout=self.__timeout).json()
        else:
            session = req.post(f'{server_url}/anonymous', timeout=self.__timeout).json()

        if 'error' in session:
            raise GeoEngineException(session)

        self.__id = session['id']

        if 'validUntil' in session:
            self.__valid_until = session['validUntil']

        self.__server_url = server_url

        if admin_token is not None:
            self.__admin_token = UUID(admin_token)
        elif "GEOENGINE_ADMIN_TOKEN" in os.environ:
            self.__admin_token = UUID(os.environ.get("GEOENGINE_ADMIN_TOKEN"))

    def __repr__(self) -> str:
        '''Display representation of a session'''
        r = ''
        r += f'Server:              {self.server_url}\n'
        r += f'Session Id:          {self.__id}\n'

        if self.__valid_until is not None:
            r += f'Session valid until: {self.__valid_until}\n'

        return r

    @property
    def auth_header(self) -> Dict[str, str]:
        '''
        Create an authentication header for the current session
        '''

        return {'Authorization': 'Bearer ' + str(self.__id)}

    @property
    def admin_auth_header(self) -> Dict[str, str]:
        '''
        Create an authentication header for the current session's admin token
        '''

        if self.__admin_token is None:
            raise NoAdminSessionException()

        return {'Authorization': 'Bearer ' + str(self.__admin_token)}

    @property
    def admin_or_normal_auth_header(self) -> Dict[str, str]:
        '''
        Create an admin authentication header if possible, else a normal authentication header for the current session
        '''

        try:
            return self.admin_auth_header
        except NoAdminSessionException:
            return self.auth_header

    @property
    def server_url(self) -> str:
        '''
        Return the server url of the current session
        '''

        return self.__server_url

    def requests_bearer_auth(self) -> BearerAuth:
        '''
        Return a Bearer authentication object for the current session
        '''

        return BearerAuth(str(self.__id))

    def logout(self):
        '''
        Logout the current session
        '''

        req.post(f'{self.server_url}/logout', headers=self.auth_header, timeout=self.__timeout)


def get_session() -> Session:
    '''
    Return the global session if it exists

    Raises an exception otherwise.
    '''

    if Session.session is None:
        raise UninitializedException()

    return Session.session


def initialize(server_url: str,
               credentials: Optional[Tuple[str, str]] = None,
               token: Optional[str] = None,
               admin_token: Optional[str] = None) -> None:
    '''
    Initialize communication between this library and a Geo Engine instance

    If credentials or a token are provided, the session will be authenticated.
    Credentials and token must not be provided at the same time.

    optional arugments: (email, password) as tuple or token as a string
    optional environment variables: GEOENGINE_EMAIL, GEOENGINE_PASSWORD, GEOENGINE_TOKEN
    optional .env file defining: GEOENGINE_EMAIL, GEOENGINE_PASSWORD, GEOENGINE_TOKEN
    '''

    load_dotenv()

    Session.session = Session(server_url, credentials, token, admin_token)


def reset(logout: bool = True) -> None:
    '''
    Resets the current session
    '''

    if Session.session is not None and logout:
        Session.session.logout()

    Session.session = None

Functions

def get_session() ‑> Session

Return the global session if it exists

Raises an exception otherwise.

Expand source code
def get_session() -> Session:
    '''
    Return the global session if it exists

    Raises an exception otherwise.
    '''

    if Session.session is None:
        raise UninitializedException()

    return Session.session
def initialize(server_url: str, credentials: Optional[Tuple[str, str]] = None, token: Optional[str] = None, admin_token: Optional[str] = None) ‑> None

Initialize communication between this library and a Geo Engine instance

If credentials or a token are provided, the session will be authenticated. Credentials and token must not be provided at the same time.

optional arugments: (email, password) as tuple or token as a string optional environment variables: GEOENGINE_EMAIL, GEOENGINE_PASSWORD, GEOENGINE_TOKEN optional .env file defining: GEOENGINE_EMAIL, GEOENGINE_PASSWORD, GEOENGINE_TOKEN

Expand source code
def initialize(server_url: str,
               credentials: Optional[Tuple[str, str]] = None,
               token: Optional[str] = None,
               admin_token: Optional[str] = None) -> None:
    '''
    Initialize communication between this library and a Geo Engine instance

    If credentials or a token are provided, the session will be authenticated.
    Credentials and token must not be provided at the same time.

    optional arugments: (email, password) as tuple or token as a string
    optional environment variables: GEOENGINE_EMAIL, GEOENGINE_PASSWORD, GEOENGINE_TOKEN
    optional .env file defining: GEOENGINE_EMAIL, GEOENGINE_PASSWORD, GEOENGINE_TOKEN
    '''

    load_dotenv()

    Session.session = Session(server_url, credentials, token, admin_token)
def reset(logout: bool = True) ‑> None

Resets the current session

Expand source code
def reset(logout: bool = True) -> None:
    '''
    Resets the current session
    '''

    if Session.session is not None and logout:
        Session.session.logout()

    Session.session = None

Classes

class BearerAuth (token: str)

A bearer token authentication for requests

Expand source code
class BearerAuth(AuthBase):  # pylint: disable=too-few-public-methods
    '''A bearer token authentication for `requests`'''

    __token: str

    def __init__(self, token: str):
        self.__token = token

    def __call__(self, r):
        r.headers['Authorization'] = f'Bearer {self.__token}'
        return r

Ancestors

  • requests.auth.AuthBase
class Session (server_url: str, credentials: Optional[Tuple[str, str]] = None, token: Optional[str] = None, admin_token: Optional[str] = None)

A Geo Engine session

Initialize communication between this library and a Geo Engine instance

If credentials or a token are provided, the session will be authenticated. Credentials and token must not be provided at the same time.

optional arguments: - (email, password) as tuple - token as a string - admin_token as a string

optional environment variables: - GEOENGINE_EMAIL - GEOENGINE_PASSWORD - GEOENGINE_TOKEN - GEOENGINE_ADMIN_TOKEN

Expand source code
class Session:
    '''
    A Geo Engine session
    '''

    __id: UUID
    __valid_until: Optional[str] = None
    __server_url: str
    __timeout: int = 60

    __admin_token: Optional[UUID] = None

    session: ClassVar[Optional[Session]] = None

    def __init__(self,
                 server_url: str,
                 credentials: Optional[Tuple[str, str]] = None,
                 token: Optional[str] = None,
                 admin_token: Optional[str] = None) -> None:
        '''
        Initialize communication between this library and a Geo Engine instance

        If credentials or a token are provided, the session will be authenticated.
        Credentials and token must not be provided at the same time.

        optional arguments:
         - `(email, password)` as tuple
         - `token` as a string
         - `admin_token` as a string

        optional environment variables:
         - `GEOENGINE_EMAIL`
         - `GEOENGINE_PASSWORD`
         - `GEOENGINE_TOKEN`
         - `GEOENGINE_ADMIN_TOKEN`
        '''

        session = None

        if credentials is not None and token is not None:
            raise GeoEngineException({'message': 'Cannot provide both credentials and token'})

        if credentials is not None:
            session = req.post(f'{server_url}/login', json={"email": credentials[0],
                                                            "password": credentials[1]}, timeout=self.__timeout).json()
        elif "GEOENGINE_EMAIL" in os.environ and "GEOENGINE_PASSWORD" in os.environ:
            session = req.post(f'{server_url}/login',
                               json={"email": os.environ.get("GEOENGINE_EMAIL"),
                                     "password": os.environ.get("GEOENGINE_PASSWORD")},
                               timeout=self.__timeout).json()
        elif token is not None:
            session = req.get(f'{server_url}/session', headers={'Authorization': f'Bearer {token}'},
                              timeout=self.__timeout).json()
        elif "GEOENGINE_TOKEN" in os.environ:
            session = req.get(f'{server_url}/session',
                              headers={'Authorization': f'Bearer {os.environ.get("GEOENGINE_TOKEN")}'},
                              timeout=self.__timeout).json()
        else:
            session = req.post(f'{server_url}/anonymous', timeout=self.__timeout).json()

        if 'error' in session:
            raise GeoEngineException(session)

        self.__id = session['id']

        if 'validUntil' in session:
            self.__valid_until = session['validUntil']

        self.__server_url = server_url

        if admin_token is not None:
            self.__admin_token = UUID(admin_token)
        elif "GEOENGINE_ADMIN_TOKEN" in os.environ:
            self.__admin_token = UUID(os.environ.get("GEOENGINE_ADMIN_TOKEN"))

    def __repr__(self) -> str:
        '''Display representation of a session'''
        r = ''
        r += f'Server:              {self.server_url}\n'
        r += f'Session Id:          {self.__id}\n'

        if self.__valid_until is not None:
            r += f'Session valid until: {self.__valid_until}\n'

        return r

    @property
    def auth_header(self) -> Dict[str, str]:
        '''
        Create an authentication header for the current session
        '''

        return {'Authorization': 'Bearer ' + str(self.__id)}

    @property
    def admin_auth_header(self) -> Dict[str, str]:
        '''
        Create an authentication header for the current session's admin token
        '''

        if self.__admin_token is None:
            raise NoAdminSessionException()

        return {'Authorization': 'Bearer ' + str(self.__admin_token)}

    @property
    def admin_or_normal_auth_header(self) -> Dict[str, str]:
        '''
        Create an admin authentication header if possible, else a normal authentication header for the current session
        '''

        try:
            return self.admin_auth_header
        except NoAdminSessionException:
            return self.auth_header

    @property
    def server_url(self) -> str:
        '''
        Return the server url of the current session
        '''

        return self.__server_url

    def requests_bearer_auth(self) -> BearerAuth:
        '''
        Return a Bearer authentication object for the current session
        '''

        return BearerAuth(str(self.__id))

    def logout(self):
        '''
        Logout the current session
        '''

        req.post(f'{self.server_url}/logout', headers=self.auth_header, timeout=self.__timeout)

Class variables

var session : ClassVar[Optional[Session]]

Instance variables

var admin_auth_header : Dict[str, str]

Create an authentication header for the current session's admin token

Expand source code
@property
def admin_auth_header(self) -> Dict[str, str]:
    '''
    Create an authentication header for the current session's admin token
    '''

    if self.__admin_token is None:
        raise NoAdminSessionException()

    return {'Authorization': 'Bearer ' + str(self.__admin_token)}
var admin_or_normal_auth_header : Dict[str, str]

Create an admin authentication header if possible, else a normal authentication header for the current session

Expand source code
@property
def admin_or_normal_auth_header(self) -> Dict[str, str]:
    '''
    Create an admin authentication header if possible, else a normal authentication header for the current session
    '''

    try:
        return self.admin_auth_header
    except NoAdminSessionException:
        return self.auth_header
var auth_header : Dict[str, str]

Create an authentication header for the current session

Expand source code
@property
def auth_header(self) -> Dict[str, str]:
    '''
    Create an authentication header for the current session
    '''

    return {'Authorization': 'Bearer ' + str(self.__id)}
var server_url : str

Return the server url of the current session

Expand source code
@property
def server_url(self) -> str:
    '''
    Return the server url of the current session
    '''

    return self.__server_url

Methods

def logout(self)

Logout the current session

Expand source code
def logout(self):
    '''
    Logout the current session
    '''

    req.post(f'{self.server_url}/logout', headers=self.auth_header, timeout=self.__timeout)
def requests_bearer_auth(self) ‑> BearerAuth

Return a Bearer authentication object for the current session

Expand source code
def requests_bearer_auth(self) -> BearerAuth:
    '''
    Return a Bearer authentication object for the current session
    '''

    return BearerAuth(str(self.__id))