Module geoengine.workflow
A workflow representation and methods on workflows
Expand source code
'''
A workflow representation and methods on workflows
'''
# pylint: disable=too-many-lines
# TODO: split into multiple files
from __future__ import annotations
import asyncio
from collections import defaultdict
import json
from io import BytesIO
from logging import debug
from os import PathLike
from typing import Any, AsyncIterator, Dict, List, Optional, Union, Type, cast, TypedDict
from uuid import UUID
import geopandas as gpd
import pandas as pd
import numpy as np
import rasterio.io
import requests as req
import rioxarray
from PIL import Image
from owslib.util import Authentication, ResponseWrapper
from owslib.wcs import WebCoverageService
from vega import VegaLite
import websockets
import websockets.client
import xarray as xr
import pyarrow as pa
import geoengine_openapi_client
from geoengine import api
from geoengine.auth import get_session
from geoengine.error import GeoEngineException, InputException, MethodNotCalledOnPlotException, \
MethodNotCalledOnRasterException, MethodNotCalledOnVectorException
from geoengine import backports
from geoengine.types import ProvenanceEntry, QueryRectangle, RasterColorizer, ResultDescriptor, \
VectorResultDescriptor, ClassificationMeasurement
from geoengine.tasks import Task, TaskId
from geoengine.workflow_builder.operators import Operator as WorkflowBuilderOperator
from geoengine.raster import RasterTile2D
# TODO: Define as recursive type when supported in mypy: https://github.com/python/mypy/issues/731
JsonType = Union[Dict[str, Any], List[Any], int, str, float, bool, Type[None]]
Axis = TypedDict('Axis', {'title': str})
Bin = TypedDict('Bin', {'binned': bool, 'step': float})
Field = TypedDict('Field', {'field': str})
DatasetIds = TypedDict('DatasetIds', {'upload': UUID, 'dataset': UUID})
Values = TypedDict('Values', {'binStart': float, 'binEnd': float, 'Frequency': int})
X = TypedDict('X', {'field': Field, 'bin': Bin, 'axis': Axis})
X2 = TypedDict('X2', {'field': Field})
Y = TypedDict('Y', {'field': Field, 'type': str})
Encoding = TypedDict('Encoding', {'x': X, 'x2': X2, 'y': Y})
VegaSpec = TypedDict('VegaSpec', {'$schema': str, 'data': List[Values], 'mark': str, 'encoding': Encoding})
class WorkflowId:
'''
A wrapper around a workflow UUID
'''
__workflow_id: UUID
def __init__(self, workflow_id: UUID) -> None:
self.__workflow_id = workflow_id
@classmethod
def from_response(cls, response: geoengine_openapi_client.AddCollection200Response) -> WorkflowId:
'''
Create a `WorkflowId` from an http response
'''
return WorkflowId(UUID(response.id))
def __str__(self) -> str:
return str(self.__workflow_id)
def __repr__(self) -> str:
return str(self)
class RasterStreamProcessing:
'''
Helper class to process raster stream data
'''
@classmethod
def read_arrow_ipc(cls, arrow_ipc: bytes) -> pa.RecordBatch:
'''Read an Arrow IPC file from a byte array'''
reader = pa.ipc.open_file(arrow_ipc)
# We know from the backend that there is only one record batch
record_batch = reader.get_record_batch(0)
return record_batch
@classmethod
def process_bytes(cls, tile_bytes: Optional[bytes]) -> Optional[RasterTile2D]:
'''Process a tile from a byte array'''
if tile_bytes is None:
return None
# process the received data
record_batch = RasterStreamProcessing.read_arrow_ipc(tile_bytes)
tile = RasterTile2D.from_ge_record_batch(record_batch)
return tile
@classmethod
def merge_tiles(cls, tiles: List[xr.DataArray]) -> Optional[xr.DataArray]:
'''Merge a list of tiles into a single xarray'''
if len(tiles) == 0:
return None
# group the tiles by band
tiles_by_band: Dict[int, List[xr.DataArray]] = defaultdict(list)
for tile in tiles:
band = tile.band.item() # assuming 'band' is a coordinate with a single value
tiles_by_band[band].append(tile)
# build one spatial tile per band
combined_by_band = []
for band_tiles in tiles_by_band.values():
combined = xr.combine_by_coords(band_tiles)
# `combine_by_coords` always returns a `DataArray` for single variable input arrays.
# This assertion verifies this for mypy
assert isinstance(combined, xr.DataArray)
combined_by_band.append(combined)
# build one array with all bands and geo coordinates
combined_tile = xr.concat(combined_by_band, dim='band')
return combined_tile
class Workflow:
'''
Holds a workflow id and allows querying data
'''
__workflow_id: WorkflowId
__result_descriptor: ResultDescriptor
def __init__(self, workflow_id: WorkflowId) -> None:
self.__workflow_id = workflow_id
self.__result_descriptor = self.__query_result_descriptor()
def __str__(self) -> str:
return str(self.__workflow_id)
def __repr__(self) -> str:
return repr(self.__workflow_id)
def __query_result_descriptor(self, timeout: int = 60) -> ResultDescriptor:
'''
Query the metadata of the workflow result
'''
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
workflows_api = geoengine_openapi_client.WorkflowsApi(api_client)
response = workflows_api.get_workflow_metadata_handler(str(self.__workflow_id), _request_timeout=timeout)
debug(response)
return ResultDescriptor.from_response(response)
def get_result_descriptor(self) -> ResultDescriptor:
'''
Return the metadata of the workflow result
'''
return self.__result_descriptor
def workflow_definition(self, timeout: int = 60) -> geoengine_openapi_client.Workflow:
'''Return the workflow definition for this workflow'''
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
workflows_api = geoengine_openapi_client.WorkflowsApi(api_client)
response = workflows_api.load_workflow_handler(str(self.__workflow_id), _request_timeout=timeout)
return response
def get_dataframe(
self,
bbox: QueryRectangle,
timeout: int = 3600,
resolve_classifications: bool = False
) -> gpd.GeoDataFrame:
'''
Query a workflow and return the WFS result as a GeoPandas `GeoDataFrame`
'''
if not self.__result_descriptor.is_vector_result():
raise MethodNotCalledOnVectorException()
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
wfs_api = geoengine_openapi_client.OGCWFSApi(api_client)
response = wfs_api.wfs_feature_handler(
workflow=str(self.__workflow_id),
service=geoengine_openapi_client.WfsService(geoengine_openapi_client.WfsService.WFS),
request=geoengine_openapi_client.GetFeatureRequest(
geoengine_openapi_client.GetFeatureRequest.GETFEATURE
),
type_names=str(self.__workflow_id),
bbox=bbox.bbox_str,
version=geoengine_openapi_client.WfsVersion(geoengine_openapi_client.WfsVersion.ENUM_2_DOT_0_DOT_0),
time=bbox.time_str,
srs_name=bbox.srs,
query_resolution=str(bbox.spatial_resolution),
_request_timeout=timeout
)
def geo_json_with_time_to_geopandas(geo_json):
'''
GeoJson has no standard for time, so we parse the when field
separately and attach it to the data frame as columns `start`
and `end`.
'''
data = gpd.GeoDataFrame.from_features(geo_json)
data = data.set_crs(bbox.srs, allow_override=True)
start = [f['when']['start'] for f in geo_json['features']]
end = [f['when']['end'] for f in geo_json['features']]
# TODO: find a good way to infer BoT/EoT
data['start'] = gpd.pd.to_datetime(start, errors='coerce')
data['end'] = gpd.pd.to_datetime(end, errors='coerce')
return data
def transform_classifications(data: gpd.GeoDataFrame):
result_descriptor: VectorResultDescriptor = self.__result_descriptor # type: ignore
for (column, info) in result_descriptor.columns.items():
if isinstance(info.measurement, ClassificationMeasurement):
measurement: ClassificationMeasurement = info.measurement
classes = measurement.classes
data[column] = data[column].apply(lambda x: classes[x]) # pylint: disable=cell-var-from-loop
return data
result = geo_json_with_time_to_geopandas(response.to_dict())
if resolve_classifications:
result = transform_classifications(result)
return result
def wms_get_map_as_image(self, bbox: QueryRectangle, raster_colorizer: RasterColorizer) -> Image.Image:
'''Return the result of a WMS request as a PIL Image'''
if not self.__result_descriptor.is_raster_result():
raise MethodNotCalledOnRasterException()
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
wms_api = geoengine_openapi_client.OGCWMSApi(api_client)
response = wms_api.wms_map_handler(
workflow=str(self),
version=geoengine_openapi_client.WmsVersion(geoengine_openapi_client.WmsVersion.ENUM_1_DOT_3_DOT_0),
service=geoengine_openapi_client.WmsService(geoengine_openapi_client.WmsService.WMS),
request=geoengine_openapi_client.GetMapRequest(geoengine_openapi_client.GetMapRequest.GETMAP),
width=int((bbox.spatial_bounds.xmax - bbox.spatial_bounds.xmin) / bbox.spatial_resolution.x_resolution),
height=int((bbox.spatial_bounds.ymax - bbox.spatial_bounds.ymin) / bbox.spatial_resolution.y_resolution), # pylint: disable=line-too-long
bbox=bbox.bbox_ogc_str,
format=geoengine_openapi_client.GetMapFormat(geoengine_openapi_client.GetMapFormat.IMAGE_SLASH_PNG),
layers=str(self),
styles='custom:' + raster_colorizer.to_api_dict().to_json(),
crs=bbox.srs,
time=bbox.time_str
)
return Image.open(BytesIO(response))
def plot_json(self, bbox: QueryRectangle, timeout: int = 3600) -> geoengine_openapi_client.WrappedPlotOutput:
'''
Query a workflow and return the plot chart result as WrappedPlotOutput
'''
if not self.__result_descriptor.is_plot_result():
raise MethodNotCalledOnPlotException()
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
plots_api = geoengine_openapi_client.PlotsApi(api_client)
return plots_api.get_plot_handler(
bbox.bbox_str,
bbox.time_str,
str(bbox.spatial_resolution),
str(self.__workflow_id),
bbox.srs,
_request_timeout=timeout
)
def plot_chart(self, bbox: QueryRectangle, timeout: int = 3600) -> VegaLite:
'''
Query a workflow and return the plot chart result as a vega plot
'''
response = self.plot_json(bbox, timeout)
vega_spec: VegaSpec = json.loads(response.data['vegaString'])
return VegaLite(vega_spec)
def __request_wcs(
self,
bbox: QueryRectangle,
timeout=3600,
file_format: str = 'image/tiff',
force_no_data_value: Optional[float] = None
) -> ResponseWrapper:
'''
Query a workflow and return the coverage
Parameters
----------
bbox : A bounding box for the query
timeout : HTTP request timeout in seconds
file_format : The format of the returned raster
force_no_data_value: If not None, use this value as no data value for the requested raster data. \
Otherwise, use the Geo Engine will produce masked rasters.
'''
if not self.__result_descriptor.is_raster_result():
raise MethodNotCalledOnRasterException()
session = get_session()
# TODO: properly build CRS string for bbox
crs = f'urn:ogc:def:crs:{bbox.srs.replace(":", "::")}'
wcs_url = f'{session.server_url}/wcs/{self.__workflow_id}'
wcs = WebCoverageService(
wcs_url,
version='1.1.1',
auth=Authentication(auth_delegate=session.requests_bearer_auth()),
)
[resx, resy] = bbox.resolution_ogc
kwargs = {}
if force_no_data_value is not None:
kwargs["nodatavalue"] = str(float(force_no_data_value))
return wcs.getCoverage(
identifier=f'{self.__workflow_id}',
bbox=bbox.bbox_ogc,
time=[bbox.time_str],
format=file_format,
crs=crs,
resx=resx,
resy=resy,
timeout=timeout,
**kwargs
)
def __get_wcs_tiff_as_memory_file(
self,
bbox: QueryRectangle,
timeout=3600,
force_no_data_value: Optional[float] = None
) -> rasterio.io.MemoryFile:
'''
Query a workflow and return the raster result as a memory mapped GeoTiff
Parameters
----------
bbox : A bounding box for the query
timeout : HTTP request timeout in seconds
force_no_data_value: If not None, use this value as no data value for the requested raster data. \
Otherwise, use the Geo Engine will produce masked rasters.
'''
response = self.__request_wcs(bbox, timeout, 'image/tiff', force_no_data_value).read()
# response is checked via `raise_on_error` in `getCoverage` / `openUrl`
memory_file = rasterio.io.MemoryFile(response)
return memory_file
def get_array(
self,
bbox: QueryRectangle,
timeout=3600,
force_no_data_value: Optional[float] = None
) -> np.ndarray:
'''
Query a workflow and return the raster result as a numpy array
Parameters
----------
bbox : A bounding box for the query
timeout : HTTP request timeout in seconds
force_no_data_value: If not None, use this value as no data value for the requested raster data. \
Otherwise, use the Geo Engine will produce masked rasters.
'''
with self.__get_wcs_tiff_as_memory_file(
bbox,
timeout,
force_no_data_value
) as memfile, memfile.open() as dataset:
array = dataset.read(1)
return array
def get_xarray(
self,
bbox: QueryRectangle,
timeout=3600,
force_no_data_value: Optional[float] = None
) -> xr.DataArray:
'''
Query a workflow and return the raster result as a georeferenced xarray
Parameters
----------
bbox : A bounding box for the query
timeout : HTTP request timeout in seconds
force_no_data_value: If not None, use this value as no data value for the requested raster data. \
Otherwise, use the Geo Engine will produce masked rasters.
'''
with self.__get_wcs_tiff_as_memory_file(
bbox,
timeout,
force_no_data_value
) as memfile, memfile.open() as dataset:
data_array = rioxarray.open_rasterio(dataset)
# helping mypy with inference
assert isinstance(data_array, xr.DataArray)
rio: xr.DataArray = data_array.rio
rio.update_attrs({
'crs': rio.crs,
'res': rio.resolution(),
'transform': rio.transform(),
}, inplace=True)
# TODO: add time information to dataset
return data_array.load()
# pylint: disable=too-many-arguments,too-many-positional-arguments
def download_raster(
self,
bbox: QueryRectangle,
file_path: str,
timeout=3600,
file_format: str = 'image/tiff',
force_no_data_value: Optional[float] = None
) -> None:
'''
Query a workflow and save the raster result as a file on disk
Parameters
----------
bbox : A bounding box for the query
file_path : The path to the file to save the raster to
timeout : HTTP request timeout in seconds
file_format : The format of the returned raster
force_no_data_value: If not None, use this value as no data value for the requested raster data. \
Otherwise, use the Geo Engine will produce masked rasters.
'''
response = self.__request_wcs(bbox, timeout, file_format, force_no_data_value)
with open(file_path, 'wb') as file:
file.write(response.read())
def get_provenance(self, timeout: int = 60) -> List[ProvenanceEntry]:
'''
Query the provenance of the workflow
'''
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
workflows_api = geoengine_openapi_client.WorkflowsApi(api_client)
response = workflows_api.get_workflow_provenance_handler(str(self.__workflow_id), _request_timeout=timeout)
return [ProvenanceEntry.from_response(item) for item in response]
def metadata_zip(self, path: Union[PathLike, BytesIO], timeout: int = 60) -> None:
'''
Query workflow metadata and citations and stores it as zip file to `path`
'''
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
workflows_api = geoengine_openapi_client.WorkflowsApi(api_client)
response = workflows_api.get_workflow_all_metadata_zip_handler(
str(self.__workflow_id),
_request_timeout=timeout
)
if isinstance(path, BytesIO):
path.write(response)
else:
with open(path, 'wb') as file:
file.write(response)
# pylint: disable=too-many-positional-arguments,too-many-positional-arguments
def save_as_dataset(
self,
query_rectangle: geoengine_openapi_client.RasterQueryRectangle,
name: Optional[str],
display_name: str,
description: str = '',
timeout: int = 3600) -> Task:
'''Init task to store the workflow result as a layer'''
# Currently, it only works for raster results
if not self.__result_descriptor.is_raster_result():
raise MethodNotCalledOnRasterException()
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
workflows_api = geoengine_openapi_client.WorkflowsApi(api_client)
response = workflows_api.dataset_from_workflow_handler(
str(self.__workflow_id),
geoengine_openapi_client.RasterDatasetFromWorkflow(
name=name,
display_name=display_name,
description=description,
query=query_rectangle
),
_request_timeout=timeout
)
return Task(TaskId.from_response(response))
async def raster_stream(
self,
query_rectangle: QueryRectangle,
open_timeout: int = 60,
bands: Optional[List[int]] = None # TODO: move into query rectangle?
) -> AsyncIterator[RasterTile2D]:
'''Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)'''
if bands is None:
bands = [0]
if len(bands) == 0:
raise InputException('At least one band must be specified')
# Currently, it only works for raster results
if not self.__result_descriptor.is_raster_result():
raise MethodNotCalledOnRasterException()
session = get_session()
url = req.Request(
'GET',
url=f'{session.server_url}/workflow/{self.__workflow_id}/rasterStream',
params={
'resultType': 'arrow',
'spatialBounds': query_rectangle.bbox_str,
'timeInterval': query_rectangle.time_str,
'spatialResolution': str(query_rectangle.spatial_resolution),
'attributes': ','.join(map(str, bands))
},
).prepare().url
if url is None:
raise InputException('Invalid websocket url')
async with websockets.client.connect(
uri=self.__replace_http_with_ws(url),
extra_headers=session.auth_header,
open_timeout=open_timeout,
max_size=None,
) as websocket:
tile_bytes: Optional[bytes] = None
while websocket.open:
async def read_new_bytes() -> Optional[bytes]:
# already send the next request to speed up the process
try:
await websocket.send("NEXT")
except websockets.exceptions.ConnectionClosed:
# the websocket connection is already closed, we cannot read anymore
return None
try:
data: Union[str, bytes] = await websocket.recv()
if isinstance(data, str):
# the server sent an error message
raise GeoEngineException({'error': data})
return data
except websockets.exceptions.ConnectionClosedOK:
# the websocket connection closed gracefully, so we stop reading
return None
(tile_bytes, tile) = await asyncio.gather(
read_new_bytes(),
# asyncio.to_thread(process_bytes, tile_bytes), # TODO: use this when min Python version is 3.9
backports.to_thread(RasterStreamProcessing.process_bytes, tile_bytes),
)
if tile is not None:
yield tile
# process the last tile
tile = RasterStreamProcessing.process_bytes(tile_bytes)
if tile is not None:
yield tile
async def raster_stream_into_xarray(
self,
query_rectangle: QueryRectangle,
clip_to_query_rectangle: bool = False,
open_timeout: int = 60,
bands: Optional[List[int]] = None # TODO: move into query rectangle?
) -> xr.DataArray:
'''
Stream the workflow result into memory and output a single xarray.
NOTE: You can run out of memory if the query rectangle is too large.
'''
if bands is None:
bands = [0]
if len(bands) == 0:
raise InputException('At least one band must be specified')
tile_stream = self.raster_stream(
query_rectangle,
open_timeout=open_timeout,
bands=bands
)
timestep_xarrays: List[xr.DataArray] = []
spatial_clip_bounds = query_rectangle.spatial_bounds if clip_to_query_rectangle else None
async def read_tiles(
remainder_tile: Optional[RasterTile2D]
) -> tuple[List[xr.DataArray], Optional[RasterTile2D]]:
last_timestep: Optional[np.datetime64] = None
tiles = []
if remainder_tile is not None:
last_timestep = remainder_tile.time_start_ms
xr_tile = remainder_tile.to_xarray(clip_with_bounds=spatial_clip_bounds)
tiles.append(xr_tile)
async for tile in tile_stream:
timestep: np.datetime64 = tile.time_start_ms
if last_timestep is None:
last_timestep = timestep
elif last_timestep != timestep:
return tiles, tile
xr_tile = tile.to_xarray(clip_with_bounds=spatial_clip_bounds)
tiles.append(xr_tile)
# this seems to be the last time step, so just return tiles
return tiles, None
(tiles, remainder_tile) = await read_tiles(None)
while len(tiles):
((new_tiles, new_remainder_tile), new_timestep_xarray) = await asyncio.gather(
read_tiles(remainder_tile),
backports.to_thread(RasterStreamProcessing.merge_tiles, tiles)
# asyncio.to_thread(merge_tiles, tiles), # TODO: use this when min Python version is 3.9
)
tiles = new_tiles
remainder_tile = new_remainder_tile
if new_timestep_xarray is not None:
timestep_xarrays.append(new_timestep_xarray)
output: xr.DataArray = cast(
xr.DataArray,
# await asyncio.to_thread( # TODO: use this when min Python version is 3.9
await backports.to_thread(
xr.concat,
# TODO: This is a typings error, since the method accepts also a `xr.DataArray` and returns one
cast(List[xr.Dataset], timestep_xarrays),
dim='time'
)
)
return output
async def vector_stream(
self,
query_rectangle: QueryRectangle,
time_start_column: str = 'time_start',
time_end_column: str = 'time_end',
open_timeout: int = 60) -> AsyncIterator[gpd.GeoDataFrame]:
'''Stream the workflow result as series of `GeoDataFrame`s'''
def read_arrow_ipc(arrow_ipc: bytes) -> pa.RecordBatch:
reader = pa.ipc.open_file(arrow_ipc)
# We know from the backend that there is only one record batch
record_batch = reader.get_record_batch(0)
return record_batch
def create_geo_data_frame(record_batch: pa.RecordBatch,
time_start_column: str,
time_end_column: str) -> gpd.GeoDataFrame:
metadata = record_batch.schema.metadata
spatial_reference = metadata[b'spatialReference'].decode('utf-8')
data_frame = record_batch.to_pandas()
geometry = gpd.GeoSeries.from_wkt(data_frame[api.GEOMETRY_COLUMN_NAME])
del data_frame[api.GEOMETRY_COLUMN_NAME] # delete the duplicated column
geo_data_frame = gpd.GeoDataFrame(
data_frame,
geometry=geometry,
crs=spatial_reference,
)
# split time column
geo_data_frame[[time_start_column, time_end_column]] = geo_data_frame[api.TIME_COLUMN_NAME].tolist()
del geo_data_frame[api.TIME_COLUMN_NAME] # delete the duplicated column
# parse time columns
for time_column in [time_start_column, time_end_column]:
geo_data_frame[time_column] = pd.to_datetime(
geo_data_frame[time_column],
utc=True,
unit='ms',
# TODO: solve time conversion problem from Geo Engine to Python for large (+/-) time instances
errors='coerce',
)
return geo_data_frame
def process_bytes(batch_bytes: Optional[bytes]) -> Optional[gpd.GeoDataFrame]:
if batch_bytes is None:
return None
# process the received data
record_batch = read_arrow_ipc(batch_bytes)
tile = create_geo_data_frame(
record_batch,
time_start_column=time_start_column,
time_end_column=time_end_column,
)
return tile
# Currently, it only works for raster results
if not self.__result_descriptor.is_vector_result():
raise MethodNotCalledOnVectorException()
session = get_session()
url = req.Request(
'GET',
url=f'{session.server_url}/workflow/{self.__workflow_id}/vectorStream',
params={
'resultType': 'arrow',
'spatialBounds': query_rectangle.bbox_str,
'timeInterval': query_rectangle.time_str,
'spatialResolution': str(query_rectangle.spatial_resolution),
},
).prepare().url
if url is None:
raise InputException('Invalid websocket url')
async with websockets.client.connect(
uri=self.__replace_http_with_ws(url),
extra_headers=session.auth_header,
open_timeout=open_timeout,
max_size=None, # allow arbitrary large messages, since it is capped by the server's chunk size
) as websocket:
batch_bytes: Optional[bytes] = None
while websocket.open:
async def read_new_bytes() -> Optional[bytes]:
# already send the next request to speed up the process
try:
await websocket.send("NEXT")
except websockets.exceptions.ConnectionClosed:
# the websocket connection is already closed, we cannot read anymore
return None
try:
data: Union[str, bytes] = await websocket.recv()
if isinstance(data, str):
# the server sent an error message
raise GeoEngineException({'error': data})
return data
except websockets.exceptions.ConnectionClosedOK:
# the websocket connection closed gracefully, so we stop reading
return None
(batch_bytes, batch) = await asyncio.gather(
read_new_bytes(),
# asyncio.to_thread(process_bytes, batch_bytes), # TODO: use this when min Python version is 3.9
backports.to_thread(process_bytes, batch_bytes),
)
if batch is not None:
yield batch
# process the last tile
batch = process_bytes(batch_bytes)
if batch is not None:
yield batch
async def vector_stream_into_geopandas(
self,
query_rectangle: QueryRectangle,
time_start_column: str = 'time_start',
time_end_column: str = 'time_end',
open_timeout: int = 60) -> gpd.GeoDataFrame:
'''
Stream the workflow result into memory and output a single geo data frame.
NOTE: You can run out of memory if the query rectangle is too large.
'''
chunk_stream = self.vector_stream(
query_rectangle,
time_start_column=time_start_column,
time_end_column=time_end_column,
open_timeout=open_timeout,
)
data_frame: Optional[gpd.GeoDataFrame] = None
chunk: Optional[gpd.GeoDataFrame] = None
async def read_dataframe() -> Optional[gpd.GeoDataFrame]:
try:
return await chunk_stream.__anext__()
except StopAsyncIteration:
return None
def merge_dataframes(
df_a: Optional[gpd.GeoDataFrame],
df_b: Optional[gpd.GeoDataFrame]
) -> Optional[gpd.GeoDataFrame]:
if df_a is None:
return df_b
if df_b is None:
return df_a
return pd.concat([df_a, df_b], ignore_index=True)
while True:
(chunk, data_frame) = await asyncio.gather(
read_dataframe(),
backports.to_thread(merge_dataframes, data_frame, chunk),
# TODO: use this when min Python version is 3.9
# asyncio.to_thread(merge_dataframes, data_frame, chunk),
)
# we can stop when the chunk stream is exhausted
if chunk is None:
break
return data_frame
def __replace_http_with_ws(self, url: str) -> str:
'''
Replace the protocol in the url from `http` to `ws`.
For the websockets library, it is necessary that the url starts with `ws://`.
For HTTPS, we need to use `wss://` instead.
'''
[protocol, url_part] = url.split('://', maxsplit=1)
ws_prefix = 'wss://' if 's' in protocol.lower() else 'ws://'
return f'{ws_prefix}{url_part}'
def register_workflow(workflow: Union[Dict[str, Any], WorkflowBuilderOperator], timeout: int = 60) -> Workflow:
'''
Register a workflow in Geo Engine and receive a `WorkflowId`
'''
if isinstance(workflow, WorkflowBuilderOperator):
workflow = workflow.to_workflow_dict()
workflow_model = geoengine_openapi_client.Workflow.from_dict(workflow)
if workflow_model is None:
raise InputException("Invalid workflow definition")
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
workflows_api = geoengine_openapi_client.WorkflowsApi(api_client)
response = workflows_api.register_workflow_handler(workflow_model, _request_timeout=timeout)
return Workflow(WorkflowId.from_response(response))
def workflow_by_id(workflow_id: UUID) -> Workflow:
'''
Create a workflow object from a workflow id
'''
# TODO: check that workflow exists
return Workflow(WorkflowId(workflow_id))
def get_quota(user_id: Optional[UUID] = None, timeout: int = 60) -> geoengine_openapi_client.Quota:
'''
Gets a user's quota. Only admins can get other users' quota.
'''
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
user_api = geoengine_openapi_client.UserApi(api_client)
if user_id is None:
return user_api.quota_handler(_request_timeout=timeout)
return user_api.get_user_quota_handler(str(user_id), _request_timeout=timeout)
def update_quota(user_id: UUID, new_available_quota: int, timeout: int = 60) -> None:
'''
Update a user's quota. Only admins can perform this operation.
'''
session = get_session()
with geoengine_openapi_client.ApiClient(session.configuration) as api_client:
user_api = geoengine_openapi_client.UserApi(api_client)
user_api.update_user_quota_handler(
str(user_id),
geoengine_openapi_client.UpdateQuota(
available=new_available_quota
),
_request_timeout=timeout
)
Functions
def get_quota(user_id: Optional[UUID] = None, timeout: int = 60) ‑> geoengine_openapi_client.models.quota.Quota
-
Gets a user's quota. Only admins can get other users' quota.
Expand source code
def get_quota(user_id: Optional[UUID] = None, timeout: int = 60) -> geoengine_openapi_client.Quota: ''' Gets a user's quota. Only admins can get other users' quota. ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: user_api = geoengine_openapi_client.UserApi(api_client) if user_id is None: return user_api.quota_handler(_request_timeout=timeout) return user_api.get_user_quota_handler(str(user_id), _request_timeout=timeout)
def register_workflow(workflow: Union[Dict[str, Any], WorkflowBuilderOperator], timeout: int = 60) ‑> Workflow
-
Register a workflow in Geo Engine and receive a
WorkflowId
Expand source code
def register_workflow(workflow: Union[Dict[str, Any], WorkflowBuilderOperator], timeout: int = 60) -> Workflow: ''' Register a workflow in Geo Engine and receive a `WorkflowId` ''' if isinstance(workflow, WorkflowBuilderOperator): workflow = workflow.to_workflow_dict() workflow_model = geoengine_openapi_client.Workflow.from_dict(workflow) if workflow_model is None: raise InputException("Invalid workflow definition") session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.register_workflow_handler(workflow_model, _request_timeout=timeout) return Workflow(WorkflowId.from_response(response))
def update_quota(user_id: UUID, new_available_quota: int, timeout: int = 60) ‑> None
-
Update a user's quota. Only admins can perform this operation.
Expand source code
def update_quota(user_id: UUID, new_available_quota: int, timeout: int = 60) -> None: ''' Update a user's quota. Only admins can perform this operation. ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: user_api = geoengine_openapi_client.UserApi(api_client) user_api.update_user_quota_handler( str(user_id), geoengine_openapi_client.UpdateQuota( available=new_available_quota ), _request_timeout=timeout )
def workflow_by_id(workflow_id: UUID) ‑> Workflow
-
Create a workflow object from a workflow id
Expand source code
def workflow_by_id(workflow_id: UUID) -> Workflow: ''' Create a workflow object from a workflow id ''' # TODO: check that workflow exists return Workflow(WorkflowId(workflow_id))
Classes
class Axis (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var title : str
class Bin (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var binned : bool
var step : float
class DatasetIds (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var dataset : uuid.UUID
var upload : uuid.UUID
class Encoding (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var x : X
var x2 : X2
var y : Y
class Field (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var field : str
class RasterStreamProcessing
-
Helper class to process raster stream data
Expand source code
class RasterStreamProcessing: ''' Helper class to process raster stream data ''' @classmethod def read_arrow_ipc(cls, arrow_ipc: bytes) -> pa.RecordBatch: '''Read an Arrow IPC file from a byte array''' reader = pa.ipc.open_file(arrow_ipc) # We know from the backend that there is only one record batch record_batch = reader.get_record_batch(0) return record_batch @classmethod def process_bytes(cls, tile_bytes: Optional[bytes]) -> Optional[RasterTile2D]: '''Process a tile from a byte array''' if tile_bytes is None: return None # process the received data record_batch = RasterStreamProcessing.read_arrow_ipc(tile_bytes) tile = RasterTile2D.from_ge_record_batch(record_batch) return tile @classmethod def merge_tiles(cls, tiles: List[xr.DataArray]) -> Optional[xr.DataArray]: '''Merge a list of tiles into a single xarray''' if len(tiles) == 0: return None # group the tiles by band tiles_by_band: Dict[int, List[xr.DataArray]] = defaultdict(list) for tile in tiles: band = tile.band.item() # assuming 'band' is a coordinate with a single value tiles_by_band[band].append(tile) # build one spatial tile per band combined_by_band = [] for band_tiles in tiles_by_band.values(): combined = xr.combine_by_coords(band_tiles) # `combine_by_coords` always returns a `DataArray` for single variable input arrays. # This assertion verifies this for mypy assert isinstance(combined, xr.DataArray) combined_by_band.append(combined) # build one array with all bands and geo coordinates combined_tile = xr.concat(combined_by_band, dim='band') return combined_tile
Static methods
def merge_tiles(tiles: List[xr.DataArray]) ‑> Optional[xarray.core.dataarray.DataArray]
-
Merge a list of tiles into a single xarray
Expand source code
@classmethod def merge_tiles(cls, tiles: List[xr.DataArray]) -> Optional[xr.DataArray]: '''Merge a list of tiles into a single xarray''' if len(tiles) == 0: return None # group the tiles by band tiles_by_band: Dict[int, List[xr.DataArray]] = defaultdict(list) for tile in tiles: band = tile.band.item() # assuming 'band' is a coordinate with a single value tiles_by_band[band].append(tile) # build one spatial tile per band combined_by_band = [] for band_tiles in tiles_by_band.values(): combined = xr.combine_by_coords(band_tiles) # `combine_by_coords` always returns a `DataArray` for single variable input arrays. # This assertion verifies this for mypy assert isinstance(combined, xr.DataArray) combined_by_band.append(combined) # build one array with all bands and geo coordinates combined_tile = xr.concat(combined_by_band, dim='band') return combined_tile
def process_bytes(tile_bytes: Optional[bytes]) ‑> Optional[RasterTile2D]
-
Process a tile from a byte array
Expand source code
@classmethod def process_bytes(cls, tile_bytes: Optional[bytes]) -> Optional[RasterTile2D]: '''Process a tile from a byte array''' if tile_bytes is None: return None # process the received data record_batch = RasterStreamProcessing.read_arrow_ipc(tile_bytes) tile = RasterTile2D.from_ge_record_batch(record_batch) return tile
def read_arrow_ipc(arrow_ipc: bytes) ‑> pyarrow.lib.RecordBatch
-
Read an Arrow IPC file from a byte array
Expand source code
@classmethod def read_arrow_ipc(cls, arrow_ipc: bytes) -> pa.RecordBatch: '''Read an Arrow IPC file from a byte array''' reader = pa.ipc.open_file(arrow_ipc) # We know from the backend that there is only one record batch record_batch = reader.get_record_batch(0) return record_batch
class Values (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var Frequency : int
var binEnd : float
var binStart : float
class VegaSpec (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var $schema : str
var data : List[Values]
var encoding : Encoding
var mark : str
class Workflow (workflow_id: WorkflowId)
-
Holds a workflow id and allows querying data
Expand source code
class Workflow: ''' Holds a workflow id and allows querying data ''' __workflow_id: WorkflowId __result_descriptor: ResultDescriptor def __init__(self, workflow_id: WorkflowId) -> None: self.__workflow_id = workflow_id self.__result_descriptor = self.__query_result_descriptor() def __str__(self) -> str: return str(self.__workflow_id) def __repr__(self) -> str: return repr(self.__workflow_id) def __query_result_descriptor(self, timeout: int = 60) -> ResultDescriptor: ''' Query the metadata of the workflow result ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.get_workflow_metadata_handler(str(self.__workflow_id), _request_timeout=timeout) debug(response) return ResultDescriptor.from_response(response) def get_result_descriptor(self) -> ResultDescriptor: ''' Return the metadata of the workflow result ''' return self.__result_descriptor def workflow_definition(self, timeout: int = 60) -> geoengine_openapi_client.Workflow: '''Return the workflow definition for this workflow''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.load_workflow_handler(str(self.__workflow_id), _request_timeout=timeout) return response def get_dataframe( self, bbox: QueryRectangle, timeout: int = 3600, resolve_classifications: bool = False ) -> gpd.GeoDataFrame: ''' Query a workflow and return the WFS result as a GeoPandas `GeoDataFrame` ''' if not self.__result_descriptor.is_vector_result(): raise MethodNotCalledOnVectorException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: wfs_api = geoengine_openapi_client.OGCWFSApi(api_client) response = wfs_api.wfs_feature_handler( workflow=str(self.__workflow_id), service=geoengine_openapi_client.WfsService(geoengine_openapi_client.WfsService.WFS), request=geoengine_openapi_client.GetFeatureRequest( geoengine_openapi_client.GetFeatureRequest.GETFEATURE ), type_names=str(self.__workflow_id), bbox=bbox.bbox_str, version=geoengine_openapi_client.WfsVersion(geoengine_openapi_client.WfsVersion.ENUM_2_DOT_0_DOT_0), time=bbox.time_str, srs_name=bbox.srs, query_resolution=str(bbox.spatial_resolution), _request_timeout=timeout ) def geo_json_with_time_to_geopandas(geo_json): ''' GeoJson has no standard for time, so we parse the when field separately and attach it to the data frame as columns `start` and `end`. ''' data = gpd.GeoDataFrame.from_features(geo_json) data = data.set_crs(bbox.srs, allow_override=True) start = [f['when']['start'] for f in geo_json['features']] end = [f['when']['end'] for f in geo_json['features']] # TODO: find a good way to infer BoT/EoT data['start'] = gpd.pd.to_datetime(start, errors='coerce') data['end'] = gpd.pd.to_datetime(end, errors='coerce') return data def transform_classifications(data: gpd.GeoDataFrame): result_descriptor: VectorResultDescriptor = self.__result_descriptor # type: ignore for (column, info) in result_descriptor.columns.items(): if isinstance(info.measurement, ClassificationMeasurement): measurement: ClassificationMeasurement = info.measurement classes = measurement.classes data[column] = data[column].apply(lambda x: classes[x]) # pylint: disable=cell-var-from-loop return data result = geo_json_with_time_to_geopandas(response.to_dict()) if resolve_classifications: result = transform_classifications(result) return result def wms_get_map_as_image(self, bbox: QueryRectangle, raster_colorizer: RasterColorizer) -> Image.Image: '''Return the result of a WMS request as a PIL Image''' if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: wms_api = geoengine_openapi_client.OGCWMSApi(api_client) response = wms_api.wms_map_handler( workflow=str(self), version=geoengine_openapi_client.WmsVersion(geoengine_openapi_client.WmsVersion.ENUM_1_DOT_3_DOT_0), service=geoengine_openapi_client.WmsService(geoengine_openapi_client.WmsService.WMS), request=geoengine_openapi_client.GetMapRequest(geoengine_openapi_client.GetMapRequest.GETMAP), width=int((bbox.spatial_bounds.xmax - bbox.spatial_bounds.xmin) / bbox.spatial_resolution.x_resolution), height=int((bbox.spatial_bounds.ymax - bbox.spatial_bounds.ymin) / bbox.spatial_resolution.y_resolution), # pylint: disable=line-too-long bbox=bbox.bbox_ogc_str, format=geoengine_openapi_client.GetMapFormat(geoengine_openapi_client.GetMapFormat.IMAGE_SLASH_PNG), layers=str(self), styles='custom:' + raster_colorizer.to_api_dict().to_json(), crs=bbox.srs, time=bbox.time_str ) return Image.open(BytesIO(response)) def plot_json(self, bbox: QueryRectangle, timeout: int = 3600) -> geoengine_openapi_client.WrappedPlotOutput: ''' Query a workflow and return the plot chart result as WrappedPlotOutput ''' if not self.__result_descriptor.is_plot_result(): raise MethodNotCalledOnPlotException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: plots_api = geoengine_openapi_client.PlotsApi(api_client) return plots_api.get_plot_handler( bbox.bbox_str, bbox.time_str, str(bbox.spatial_resolution), str(self.__workflow_id), bbox.srs, _request_timeout=timeout ) def plot_chart(self, bbox: QueryRectangle, timeout: int = 3600) -> VegaLite: ''' Query a workflow and return the plot chart result as a vega plot ''' response = self.plot_json(bbox, timeout) vega_spec: VegaSpec = json.loads(response.data['vegaString']) return VegaLite(vega_spec) def __request_wcs( self, bbox: QueryRectangle, timeout=3600, file_format: str = 'image/tiff', force_no_data_value: Optional[float] = None ) -> ResponseWrapper: ''' Query a workflow and return the coverage Parameters ---------- bbox : A bounding box for the query timeout : HTTP request timeout in seconds file_format : The format of the returned raster force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() # TODO: properly build CRS string for bbox crs = f'urn:ogc:def:crs:{bbox.srs.replace(":", "::")}' wcs_url = f'{session.server_url}/wcs/{self.__workflow_id}' wcs = WebCoverageService( wcs_url, version='1.1.1', auth=Authentication(auth_delegate=session.requests_bearer_auth()), ) [resx, resy] = bbox.resolution_ogc kwargs = {} if force_no_data_value is not None: kwargs["nodatavalue"] = str(float(force_no_data_value)) return wcs.getCoverage( identifier=f'{self.__workflow_id}', bbox=bbox.bbox_ogc, time=[bbox.time_str], format=file_format, crs=crs, resx=resx, resy=resy, timeout=timeout, **kwargs ) def __get_wcs_tiff_as_memory_file( self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None ) -> rasterio.io.MemoryFile: ''' Query a workflow and return the raster result as a memory mapped GeoTiff Parameters ---------- bbox : A bounding box for the query timeout : HTTP request timeout in seconds force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' response = self.__request_wcs(bbox, timeout, 'image/tiff', force_no_data_value).read() # response is checked via `raise_on_error` in `getCoverage` / `openUrl` memory_file = rasterio.io.MemoryFile(response) return memory_file def get_array( self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None ) -> np.ndarray: ''' Query a workflow and return the raster result as a numpy array Parameters ---------- bbox : A bounding box for the query timeout : HTTP request timeout in seconds force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' with self.__get_wcs_tiff_as_memory_file( bbox, timeout, force_no_data_value ) as memfile, memfile.open() as dataset: array = dataset.read(1) return array def get_xarray( self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None ) -> xr.DataArray: ''' Query a workflow and return the raster result as a georeferenced xarray Parameters ---------- bbox : A bounding box for the query timeout : HTTP request timeout in seconds force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' with self.__get_wcs_tiff_as_memory_file( bbox, timeout, force_no_data_value ) as memfile, memfile.open() as dataset: data_array = rioxarray.open_rasterio(dataset) # helping mypy with inference assert isinstance(data_array, xr.DataArray) rio: xr.DataArray = data_array.rio rio.update_attrs({ 'crs': rio.crs, 'res': rio.resolution(), 'transform': rio.transform(), }, inplace=True) # TODO: add time information to dataset return data_array.load() # pylint: disable=too-many-arguments,too-many-positional-arguments def download_raster( self, bbox: QueryRectangle, file_path: str, timeout=3600, file_format: str = 'image/tiff', force_no_data_value: Optional[float] = None ) -> None: ''' Query a workflow and save the raster result as a file on disk Parameters ---------- bbox : A bounding box for the query file_path : The path to the file to save the raster to timeout : HTTP request timeout in seconds file_format : The format of the returned raster force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' response = self.__request_wcs(bbox, timeout, file_format, force_no_data_value) with open(file_path, 'wb') as file: file.write(response.read()) def get_provenance(self, timeout: int = 60) -> List[ProvenanceEntry]: ''' Query the provenance of the workflow ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.get_workflow_provenance_handler(str(self.__workflow_id), _request_timeout=timeout) return [ProvenanceEntry.from_response(item) for item in response] def metadata_zip(self, path: Union[PathLike, BytesIO], timeout: int = 60) -> None: ''' Query workflow metadata and citations and stores it as zip file to `path` ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.get_workflow_all_metadata_zip_handler( str(self.__workflow_id), _request_timeout=timeout ) if isinstance(path, BytesIO): path.write(response) else: with open(path, 'wb') as file: file.write(response) # pylint: disable=too-many-positional-arguments,too-many-positional-arguments def save_as_dataset( self, query_rectangle: geoengine_openapi_client.RasterQueryRectangle, name: Optional[str], display_name: str, description: str = '', timeout: int = 3600) -> Task: '''Init task to store the workflow result as a layer''' # Currently, it only works for raster results if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.dataset_from_workflow_handler( str(self.__workflow_id), geoengine_openapi_client.RasterDatasetFromWorkflow( name=name, display_name=display_name, description=description, query=query_rectangle ), _request_timeout=timeout ) return Task(TaskId.from_response(response)) async def raster_stream( self, query_rectangle: QueryRectangle, open_timeout: int = 60, bands: Optional[List[int]] = None # TODO: move into query rectangle? ) -> AsyncIterator[RasterTile2D]: '''Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)''' if bands is None: bands = [0] if len(bands) == 0: raise InputException('At least one band must be specified') # Currently, it only works for raster results if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() url = req.Request( 'GET', url=f'{session.server_url}/workflow/{self.__workflow_id}/rasterStream', params={ 'resultType': 'arrow', 'spatialBounds': query_rectangle.bbox_str, 'timeInterval': query_rectangle.time_str, 'spatialResolution': str(query_rectangle.spatial_resolution), 'attributes': ','.join(map(str, bands)) }, ).prepare().url if url is None: raise InputException('Invalid websocket url') async with websockets.client.connect( uri=self.__replace_http_with_ws(url), extra_headers=session.auth_header, open_timeout=open_timeout, max_size=None, ) as websocket: tile_bytes: Optional[bytes] = None while websocket.open: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") except websockets.exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None try: data: Union[str, bytes] = await websocket.recv() if isinstance(data, str): # the server sent an error message raise GeoEngineException({'error': data}) return data except websockets.exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None (tile_bytes, tile) = await asyncio.gather( read_new_bytes(), # asyncio.to_thread(process_bytes, tile_bytes), # TODO: use this when min Python version is 3.9 backports.to_thread(RasterStreamProcessing.process_bytes, tile_bytes), ) if tile is not None: yield tile # process the last tile tile = RasterStreamProcessing.process_bytes(tile_bytes) if tile is not None: yield tile async def raster_stream_into_xarray( self, query_rectangle: QueryRectangle, clip_to_query_rectangle: bool = False, open_timeout: int = 60, bands: Optional[List[int]] = None # TODO: move into query rectangle? ) -> xr.DataArray: ''' Stream the workflow result into memory and output a single xarray. NOTE: You can run out of memory if the query rectangle is too large. ''' if bands is None: bands = [0] if len(bands) == 0: raise InputException('At least one band must be specified') tile_stream = self.raster_stream( query_rectangle, open_timeout=open_timeout, bands=bands ) timestep_xarrays: List[xr.DataArray] = [] spatial_clip_bounds = query_rectangle.spatial_bounds if clip_to_query_rectangle else None async def read_tiles( remainder_tile: Optional[RasterTile2D] ) -> tuple[List[xr.DataArray], Optional[RasterTile2D]]: last_timestep: Optional[np.datetime64] = None tiles = [] if remainder_tile is not None: last_timestep = remainder_tile.time_start_ms xr_tile = remainder_tile.to_xarray(clip_with_bounds=spatial_clip_bounds) tiles.append(xr_tile) async for tile in tile_stream: timestep: np.datetime64 = tile.time_start_ms if last_timestep is None: last_timestep = timestep elif last_timestep != timestep: return tiles, tile xr_tile = tile.to_xarray(clip_with_bounds=spatial_clip_bounds) tiles.append(xr_tile) # this seems to be the last time step, so just return tiles return tiles, None (tiles, remainder_tile) = await read_tiles(None) while len(tiles): ((new_tiles, new_remainder_tile), new_timestep_xarray) = await asyncio.gather( read_tiles(remainder_tile), backports.to_thread(RasterStreamProcessing.merge_tiles, tiles) # asyncio.to_thread(merge_tiles, tiles), # TODO: use this when min Python version is 3.9 ) tiles = new_tiles remainder_tile = new_remainder_tile if new_timestep_xarray is not None: timestep_xarrays.append(new_timestep_xarray) output: xr.DataArray = cast( xr.DataArray, # await asyncio.to_thread( # TODO: use this when min Python version is 3.9 await backports.to_thread( xr.concat, # TODO: This is a typings error, since the method accepts also a `xr.DataArray` and returns one cast(List[xr.Dataset], timestep_xarrays), dim='time' ) ) return output async def vector_stream( self, query_rectangle: QueryRectangle, time_start_column: str = 'time_start', time_end_column: str = 'time_end', open_timeout: int = 60) -> AsyncIterator[gpd.GeoDataFrame]: '''Stream the workflow result as series of `GeoDataFrame`s''' def read_arrow_ipc(arrow_ipc: bytes) -> pa.RecordBatch: reader = pa.ipc.open_file(arrow_ipc) # We know from the backend that there is only one record batch record_batch = reader.get_record_batch(0) return record_batch def create_geo_data_frame(record_batch: pa.RecordBatch, time_start_column: str, time_end_column: str) -> gpd.GeoDataFrame: metadata = record_batch.schema.metadata spatial_reference = metadata[b'spatialReference'].decode('utf-8') data_frame = record_batch.to_pandas() geometry = gpd.GeoSeries.from_wkt(data_frame[api.GEOMETRY_COLUMN_NAME]) del data_frame[api.GEOMETRY_COLUMN_NAME] # delete the duplicated column geo_data_frame = gpd.GeoDataFrame( data_frame, geometry=geometry, crs=spatial_reference, ) # split time column geo_data_frame[[time_start_column, time_end_column]] = geo_data_frame[api.TIME_COLUMN_NAME].tolist() del geo_data_frame[api.TIME_COLUMN_NAME] # delete the duplicated column # parse time columns for time_column in [time_start_column, time_end_column]: geo_data_frame[time_column] = pd.to_datetime( geo_data_frame[time_column], utc=True, unit='ms', # TODO: solve time conversion problem from Geo Engine to Python for large (+/-) time instances errors='coerce', ) return geo_data_frame def process_bytes(batch_bytes: Optional[bytes]) -> Optional[gpd.GeoDataFrame]: if batch_bytes is None: return None # process the received data record_batch = read_arrow_ipc(batch_bytes) tile = create_geo_data_frame( record_batch, time_start_column=time_start_column, time_end_column=time_end_column, ) return tile # Currently, it only works for raster results if not self.__result_descriptor.is_vector_result(): raise MethodNotCalledOnVectorException() session = get_session() url = req.Request( 'GET', url=f'{session.server_url}/workflow/{self.__workflow_id}/vectorStream', params={ 'resultType': 'arrow', 'spatialBounds': query_rectangle.bbox_str, 'timeInterval': query_rectangle.time_str, 'spatialResolution': str(query_rectangle.spatial_resolution), }, ).prepare().url if url is None: raise InputException('Invalid websocket url') async with websockets.client.connect( uri=self.__replace_http_with_ws(url), extra_headers=session.auth_header, open_timeout=open_timeout, max_size=None, # allow arbitrary large messages, since it is capped by the server's chunk size ) as websocket: batch_bytes: Optional[bytes] = None while websocket.open: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") except websockets.exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None try: data: Union[str, bytes] = await websocket.recv() if isinstance(data, str): # the server sent an error message raise GeoEngineException({'error': data}) return data except websockets.exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None (batch_bytes, batch) = await asyncio.gather( read_new_bytes(), # asyncio.to_thread(process_bytes, batch_bytes), # TODO: use this when min Python version is 3.9 backports.to_thread(process_bytes, batch_bytes), ) if batch is not None: yield batch # process the last tile batch = process_bytes(batch_bytes) if batch is not None: yield batch async def vector_stream_into_geopandas( self, query_rectangle: QueryRectangle, time_start_column: str = 'time_start', time_end_column: str = 'time_end', open_timeout: int = 60) -> gpd.GeoDataFrame: ''' Stream the workflow result into memory and output a single geo data frame. NOTE: You can run out of memory if the query rectangle is too large. ''' chunk_stream = self.vector_stream( query_rectangle, time_start_column=time_start_column, time_end_column=time_end_column, open_timeout=open_timeout, ) data_frame: Optional[gpd.GeoDataFrame] = None chunk: Optional[gpd.GeoDataFrame] = None async def read_dataframe() -> Optional[gpd.GeoDataFrame]: try: return await chunk_stream.__anext__() except StopAsyncIteration: return None def merge_dataframes( df_a: Optional[gpd.GeoDataFrame], df_b: Optional[gpd.GeoDataFrame] ) -> Optional[gpd.GeoDataFrame]: if df_a is None: return df_b if df_b is None: return df_a return pd.concat([df_a, df_b], ignore_index=True) while True: (chunk, data_frame) = await asyncio.gather( read_dataframe(), backports.to_thread(merge_dataframes, data_frame, chunk), # TODO: use this when min Python version is 3.9 # asyncio.to_thread(merge_dataframes, data_frame, chunk), ) # we can stop when the chunk stream is exhausted if chunk is None: break return data_frame def __replace_http_with_ws(self, url: str) -> str: ''' Replace the protocol in the url from `http` to `ws`. For the websockets library, it is necessary that the url starts with `ws://`. For HTTPS, we need to use `wss://` instead. ''' [protocol, url_part] = url.split('://', maxsplit=1) ws_prefix = 'wss://' if 's' in protocol.lower() else 'ws://' return f'{ws_prefix}{url_part}'
Methods
def download_raster(self, bbox: QueryRectangle, file_path: str, timeout=3600, file_format: str = 'image/tiff', force_no_data_value: Optional[float] = None) ‑> None
-
Query a workflow and save the raster result as a file on disk
Parameters
bbox
:A bounding box for the query
file_path
:The path to the file to save the raster to
timeout
:HTTP request timeout in seconds
file_format
:The format
ofthe returned raster
force_no_data_value: If not None, use this value as no data value for the requested raster data. Otherwise, use the Geo Engine will produce masked rasters.
Expand source code
def download_raster( self, bbox: QueryRectangle, file_path: str, timeout=3600, file_format: str = 'image/tiff', force_no_data_value: Optional[float] = None ) -> None: ''' Query a workflow and save the raster result as a file on disk Parameters ---------- bbox : A bounding box for the query file_path : The path to the file to save the raster to timeout : HTTP request timeout in seconds file_format : The format of the returned raster force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' response = self.__request_wcs(bbox, timeout, file_format, force_no_data_value) with open(file_path, 'wb') as file: file.write(response.read())
def get_array(self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None) ‑> numpy.ndarray
-
Query a workflow and return the raster result as a numpy array
Parameters
bbox
:A bounding box for the query
timeout
:HTTP request timeout in seconds
force_no_data_value: If not None, use this value as no data value for the requested raster data. Otherwise, use the Geo Engine will produce masked rasters.
Expand source code
def get_array( self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None ) -> np.ndarray: ''' Query a workflow and return the raster result as a numpy array Parameters ---------- bbox : A bounding box for the query timeout : HTTP request timeout in seconds force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' with self.__get_wcs_tiff_as_memory_file( bbox, timeout, force_no_data_value ) as memfile, memfile.open() as dataset: array = dataset.read(1) return array
def get_dataframe(self, bbox: QueryRectangle, timeout: int = 3600, resolve_classifications: bool = False) ‑> geopandas.geodataframe.GeoDataFrame
-
Query a workflow and return the WFS result as a GeoPandas
GeoDataFrame
Expand source code
def get_dataframe( self, bbox: QueryRectangle, timeout: int = 3600, resolve_classifications: bool = False ) -> gpd.GeoDataFrame: ''' Query a workflow and return the WFS result as a GeoPandas `GeoDataFrame` ''' if not self.__result_descriptor.is_vector_result(): raise MethodNotCalledOnVectorException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: wfs_api = geoengine_openapi_client.OGCWFSApi(api_client) response = wfs_api.wfs_feature_handler( workflow=str(self.__workflow_id), service=geoengine_openapi_client.WfsService(geoengine_openapi_client.WfsService.WFS), request=geoengine_openapi_client.GetFeatureRequest( geoengine_openapi_client.GetFeatureRequest.GETFEATURE ), type_names=str(self.__workflow_id), bbox=bbox.bbox_str, version=geoengine_openapi_client.WfsVersion(geoengine_openapi_client.WfsVersion.ENUM_2_DOT_0_DOT_0), time=bbox.time_str, srs_name=bbox.srs, query_resolution=str(bbox.spatial_resolution), _request_timeout=timeout ) def geo_json_with_time_to_geopandas(geo_json): ''' GeoJson has no standard for time, so we parse the when field separately and attach it to the data frame as columns `start` and `end`. ''' data = gpd.GeoDataFrame.from_features(geo_json) data = data.set_crs(bbox.srs, allow_override=True) start = [f['when']['start'] for f in geo_json['features']] end = [f['when']['end'] for f in geo_json['features']] # TODO: find a good way to infer BoT/EoT data['start'] = gpd.pd.to_datetime(start, errors='coerce') data['end'] = gpd.pd.to_datetime(end, errors='coerce') return data def transform_classifications(data: gpd.GeoDataFrame): result_descriptor: VectorResultDescriptor = self.__result_descriptor # type: ignore for (column, info) in result_descriptor.columns.items(): if isinstance(info.measurement, ClassificationMeasurement): measurement: ClassificationMeasurement = info.measurement classes = measurement.classes data[column] = data[column].apply(lambda x: classes[x]) # pylint: disable=cell-var-from-loop return data result = geo_json_with_time_to_geopandas(response.to_dict()) if resolve_classifications: result = transform_classifications(result) return result
def get_provenance(self, timeout: int = 60) ‑> List[ProvenanceEntry]
-
Query the provenance of the workflow
Expand source code
def get_provenance(self, timeout: int = 60) -> List[ProvenanceEntry]: ''' Query the provenance of the workflow ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.get_workflow_provenance_handler(str(self.__workflow_id), _request_timeout=timeout) return [ProvenanceEntry.from_response(item) for item in response]
def get_result_descriptor(self) ‑> ResultDescriptor
-
Return the metadata of the workflow result
Expand source code
def get_result_descriptor(self) -> ResultDescriptor: ''' Return the metadata of the workflow result ''' return self.__result_descriptor
def get_xarray(self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None) ‑> xarray.core.dataarray.DataArray
-
Query a workflow and return the raster result as a georeferenced xarray
Parameters
bbox
:A bounding box for the query
timeout
:HTTP request timeout in seconds
force_no_data_value: If not None, use this value as no data value for the requested raster data. Otherwise, use the Geo Engine will produce masked rasters.
Expand source code
def get_xarray( self, bbox: QueryRectangle, timeout=3600, force_no_data_value: Optional[float] = None ) -> xr.DataArray: ''' Query a workflow and return the raster result as a georeferenced xarray Parameters ---------- bbox : A bounding box for the query timeout : HTTP request timeout in seconds force_no_data_value: If not None, use this value as no data value for the requested raster data. \ Otherwise, use the Geo Engine will produce masked rasters. ''' with self.__get_wcs_tiff_as_memory_file( bbox, timeout, force_no_data_value ) as memfile, memfile.open() as dataset: data_array = rioxarray.open_rasterio(dataset) # helping mypy with inference assert isinstance(data_array, xr.DataArray) rio: xr.DataArray = data_array.rio rio.update_attrs({ 'crs': rio.crs, 'res': rio.resolution(), 'transform': rio.transform(), }, inplace=True) # TODO: add time information to dataset return data_array.load()
def metadata_zip(self, path: Union[PathLike, BytesIO], timeout: int = 60) ‑> None
-
Query workflow metadata and citations and stores it as zip file to
path
Expand source code
def metadata_zip(self, path: Union[PathLike, BytesIO], timeout: int = 60) -> None: ''' Query workflow metadata and citations and stores it as zip file to `path` ''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.get_workflow_all_metadata_zip_handler( str(self.__workflow_id), _request_timeout=timeout ) if isinstance(path, BytesIO): path.write(response) else: with open(path, 'wb') as file: file.write(response)
def plot_chart(self, bbox: QueryRectangle, timeout: int = 3600) ‑> vega.vegalite.VegaLite
-
Query a workflow and return the plot chart result as a vega plot
Expand source code
def plot_chart(self, bbox: QueryRectangle, timeout: int = 3600) -> VegaLite: ''' Query a workflow and return the plot chart result as a vega plot ''' response = self.plot_json(bbox, timeout) vega_spec: VegaSpec = json.loads(response.data['vegaString']) return VegaLite(vega_spec)
def plot_json(self, bbox: QueryRectangle, timeout: int = 3600) ‑> geoengine_openapi_client.models.wrapped_plot_output.WrappedPlotOutput
-
Query a workflow and return the plot chart result as WrappedPlotOutput
Expand source code
def plot_json(self, bbox: QueryRectangle, timeout: int = 3600) -> geoengine_openapi_client.WrappedPlotOutput: ''' Query a workflow and return the plot chart result as WrappedPlotOutput ''' if not self.__result_descriptor.is_plot_result(): raise MethodNotCalledOnPlotException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: plots_api = geoengine_openapi_client.PlotsApi(api_client) return plots_api.get_plot_handler( bbox.bbox_str, bbox.time_str, str(bbox.spatial_resolution), str(self.__workflow_id), bbox.srs, _request_timeout=timeout )
async def raster_stream(self, query_rectangle: QueryRectangle, open_timeout: int = 60, bands: Optional[List[int]] = None) ‑> AsyncIterator[RasterTile2D]
-
Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)
Expand source code
async def raster_stream( self, query_rectangle: QueryRectangle, open_timeout: int = 60, bands: Optional[List[int]] = None # TODO: move into query rectangle? ) -> AsyncIterator[RasterTile2D]: '''Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)''' if bands is None: bands = [0] if len(bands) == 0: raise InputException('At least one band must be specified') # Currently, it only works for raster results if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() url = req.Request( 'GET', url=f'{session.server_url}/workflow/{self.__workflow_id}/rasterStream', params={ 'resultType': 'arrow', 'spatialBounds': query_rectangle.bbox_str, 'timeInterval': query_rectangle.time_str, 'spatialResolution': str(query_rectangle.spatial_resolution), 'attributes': ','.join(map(str, bands)) }, ).prepare().url if url is None: raise InputException('Invalid websocket url') async with websockets.client.connect( uri=self.__replace_http_with_ws(url), extra_headers=session.auth_header, open_timeout=open_timeout, max_size=None, ) as websocket: tile_bytes: Optional[bytes] = None while websocket.open: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") except websockets.exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None try: data: Union[str, bytes] = await websocket.recv() if isinstance(data, str): # the server sent an error message raise GeoEngineException({'error': data}) return data except websockets.exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None (tile_bytes, tile) = await asyncio.gather( read_new_bytes(), # asyncio.to_thread(process_bytes, tile_bytes), # TODO: use this when min Python version is 3.9 backports.to_thread(RasterStreamProcessing.process_bytes, tile_bytes), ) if tile is not None: yield tile # process the last tile tile = RasterStreamProcessing.process_bytes(tile_bytes) if tile is not None: yield tile
async def raster_stream_into_xarray(self, query_rectangle: QueryRectangle, clip_to_query_rectangle: bool = False, open_timeout: int = 60, bands: Optional[List[int]] = None) ‑> xarray.core.dataarray.DataArray
-
Stream the workflow result into memory and output a single xarray.
NOTE: You can run out of memory if the query rectangle is too large.
Expand source code
async def raster_stream_into_xarray( self, query_rectangle: QueryRectangle, clip_to_query_rectangle: bool = False, open_timeout: int = 60, bands: Optional[List[int]] = None # TODO: move into query rectangle? ) -> xr.DataArray: ''' Stream the workflow result into memory and output a single xarray. NOTE: You can run out of memory if the query rectangle is too large. ''' if bands is None: bands = [0] if len(bands) == 0: raise InputException('At least one band must be specified') tile_stream = self.raster_stream( query_rectangle, open_timeout=open_timeout, bands=bands ) timestep_xarrays: List[xr.DataArray] = [] spatial_clip_bounds = query_rectangle.spatial_bounds if clip_to_query_rectangle else None async def read_tiles( remainder_tile: Optional[RasterTile2D] ) -> tuple[List[xr.DataArray], Optional[RasterTile2D]]: last_timestep: Optional[np.datetime64] = None tiles = [] if remainder_tile is not None: last_timestep = remainder_tile.time_start_ms xr_tile = remainder_tile.to_xarray(clip_with_bounds=spatial_clip_bounds) tiles.append(xr_tile) async for tile in tile_stream: timestep: np.datetime64 = tile.time_start_ms if last_timestep is None: last_timestep = timestep elif last_timestep != timestep: return tiles, tile xr_tile = tile.to_xarray(clip_with_bounds=spatial_clip_bounds) tiles.append(xr_tile) # this seems to be the last time step, so just return tiles return tiles, None (tiles, remainder_tile) = await read_tiles(None) while len(tiles): ((new_tiles, new_remainder_tile), new_timestep_xarray) = await asyncio.gather( read_tiles(remainder_tile), backports.to_thread(RasterStreamProcessing.merge_tiles, tiles) # asyncio.to_thread(merge_tiles, tiles), # TODO: use this when min Python version is 3.9 ) tiles = new_tiles remainder_tile = new_remainder_tile if new_timestep_xarray is not None: timestep_xarrays.append(new_timestep_xarray) output: xr.DataArray = cast( xr.DataArray, # await asyncio.to_thread( # TODO: use this when min Python version is 3.9 await backports.to_thread( xr.concat, # TODO: This is a typings error, since the method accepts also a `xr.DataArray` and returns one cast(List[xr.Dataset], timestep_xarrays), dim='time' ) ) return output
def save_as_dataset(self, query_rectangle: geoengine_openapi_client.RasterQueryRectangle, name: Optional[str], display_name: str, description: str = '', timeout: int = 3600) ‑> Task
-
Init task to store the workflow result as a layer
Expand source code
def save_as_dataset( self, query_rectangle: geoengine_openapi_client.RasterQueryRectangle, name: Optional[str], display_name: str, description: str = '', timeout: int = 3600) -> Task: '''Init task to store the workflow result as a layer''' # Currently, it only works for raster results if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.dataset_from_workflow_handler( str(self.__workflow_id), geoengine_openapi_client.RasterDatasetFromWorkflow( name=name, display_name=display_name, description=description, query=query_rectangle ), _request_timeout=timeout ) return Task(TaskId.from_response(response))
async def vector_stream(self, query_rectangle: QueryRectangle, time_start_column: str = 'time_start', time_end_column: str = 'time_end', open_timeout: int = 60) ‑> AsyncIterator[geopandas.geodataframe.GeoDataFrame]
-
Stream the workflow result as series of
GeoDataFrame
sExpand source code
async def vector_stream( self, query_rectangle: QueryRectangle, time_start_column: str = 'time_start', time_end_column: str = 'time_end', open_timeout: int = 60) -> AsyncIterator[gpd.GeoDataFrame]: '''Stream the workflow result as series of `GeoDataFrame`s''' def read_arrow_ipc(arrow_ipc: bytes) -> pa.RecordBatch: reader = pa.ipc.open_file(arrow_ipc) # We know from the backend that there is only one record batch record_batch = reader.get_record_batch(0) return record_batch def create_geo_data_frame(record_batch: pa.RecordBatch, time_start_column: str, time_end_column: str) -> gpd.GeoDataFrame: metadata = record_batch.schema.metadata spatial_reference = metadata[b'spatialReference'].decode('utf-8') data_frame = record_batch.to_pandas() geometry = gpd.GeoSeries.from_wkt(data_frame[api.GEOMETRY_COLUMN_NAME]) del data_frame[api.GEOMETRY_COLUMN_NAME] # delete the duplicated column geo_data_frame = gpd.GeoDataFrame( data_frame, geometry=geometry, crs=spatial_reference, ) # split time column geo_data_frame[[time_start_column, time_end_column]] = geo_data_frame[api.TIME_COLUMN_NAME].tolist() del geo_data_frame[api.TIME_COLUMN_NAME] # delete the duplicated column # parse time columns for time_column in [time_start_column, time_end_column]: geo_data_frame[time_column] = pd.to_datetime( geo_data_frame[time_column], utc=True, unit='ms', # TODO: solve time conversion problem from Geo Engine to Python for large (+/-) time instances errors='coerce', ) return geo_data_frame def process_bytes(batch_bytes: Optional[bytes]) -> Optional[gpd.GeoDataFrame]: if batch_bytes is None: return None # process the received data record_batch = read_arrow_ipc(batch_bytes) tile = create_geo_data_frame( record_batch, time_start_column=time_start_column, time_end_column=time_end_column, ) return tile # Currently, it only works for raster results if not self.__result_descriptor.is_vector_result(): raise MethodNotCalledOnVectorException() session = get_session() url = req.Request( 'GET', url=f'{session.server_url}/workflow/{self.__workflow_id}/vectorStream', params={ 'resultType': 'arrow', 'spatialBounds': query_rectangle.bbox_str, 'timeInterval': query_rectangle.time_str, 'spatialResolution': str(query_rectangle.spatial_resolution), }, ).prepare().url if url is None: raise InputException('Invalid websocket url') async with websockets.client.connect( uri=self.__replace_http_with_ws(url), extra_headers=session.auth_header, open_timeout=open_timeout, max_size=None, # allow arbitrary large messages, since it is capped by the server's chunk size ) as websocket: batch_bytes: Optional[bytes] = None while websocket.open: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") except websockets.exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None try: data: Union[str, bytes] = await websocket.recv() if isinstance(data, str): # the server sent an error message raise GeoEngineException({'error': data}) return data except websockets.exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None (batch_bytes, batch) = await asyncio.gather( read_new_bytes(), # asyncio.to_thread(process_bytes, batch_bytes), # TODO: use this when min Python version is 3.9 backports.to_thread(process_bytes, batch_bytes), ) if batch is not None: yield batch # process the last tile batch = process_bytes(batch_bytes) if batch is not None: yield batch
async def vector_stream_into_geopandas(self, query_rectangle: QueryRectangle, time_start_column: str = 'time_start', time_end_column: str = 'time_end', open_timeout: int = 60) ‑> geopandas.geodataframe.GeoDataFrame
-
Stream the workflow result into memory and output a single geo data frame.
NOTE: You can run out of memory if the query rectangle is too large.
Expand source code
async def vector_stream_into_geopandas( self, query_rectangle: QueryRectangle, time_start_column: str = 'time_start', time_end_column: str = 'time_end', open_timeout: int = 60) -> gpd.GeoDataFrame: ''' Stream the workflow result into memory and output a single geo data frame. NOTE: You can run out of memory if the query rectangle is too large. ''' chunk_stream = self.vector_stream( query_rectangle, time_start_column=time_start_column, time_end_column=time_end_column, open_timeout=open_timeout, ) data_frame: Optional[gpd.GeoDataFrame] = None chunk: Optional[gpd.GeoDataFrame] = None async def read_dataframe() -> Optional[gpd.GeoDataFrame]: try: return await chunk_stream.__anext__() except StopAsyncIteration: return None def merge_dataframes( df_a: Optional[gpd.GeoDataFrame], df_b: Optional[gpd.GeoDataFrame] ) -> Optional[gpd.GeoDataFrame]: if df_a is None: return df_b if df_b is None: return df_a return pd.concat([df_a, df_b], ignore_index=True) while True: (chunk, data_frame) = await asyncio.gather( read_dataframe(), backports.to_thread(merge_dataframes, data_frame, chunk), # TODO: use this when min Python version is 3.9 # asyncio.to_thread(merge_dataframes, data_frame, chunk), ) # we can stop when the chunk stream is exhausted if chunk is None: break return data_frame
def wms_get_map_as_image(self, bbox: QueryRectangle, raster_colorizer: RasterColorizer) ‑> PIL.Image.Image
-
Return the result of a WMS request as a PIL Image
Expand source code
def wms_get_map_as_image(self, bbox: QueryRectangle, raster_colorizer: RasterColorizer) -> Image.Image: '''Return the result of a WMS request as a PIL Image''' if not self.__result_descriptor.is_raster_result(): raise MethodNotCalledOnRasterException() session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: wms_api = geoengine_openapi_client.OGCWMSApi(api_client) response = wms_api.wms_map_handler( workflow=str(self), version=geoengine_openapi_client.WmsVersion(geoengine_openapi_client.WmsVersion.ENUM_1_DOT_3_DOT_0), service=geoengine_openapi_client.WmsService(geoengine_openapi_client.WmsService.WMS), request=geoengine_openapi_client.GetMapRequest(geoengine_openapi_client.GetMapRequest.GETMAP), width=int((bbox.spatial_bounds.xmax - bbox.spatial_bounds.xmin) / bbox.spatial_resolution.x_resolution), height=int((bbox.spatial_bounds.ymax - bbox.spatial_bounds.ymin) / bbox.spatial_resolution.y_resolution), # pylint: disable=line-too-long bbox=bbox.bbox_ogc_str, format=geoengine_openapi_client.GetMapFormat(geoengine_openapi_client.GetMapFormat.IMAGE_SLASH_PNG), layers=str(self), styles='custom:' + raster_colorizer.to_api_dict().to_json(), crs=bbox.srs, time=bbox.time_str ) return Image.open(BytesIO(response))
def workflow_definition(self, timeout: int = 60) ‑> geoengine_openapi_client.models.workflow.Workflow
-
Return the workflow definition for this workflow
Expand source code
def workflow_definition(self, timeout: int = 60) -> geoengine_openapi_client.Workflow: '''Return the workflow definition for this workflow''' session = get_session() with geoengine_openapi_client.ApiClient(session.configuration) as api_client: workflows_api = geoengine_openapi_client.WorkflowsApi(api_client) response = workflows_api.load_workflow_handler(str(self.__workflow_id), _request_timeout=timeout) return response
class WorkflowId (workflow_id: UUID)
-
A wrapper around a workflow UUID
Expand source code
class WorkflowId: ''' A wrapper around a workflow UUID ''' __workflow_id: UUID def __init__(self, workflow_id: UUID) -> None: self.__workflow_id = workflow_id @classmethod def from_response(cls, response: geoengine_openapi_client.AddCollection200Response) -> WorkflowId: ''' Create a `WorkflowId` from an http response ''' return WorkflowId(UUID(response.id)) def __str__(self) -> str: return str(self.__workflow_id) def __repr__(self) -> str: return str(self)
Static methods
def from_response(response: geoengine_openapi_client.AddCollection200Response) ‑> WorkflowId
-
Create a
WorkflowId
from an http responseExpand source code
@classmethod def from_response(cls, response: geoengine_openapi_client.AddCollection200Response) -> WorkflowId: ''' Create a `WorkflowId` from an http response ''' return WorkflowId(UUID(response.id))
class X (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var axis : Axis
var bin : Bin
var field : Field
class X2 (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var field : Field
class Y (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var field : Field
var type : str