Module geoengine.tasks

Module for encapsulating Geo Engine tasks API

Expand source code
'''
Module for encapsulating Geo Engine tasks API
'''

from __future__ import annotations

import time
from enum import Enum
from typing import List, Tuple
from uuid import UUID
import asyncio
import datetime

import geoengine_openapi_client
from geoengine.types import DEFAULT_ISO_TIME_FORMAT
from geoengine.auth import get_session
from geoengine.error import GeoEngineException
from geoengine import backports


class TaskId:
    '''A wrapper for a task id'''

    def __init__(self, task_id: UUID) -> None:
        self.__task_id = task_id

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.TaskResponse) -> TaskId:
        '''Parse a http response to an `TaskId`'''

        return TaskId(UUID(response.task_id))

    def __eq__(self, other) -> bool:
        '''Checks if two dataset ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__task_id == other.__task_id  # pylint: disable=protected-access

    def __str__(self) -> str:
        return str(self.__task_id)

    def __repr__(self) -> str:
        return repr(self.__task_id)


class TaskStatus(Enum):
    '''An enum of task status types'''

    RUNNING = "running"
    COMPLETED = "completed"
    ABORTED = "aborted"
    FAILED = "failed"


class TaskStatusInfo:  # pylint: disable=too-few-public-methods
    '''A wrapper for a task status type'''

    status: TaskStatus
    time_started: datetime.datetime

    def __init__(self, status, time_started) -> None:
        self.status = status
        self.time_started = time_started

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.TaskStatus) -> TaskStatusInfo:
        '''
        Parse a http response to a `TaskStatusInfo`

        The task can be one of:
        RunningTaskStatusInfo, CompletedTaskStatusInfo, AbortedTaskStatusInfo or FailedTaskStatusInfo
        '''

        inner = response.actual_instance
        status = TaskStatus(inner.status)
        time_started = None
        if isinstance(inner,
                      (geoengine_openapi_client.RunningTaskStatus, geoengine_openapi_client.CompletedTaskStatus)) \
                and inner.time_started is not None:
            time_started = datetime.datetime.strptime(inner.time_started, DEFAULT_ISO_TIME_FORMAT)

        if isinstance(inner, geoengine_openapi_client.RunningTaskStatus):
            return RunningTaskStatusInfo(status, time_started, inner.pct_complete, inner.estimated_time_remaining,
                                         inner.info, inner.task_type, inner.description)
        if isinstance(inner, geoengine_openapi_client.CompletedTaskStatus):
            return CompletedTaskStatusInfo(status, time_started, inner.info, inner.time_total,
                                           inner.task_type, inner.description)
        if isinstance(inner, geoengine_openapi_client.AbortedTaskStatus):
            return AbortedTaskStatusInfo(status, time_started, inner.clean_up)
        if isinstance(inner, geoengine_openapi_client.FailedTaskStatus):
            return FailedTaskStatusInfo(status, time_started, inner.error, inner.clean_up)
        raise GeoEngineException(response)


class RunningTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for a running task status with information about completion progress'''

    def __init__(self, status, start_time, pct_complete, estimated_time_remaining, info, task_type, description) -> None:  # pylint: disable=too-many-arguments,line-too-long
        super().__init__(status, start_time)
        self.pct_complete = pct_complete
        self.estimated_time_remaining = estimated_time_remaining
        self.info = info
        self.task_type = task_type
        self.description = description

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.pct_complete == other.pct_complete \
            and self.estimated_time_remaining == other.estimated_time_remaining and self.info == other.info \
            and self.task_type == other.task_type and self.description == other.description

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, " \
            f"pct_complete={self.pct_complete}, " \
            f"estimated_time_remaining={self.estimated_time_remaining}, info={self.info}, " \
            f"task_type={self.task_type}, description={self.description}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, pct_complete={self.pct_complete!r}, " \
               f"estimated_time_remaining={self.estimated_time_remaining!r}, info={self.info!r}, " \
               f"task_type={self.task_type!r}, description={self.description!r})"


class CompletedTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for a completed task status with information about the completion'''

    def __init__(self, status, time_started, info, time_total, task_type, description) -> None:  # pylint: disable=too-many-arguments
        super().__init__(status, time_started)
        self.info = info
        self.time_total = time_total
        self.task_type = task_type
        self.description = description

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.info == other.info and self.time_total == other.time_total \
            and self.task_type == other.task_type and self.description == other.description

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, info={self.info}, " \
            f"time_total={self.time_total}, task_type={self.task_type}, description={self.description}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, " \
            f"info = {self.info!r}, time_total = {self.time_total!r}, task_type={self.task_type!r}, " \
            f"description={self.description!r})"


class AbortedTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for an aborted task status with information about the termination'''

    def __init__(self, status, time_started, clean_up) -> None:
        super().__init__(status, time_started)
        self.clean_up = clean_up

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.clean_up == other.clean_up

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, clean_up={self.clean_up}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, " \
            f"clean_up={self.clean_up!r})"


class FailedTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for a failed task status with information about the failure'''

    def __init__(self, status, time_started, error, clean_up) -> None:
        super().__init__(status, time_started)
        self.error = error
        self.clean_up = clean_up

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.error == other.error and self.clean_up == other.clean_up

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, error={self.error}, " \
            f"clean_up={self.clean_up}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, " \
            f"error={self.error!r}, clean_up={self.clean_up!r})"


class Task:
    '''
    Holds a task id, allows querying and manipulating the task status
    '''

    def __init__(self, task_id: TaskId):
        self.__task_id = task_id

    def __eq__(self, other):
        '''Check if two task representations are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__task_id == other.__task_id  # pylint: disable=protected-access

    def get_status(self, timeout: int = 3600) -> TaskStatusInfo:
        '''
        Returns the status of a task in a Geo Engine instance
        '''
        session = get_session()

        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            response = tasks_api.status_handler(task_id_str, _request_timeout=timeout)

        return TaskStatusInfo.from_response(response)

    def abort(self, force: bool = False, timeout: int = 3600) -> None:
        '''
        Abort a running task in a Geo Engine instance
        '''
        session = get_session()

        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            tasks_api.abort_handler(
                task_id_str,
                None if force is False else True,
                _request_timeout=timeout
            )

    def wait_for_finish(
            self,
            check_interval_seconds: float = 5,
            request_timeout: int = 3600,
            print_status: bool = True) -> TaskStatusInfo:
        '''
        Wait for the given task in a Geo Engine instance to finish (status either complete, aborted or failed).
        The status is printed after each check-in. Check-ins happen in intervals of check_interval_seconds seconds.
        '''
        current_status = self.get_status(request_timeout)

        while current_status.status == TaskStatus.RUNNING:
            current_status = self.get_status(request_timeout)

            if print_status:
                print(current_status)
            if current_status.status == TaskStatus.RUNNING:
                time.sleep(check_interval_seconds)

        return current_status

    def __str__(self) -> str:
        return str(self.__task_id)

    def __repr__(self) -> str:
        return repr(self.__task_id)

    async def as_future(
            self,
            request_interval: int = 5,
            print_status=False
    ) -> TaskStatusInfo:
        '''
        Returns a future that will be resolved when the task is finished in the backend.
        '''

        def get_status_inner(tasks_api: geoengine_openapi_client.TasksApi, task_id_str: str, timeout: int = 3600):
            return tasks_api.status_handler(task_id_str, _request_timeout=timeout)

        session = get_session()
        task_id_str = str(self.__task_id)

        last_status = None
        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            while True:
                response = await backports.to_thread(get_status_inner, tasks_api, task_id_str)

                last_status = TaskStatusInfo.from_response(response)

                if print_status:
                    print(last_status)

                if last_status.status != TaskStatus.RUNNING:
                    return last_status

                await asyncio.sleep(request_interval)


def get_task_list(timeout: int = 3600) -> List[Tuple[Task, TaskStatusInfo]]:
    '''
    Returns the status of all tasks in a Geo Engine instance
    '''
    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        tasks_api = geoengine_openapi_client.TasksApi(api_client)
        response = tasks_api.list_handler(None, 0, 10, _request_timeout=timeout)

    result = []
    for item in response:
        result.append((Task(TaskId(UUID(item.task_id))), TaskStatusInfo.from_response(item)))

    return result

Functions

def get_task_list(timeout: int = 3600) ‑> List[Tuple[TaskTaskStatusInfo]]

Returns the status of all tasks in a Geo Engine instance

Expand source code
def get_task_list(timeout: int = 3600) -> List[Tuple[Task, TaskStatusInfo]]:
    '''
    Returns the status of all tasks in a Geo Engine instance
    '''
    session = get_session()

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        tasks_api = geoengine_openapi_client.TasksApi(api_client)
        response = tasks_api.list_handler(None, 0, 10, _request_timeout=timeout)

    result = []
    for item in response:
        result.append((Task(TaskId(UUID(item.task_id))), TaskStatusInfo.from_response(item)))

    return result

Classes

class AbortedTaskStatusInfo (status, time_started, clean_up)

A wrapper for an aborted task status with information about the termination

Expand source code
class AbortedTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for an aborted task status with information about the termination'''

    def __init__(self, status, time_started, clean_up) -> None:
        super().__init__(status, time_started)
        self.clean_up = clean_up

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.clean_up == other.clean_up

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, clean_up={self.clean_up}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, " \
            f"clean_up={self.clean_up!r})"

Ancestors

Class variables

var statusTaskStatus
var time_started : datetime.datetime

Inherited members

class CompletedTaskStatusInfo (status, time_started, info, time_total, task_type, description)

A wrapper for a completed task status with information about the completion

Expand source code
class CompletedTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for a completed task status with information about the completion'''

    def __init__(self, status, time_started, info, time_total, task_type, description) -> None:  # pylint: disable=too-many-arguments
        super().__init__(status, time_started)
        self.info = info
        self.time_total = time_total
        self.task_type = task_type
        self.description = description

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.info == other.info and self.time_total == other.time_total \
            and self.task_type == other.task_type and self.description == other.description

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, info={self.info}, " \
            f"time_total={self.time_total}, task_type={self.task_type}, description={self.description}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, " \
            f"info = {self.info!r}, time_total = {self.time_total!r}, task_type={self.task_type!r}, " \
            f"description={self.description!r})"

Ancestors

Class variables

var statusTaskStatus
var time_started : datetime.datetime

Inherited members

class FailedTaskStatusInfo (status, time_started, error, clean_up)

A wrapper for a failed task status with information about the failure

Expand source code
class FailedTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for a failed task status with information about the failure'''

    def __init__(self, status, time_started, error, clean_up) -> None:
        super().__init__(status, time_started)
        self.error = error
        self.clean_up = clean_up

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.error == other.error and self.clean_up == other.clean_up

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, error={self.error}, " \
            f"clean_up={self.clean_up}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, time_started={self.time_started!r}, " \
            f"error={self.error!r}, clean_up={self.clean_up!r})"

Ancestors

Class variables

var statusTaskStatus
var time_started : datetime.datetime

Inherited members

class RunningTaskStatusInfo (status, start_time, pct_complete, estimated_time_remaining, info, task_type, description)

A wrapper for a running task status with information about completion progress

Expand source code
class RunningTaskStatusInfo(TaskStatusInfo):
    '''A wrapper for a running task status with information about completion progress'''

    def __init__(self, status, start_time, pct_complete, estimated_time_remaining, info, task_type, description) -> None:  # pylint: disable=too-many-arguments,line-too-long
        super().__init__(status, start_time)
        self.pct_complete = pct_complete
        self.estimated_time_remaining = estimated_time_remaining
        self.info = info
        self.task_type = task_type
        self.description = description

    def __eq__(self, other):
        '''Check if two task statuses are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.status == other.status and self.pct_complete == other.pct_complete \
            and self.estimated_time_remaining == other.estimated_time_remaining and self.info == other.info \
            and self.task_type == other.task_type and self.description == other.description

    def __str__(self) -> str:
        return f"status={self.status.value}, time_started={self.time_started}, " \
            f"pct_complete={self.pct_complete}, " \
            f"estimated_time_remaining={self.estimated_time_remaining}, info={self.info}, " \
            f"task_type={self.task_type}, description={self.description}"

    def __repr__(self) -> str:
        return f"TaskStatusInfo(status={self.status.value!r}, pct_complete={self.pct_complete!r}, " \
               f"estimated_time_remaining={self.estimated_time_remaining!r}, info={self.info!r}, " \
               f"task_type={self.task_type!r}, description={self.description!r})"

Ancestors

Class variables

var statusTaskStatus
var time_started : datetime.datetime

Inherited members

class Task (task_id: TaskId)

Holds a task id, allows querying and manipulating the task status

Expand source code
class Task:
    '''
    Holds a task id, allows querying and manipulating the task status
    '''

    def __init__(self, task_id: TaskId):
        self.__task_id = task_id

    def __eq__(self, other):
        '''Check if two task representations are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__task_id == other.__task_id  # pylint: disable=protected-access

    def get_status(self, timeout: int = 3600) -> TaskStatusInfo:
        '''
        Returns the status of a task in a Geo Engine instance
        '''
        session = get_session()

        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            response = tasks_api.status_handler(task_id_str, _request_timeout=timeout)

        return TaskStatusInfo.from_response(response)

    def abort(self, force: bool = False, timeout: int = 3600) -> None:
        '''
        Abort a running task in a Geo Engine instance
        '''
        session = get_session()

        task_id_str = str(self.__task_id)

        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            tasks_api.abort_handler(
                task_id_str,
                None if force is False else True,
                _request_timeout=timeout
            )

    def wait_for_finish(
            self,
            check_interval_seconds: float = 5,
            request_timeout: int = 3600,
            print_status: bool = True) -> TaskStatusInfo:
        '''
        Wait for the given task in a Geo Engine instance to finish (status either complete, aborted or failed).
        The status is printed after each check-in. Check-ins happen in intervals of check_interval_seconds seconds.
        '''
        current_status = self.get_status(request_timeout)

        while current_status.status == TaskStatus.RUNNING:
            current_status = self.get_status(request_timeout)

            if print_status:
                print(current_status)
            if current_status.status == TaskStatus.RUNNING:
                time.sleep(check_interval_seconds)

        return current_status

    def __str__(self) -> str:
        return str(self.__task_id)

    def __repr__(self) -> str:
        return repr(self.__task_id)

    async def as_future(
            self,
            request_interval: int = 5,
            print_status=False
    ) -> TaskStatusInfo:
        '''
        Returns a future that will be resolved when the task is finished in the backend.
        '''

        def get_status_inner(tasks_api: geoengine_openapi_client.TasksApi, task_id_str: str, timeout: int = 3600):
            return tasks_api.status_handler(task_id_str, _request_timeout=timeout)

        session = get_session()
        task_id_str = str(self.__task_id)

        last_status = None
        with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
            tasks_api = geoengine_openapi_client.TasksApi(api_client)
            while True:
                response = await backports.to_thread(get_status_inner, tasks_api, task_id_str)

                last_status = TaskStatusInfo.from_response(response)

                if print_status:
                    print(last_status)

                if last_status.status != TaskStatus.RUNNING:
                    return last_status

                await asyncio.sleep(request_interval)

Methods

def abort(self, force: bool = False, timeout: int = 3600) ‑> None

Abort a running task in a Geo Engine instance

Expand source code
def abort(self, force: bool = False, timeout: int = 3600) -> None:
    '''
    Abort a running task in a Geo Engine instance
    '''
    session = get_session()

    task_id_str = str(self.__task_id)

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        tasks_api = geoengine_openapi_client.TasksApi(api_client)
        tasks_api.abort_handler(
            task_id_str,
            None if force is False else True,
            _request_timeout=timeout
        )
async def as_future(self, request_interval: int = 5, print_status=False) ‑> TaskStatusInfo

Returns a future that will be resolved when the task is finished in the backend.

Expand source code
async def as_future(
        self,
        request_interval: int = 5,
        print_status=False
) -> TaskStatusInfo:
    '''
    Returns a future that will be resolved when the task is finished in the backend.
    '''

    def get_status_inner(tasks_api: geoengine_openapi_client.TasksApi, task_id_str: str, timeout: int = 3600):
        return tasks_api.status_handler(task_id_str, _request_timeout=timeout)

    session = get_session()
    task_id_str = str(self.__task_id)

    last_status = None
    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        tasks_api = geoengine_openapi_client.TasksApi(api_client)
        while True:
            response = await backports.to_thread(get_status_inner, tasks_api, task_id_str)

            last_status = TaskStatusInfo.from_response(response)

            if print_status:
                print(last_status)

            if last_status.status != TaskStatus.RUNNING:
                return last_status

            await asyncio.sleep(request_interval)
def get_status(self, timeout: int = 3600) ‑> TaskStatusInfo

Returns the status of a task in a Geo Engine instance

Expand source code
def get_status(self, timeout: int = 3600) -> TaskStatusInfo:
    '''
    Returns the status of a task in a Geo Engine instance
    '''
    session = get_session()

    task_id_str = str(self.__task_id)

    with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
        tasks_api = geoengine_openapi_client.TasksApi(api_client)
        response = tasks_api.status_handler(task_id_str, _request_timeout=timeout)

    return TaskStatusInfo.from_response(response)
def wait_for_finish(self, check_interval_seconds: float = 5, request_timeout: int = 3600, print_status: bool = True) ‑> TaskStatusInfo

Wait for the given task in a Geo Engine instance to finish (status either complete, aborted or failed). The status is printed after each check-in. Check-ins happen in intervals of check_interval_seconds seconds.

Expand source code
def wait_for_finish(
        self,
        check_interval_seconds: float = 5,
        request_timeout: int = 3600,
        print_status: bool = True) -> TaskStatusInfo:
    '''
    Wait for the given task in a Geo Engine instance to finish (status either complete, aborted or failed).
    The status is printed after each check-in. Check-ins happen in intervals of check_interval_seconds seconds.
    '''
    current_status = self.get_status(request_timeout)

    while current_status.status == TaskStatus.RUNNING:
        current_status = self.get_status(request_timeout)

        if print_status:
            print(current_status)
        if current_status.status == TaskStatus.RUNNING:
            time.sleep(check_interval_seconds)

    return current_status
class TaskId (task_id: UUID)

A wrapper for a task id

Expand source code
class TaskId:
    '''A wrapper for a task id'''

    def __init__(self, task_id: UUID) -> None:
        self.__task_id = task_id

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.TaskResponse) -> TaskId:
        '''Parse a http response to an `TaskId`'''

        return TaskId(UUID(response.task_id))

    def __eq__(self, other) -> bool:
        '''Checks if two dataset ids are equal'''
        if not isinstance(other, self.__class__):
            return False

        return self.__task_id == other.__task_id  # pylint: disable=protected-access

    def __str__(self) -> str:
        return str(self.__task_id)

    def __repr__(self) -> str:
        return repr(self.__task_id)

Static methods

def from_response(response: geoengine_openapi_client.TaskResponse) ‑> TaskId

Parse a http response to an TaskId

Expand source code
@classmethod
def from_response(cls, response: geoengine_openapi_client.TaskResponse) -> TaskId:
    '''Parse a http response to an `TaskId`'''

    return TaskId(UUID(response.task_id))
class TaskStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enum of task status types

Expand source code
class TaskStatus(Enum):
    '''An enum of task status types'''

    RUNNING = "running"
    COMPLETED = "completed"
    ABORTED = "aborted"
    FAILED = "failed"

Ancestors

  • enum.Enum

Class variables

var ABORTED
var COMPLETED
var FAILED
var RUNNING
class TaskStatusInfo (status, time_started)

A wrapper for a task status type

Expand source code
class TaskStatusInfo:  # pylint: disable=too-few-public-methods
    '''A wrapper for a task status type'''

    status: TaskStatus
    time_started: datetime.datetime

    def __init__(self, status, time_started) -> None:
        self.status = status
        self.time_started = time_started

    @classmethod
    def from_response(cls, response: geoengine_openapi_client.TaskStatus) -> TaskStatusInfo:
        '''
        Parse a http response to a `TaskStatusInfo`

        The task can be one of:
        RunningTaskStatusInfo, CompletedTaskStatusInfo, AbortedTaskStatusInfo or FailedTaskStatusInfo
        '''

        inner = response.actual_instance
        status = TaskStatus(inner.status)
        time_started = None
        if isinstance(inner,
                      (geoengine_openapi_client.RunningTaskStatus, geoengine_openapi_client.CompletedTaskStatus)) \
                and inner.time_started is not None:
            time_started = datetime.datetime.strptime(inner.time_started, DEFAULT_ISO_TIME_FORMAT)

        if isinstance(inner, geoengine_openapi_client.RunningTaskStatus):
            return RunningTaskStatusInfo(status, time_started, inner.pct_complete, inner.estimated_time_remaining,
                                         inner.info, inner.task_type, inner.description)
        if isinstance(inner, geoengine_openapi_client.CompletedTaskStatus):
            return CompletedTaskStatusInfo(status, time_started, inner.info, inner.time_total,
                                           inner.task_type, inner.description)
        if isinstance(inner, geoengine_openapi_client.AbortedTaskStatus):
            return AbortedTaskStatusInfo(status, time_started, inner.clean_up)
        if isinstance(inner, geoengine_openapi_client.FailedTaskStatus):
            return FailedTaskStatusInfo(status, time_started, inner.error, inner.clean_up)
        raise GeoEngineException(response)

Subclasses

Class variables

var statusTaskStatus
var time_started : datetime.datetime

Static methods

def from_response(response: geoengine_openapi_client.TaskStatus) ‑> TaskStatusInfo

Parse a http response to a TaskStatusInfo

The task can be one of: RunningTaskStatusInfo, CompletedTaskStatusInfo, AbortedTaskStatusInfo or FailedTaskStatusInfo

Expand source code
@classmethod
def from_response(cls, response: geoengine_openapi_client.TaskStatus) -> TaskStatusInfo:
    '''
    Parse a http response to a `TaskStatusInfo`

    The task can be one of:
    RunningTaskStatusInfo, CompletedTaskStatusInfo, AbortedTaskStatusInfo or FailedTaskStatusInfo
    '''

    inner = response.actual_instance
    status = TaskStatus(inner.status)
    time_started = None
    if isinstance(inner,
                  (geoengine_openapi_client.RunningTaskStatus, geoengine_openapi_client.CompletedTaskStatus)) \
            and inner.time_started is not None:
        time_started = datetime.datetime.strptime(inner.time_started, DEFAULT_ISO_TIME_FORMAT)

    if isinstance(inner, geoengine_openapi_client.RunningTaskStatus):
        return RunningTaskStatusInfo(status, time_started, inner.pct_complete, inner.estimated_time_remaining,
                                     inner.info, inner.task_type, inner.description)
    if isinstance(inner, geoengine_openapi_client.CompletedTaskStatus):
        return CompletedTaskStatusInfo(status, time_started, inner.info, inner.time_total,
                                       inner.task_type, inner.description)
    if isinstance(inner, geoengine_openapi_client.AbortedTaskStatus):
        return AbortedTaskStatusInfo(status, time_started, inner.clean_up)
    if isinstance(inner, geoengine_openapi_client.FailedTaskStatus):
        return FailedTaskStatusInfo(status, time_started, inner.error, inner.clean_up)
    raise GeoEngineException(response)