Module geoengine.raster_workflow_rio_writer
A module that contains classes to write raster data from a Geo Engine raster workflow.
Expand source code
''' A module that contains classes to write raster data from a Geo Engine raster workflow. '''
from typing import Optional, cast
from datetime import datetime
import rasterio as rio
import numpy as np
from geoengine.workflow import Workflow, QueryRectangle
from geoengine.types import RasterResultDescriptor, TimeInterval
from geoengine.raster import ge_type_to_np
# pylint: disable=too-many-instance-attributes
class RasterWorkflowRioWriter:
'''
A class to write raster data from a Geo Engine raster workflow to a GDAL dataset.
It creates a new dataset for each time interval and writes the tiles to the dataset.
Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor.
'''
current_dataset: Optional[rio.io.DatasetWriter] = None
current_time: Optional[TimeInterval] = None
dataset_geo_transform = None
dataset_width = None
dataset_height = None
dataset_data_type = np.dtype
print_info = False
dataset_prefix = None
workflow: Optional[Workflow] = None
bands = None
no_data_value = 0
time_format = "%Y-%m-%d_%H-%M-%S"
gdal_driver = "GTiff"
rio_kwargs = {"tiled": True, "compress": "DEFLATE", "zlevel": 6}
tile_size = 512
# pylint: disable=too-many-arguments,too-many-positional-arguments
def __init__(
self,
dataset_prefix,
workflow: Workflow,
no_data_value=0,
data_type=None,
print_info=False,
rio_kwargs=None
):
''' Create a new RasterWorkflowGdalWriter instance.'''
self.dataset_prefix = dataset_prefix
self.workflow = workflow
self.no_data_value = no_data_value
self.print_info = print_info
ras_res = cast(RasterResultDescriptor, self.workflow.get_result_descriptor())
dt = ge_type_to_np(ras_res.data_type)
self.dataset_data_type = dt if data_type is None else data_type
self.bands = ras_res.bands
if rio_kwargs:
for (key, value) in rio_kwargs.items():
self.rio_kwargs[key] = value
def close_current_dataset(self):
''' Close the current dataset '''
if self.current_dataset:
del self.current_dataset
self.current_dataset = None
# pylint: disable=too-many-locals, too-many-statements
def create_tiling_geo_transform_width_height(self, query: QueryRectangle):
''' Create the tiling geo transform, width and height for the current query.'''
ul_x = query.spatial_bounds.xmin
ul_y = query.spatial_bounds.ymax
lr_x = query.spatial_bounds.xmax
lr_y = query.spatial_bounds.ymin
res_x = query.spatial_resolution.x_resolution
res_y = query.spatial_resolution.y_resolution * -1 # honor the fact that the y axis is flipped
assert res_y < 0, "The y resolution must be negative"
assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate"
assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate"
ul_pixel_x = ul_x / res_x # we can assume that the global origin is 0,0
ul_pixel_y = ul_y / res_y
lr_pixel_x = lr_x / res_x
lr_pixel_y = lr_y / res_y
assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x"
assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y"
tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size
if ul_pixel_x % self.tile_size != 0:
tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size
tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size
if ul_pixel_y % self.tile_size != 0:
tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size
assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate"
assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate"
width = int((lr_pixel_x - tiling_ul_pixel_x))
if width % self.tile_size != 0:
width = int((width // self.tile_size + 1) * self.tile_size)
assert width > 0, "The width must be greater than 0"
height = int((lr_pixel_y - tiling_ul_pixel_y))
if height % self.tile_size != 0:
height = int((height // self.tile_size + 1) * self.tile_size)
assert height > 0, "The height must be greater than 0"
assert width % self.tile_size == 0, "The width must be a multiple of the tile size"
assert height % self.tile_size == 0, "The height must be a multiple of the tile size"
tiling_ul_x_coord = tiling_ul_pixel_x * res_x
tiling_ul_y_coord = tiling_ul_pixel_y * res_y
assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate"
assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate"
geo_transform = [tiling_ul_x_coord, res_x, 0., tiling_ul_y_coord, 0., res_y]
if self.dataset_geo_transform is None:
self.dataset_geo_transform = geo_transform
else:
assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset"
if self.dataset_width is None:
self.dataset_width = width
else:
assert self.dataset_width == width, "The width of the current dataset does not match the new one"
if self.dataset_height is None:
self.dataset_height = height
else:
assert self.dataset_height == height, "The height of the current dataset does not match the new one"
def __create_new_dataset(self, query: QueryRectangle):
''' Create a new dataset for the current query.'''
assert self.current_time is not None, "The current time must be set"
time_formated_start = self.current_time.start.astype(datetime).strftime(self.time_format)
width = self.dataset_width
height = self.dataset_height
geo_transform = self.dataset_geo_transform
assert geo_transform is not None
affine_transform = rio.Affine.from_gdal(
geo_transform[0], geo_transform[1], geo_transform[2], geo_transform[3], geo_transform[4], geo_transform[5]
)
if self.print_info:
print(f"Creating dataset {self.dataset_prefix}{time_formated_start}.tif"
f" with width {width}, height {height}, geo_transform {geo_transform}"
f" rio kwargs: {self.rio_kwargs}"
)
assert self.bands is not None, "The bands must be set"
number_of_bands = len(self.bands)
dataset_data_type = self.dataset_data_type
file_path = f"{self.dataset_prefix}{time_formated_start}.tif"
rio_dataset = rio.open(
file_path,
'w',
driver=self.gdal_driver,
width=width,
height=height,
count=number_of_bands,
crs=query.srs,
transform=affine_transform,
dtype=dataset_data_type,
nodata=self.no_data_value,
**self.rio_kwargs
)
self.current_dataset = rio_dataset
async def query_and_write(self, query: QueryRectangle):
''' Query the raster workflow and write the tiles to the dataset.'''
self.create_tiling_geo_transform_width_height(query)
assert self.workflow is not None, "The workflow must be set"
try:
async for tile in self.workflow.raster_stream(query):
if self.current_time != tile.time:
self.close_current_dataset()
self.current_time = tile.time
self.__create_new_dataset(query)
assert self.current_time == tile.time, "The time of the current dataset does not match the tile"
assert self.dataset_geo_transform is not None, "The geo transform must be set"
tile_ul_x = int(
(tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1]
)
tile_ul_y = int(
(tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5]
)
band_index = tile.band + 1
data = tile.to_numpy_data_array(self.no_data_value)
assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size"
window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y)
assert self.current_dataset is not None, "Dataset must be open."
self.current_dataset.write(data, window=window, indexes=band_index)
except Exception as inner_e:
raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e
finally:
self.close_current_dataset()
Classes
class RasterWorkflowRioWriter (dataset_prefix, workflow: Workflow, no_data_value=0, data_type=None, print_info=False, rio_kwargs=None)
-
A class to write raster data from a Geo Engine raster workflow to a GDAL dataset. It creates a new dataset for each time interval and writes the tiles to the dataset. Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor.
Create a new RasterWorkflowGdalWriter instance.
Expand source code
class RasterWorkflowRioWriter: ''' A class to write raster data from a Geo Engine raster workflow to a GDAL dataset. It creates a new dataset for each time interval and writes the tiles to the dataset. Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor. ''' current_dataset: Optional[rio.io.DatasetWriter] = None current_time: Optional[TimeInterval] = None dataset_geo_transform = None dataset_width = None dataset_height = None dataset_data_type = np.dtype print_info = False dataset_prefix = None workflow: Optional[Workflow] = None bands = None no_data_value = 0 time_format = "%Y-%m-%d_%H-%M-%S" gdal_driver = "GTiff" rio_kwargs = {"tiled": True, "compress": "DEFLATE", "zlevel": 6} tile_size = 512 # pylint: disable=too-many-arguments,too-many-positional-arguments def __init__( self, dataset_prefix, workflow: Workflow, no_data_value=0, data_type=None, print_info=False, rio_kwargs=None ): ''' Create a new RasterWorkflowGdalWriter instance.''' self.dataset_prefix = dataset_prefix self.workflow = workflow self.no_data_value = no_data_value self.print_info = print_info ras_res = cast(RasterResultDescriptor, self.workflow.get_result_descriptor()) dt = ge_type_to_np(ras_res.data_type) self.dataset_data_type = dt if data_type is None else data_type self.bands = ras_res.bands if rio_kwargs: for (key, value) in rio_kwargs.items(): self.rio_kwargs[key] = value def close_current_dataset(self): ''' Close the current dataset ''' if self.current_dataset: del self.current_dataset self.current_dataset = None # pylint: disable=too-many-locals, too-many-statements def create_tiling_geo_transform_width_height(self, query: QueryRectangle): ''' Create the tiling geo transform, width and height for the current query.''' ul_x = query.spatial_bounds.xmin ul_y = query.spatial_bounds.ymax lr_x = query.spatial_bounds.xmax lr_y = query.spatial_bounds.ymin res_x = query.spatial_resolution.x_resolution res_y = query.spatial_resolution.y_resolution * -1 # honor the fact that the y axis is flipped assert res_y < 0, "The y resolution must be negative" assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate" assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate" ul_pixel_x = ul_x / res_x # we can assume that the global origin is 0,0 ul_pixel_y = ul_y / res_y lr_pixel_x = lr_x / res_x lr_pixel_y = lr_y / res_y assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x" assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y" tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size if ul_pixel_x % self.tile_size != 0: tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size if ul_pixel_y % self.tile_size != 0: tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate" assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate" width = int((lr_pixel_x - tiling_ul_pixel_x)) if width % self.tile_size != 0: width = int((width // self.tile_size + 1) * self.tile_size) assert width > 0, "The width must be greater than 0" height = int((lr_pixel_y - tiling_ul_pixel_y)) if height % self.tile_size != 0: height = int((height // self.tile_size + 1) * self.tile_size) assert height > 0, "The height must be greater than 0" assert width % self.tile_size == 0, "The width must be a multiple of the tile size" assert height % self.tile_size == 0, "The height must be a multiple of the tile size" tiling_ul_x_coord = tiling_ul_pixel_x * res_x tiling_ul_y_coord = tiling_ul_pixel_y * res_y assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate" assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate" geo_transform = [tiling_ul_x_coord, res_x, 0., tiling_ul_y_coord, 0., res_y] if self.dataset_geo_transform is None: self.dataset_geo_transform = geo_transform else: assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset" if self.dataset_width is None: self.dataset_width = width else: assert self.dataset_width == width, "The width of the current dataset does not match the new one" if self.dataset_height is None: self.dataset_height = height else: assert self.dataset_height == height, "The height of the current dataset does not match the new one" def __create_new_dataset(self, query: QueryRectangle): ''' Create a new dataset for the current query.''' assert self.current_time is not None, "The current time must be set" time_formated_start = self.current_time.start.astype(datetime).strftime(self.time_format) width = self.dataset_width height = self.dataset_height geo_transform = self.dataset_geo_transform assert geo_transform is not None affine_transform = rio.Affine.from_gdal( geo_transform[0], geo_transform[1], geo_transform[2], geo_transform[3], geo_transform[4], geo_transform[5] ) if self.print_info: print(f"Creating dataset {self.dataset_prefix}{time_formated_start}.tif" f" with width {width}, height {height}, geo_transform {geo_transform}" f" rio kwargs: {self.rio_kwargs}" ) assert self.bands is not None, "The bands must be set" number_of_bands = len(self.bands) dataset_data_type = self.dataset_data_type file_path = f"{self.dataset_prefix}{time_formated_start}.tif" rio_dataset = rio.open( file_path, 'w', driver=self.gdal_driver, width=width, height=height, count=number_of_bands, crs=query.srs, transform=affine_transform, dtype=dataset_data_type, nodata=self.no_data_value, **self.rio_kwargs ) self.current_dataset = rio_dataset async def query_and_write(self, query: QueryRectangle): ''' Query the raster workflow and write the tiles to the dataset.''' self.create_tiling_geo_transform_width_height(query) assert self.workflow is not None, "The workflow must be set" try: async for tile in self.workflow.raster_stream(query): if self.current_time != tile.time: self.close_current_dataset() self.current_time = tile.time self.__create_new_dataset(query) assert self.current_time == tile.time, "The time of the current dataset does not match the tile" assert self.dataset_geo_transform is not None, "The geo transform must be set" tile_ul_x = int( (tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1] ) tile_ul_y = int( (tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5] ) band_index = tile.band + 1 data = tile.to_numpy_data_array(self.no_data_value) assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size" window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y) assert self.current_dataset is not None, "Dataset must be open." self.current_dataset.write(data, window=window, indexes=band_index) except Exception as inner_e: raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e finally: self.close_current_dataset()
Class variables
var bands
var current_dataset : Optional[rasterio.io.DatasetWriter]
var current_time : Optional[TimeInterval]
var dataset_data_type
-
dtype(dtype, align=False, copy=False, [metadata])
Create a data type object.
A numpy array is homogeneous, and contains elements described by a dtype object. A dtype object can be constructed from different combinations of fundamental numeric types.
Parameters
dtype
- Object to be converted to a data type object.
align
:bool
, optional- Add padding to the fields to match what a C compiler would output
for a similar C-struct. Can be
True
only ifobj
is a dictionary or a comma-separated string. If a struct dtype is being created, this also sets a sticky alignment flagisalignedstruct
. copy
:bool
, optional- Make a new copy of the data-type object. If
False
, the result may just be a reference to a built-in data-type object. metadata
:dict
, optional- An optional dictionary with dtype metadata.
See Also
result_type
Examples
Using array-scalar type:
>>> np.dtype(np.int16) dtype('int16')
Structured type, one field name 'f1', containing int16:
>>> np.dtype([('f1', np.int16)]) dtype([('f1', '<i2')])
Structured type, one field named 'f1', in itself containing a structured type with one field:
>>> np.dtype([('f1', [('f1', np.int16)])]) dtype([('f1', [('f1', '<i2')])])
Structured type, two fields: the first field contains an unsigned int, the second an int32:
>>> np.dtype([('f1', np.uint64), ('f2', np.int32)]) dtype([('f1', '<u8'), ('f2', '<i4')])
Using array-protocol type strings:
>>> np.dtype([('a','f8'),('b','S10')]) dtype([('a', '<f8'), ('b', 'S10')])
Using comma-separated field formats. The shape is (2,3):
>>> np.dtype("i4, (2,3)f8") dtype([('f0', '<i4'), ('f1', '<f8', (2, 3))])
Using tuples.
int
is a fixed type, 3 the field's shape.void
is a flexible type, here of size 10:>>> np.dtype([('hello',(np.int64,3)),('world',np.void,10)]) dtype([('hello', '<i8', (3,)), ('world', 'V10')])
Subdivide
int16
into 2int8
's, called x and y. 0 and 1 are the offsets in bytes:>>> np.dtype((np.int16, {'x':(np.int8,0), 'y':(np.int8,1)})) dtype((numpy.int16, [('x', 'i1'), ('y', 'i1')]))
Using dictionaries. Two fields named 'gender' and 'age':
>>> np.dtype({'names':['gender','age'], 'formats':['S1',np.uint8]}) dtype([('gender', 'S1'), ('age', 'u1')])
Offsets in bytes, here 0 and 25:
>>> np.dtype({'surname':('S25',0),'age':(np.uint8,25)}) dtype([('surname', 'S25'), ('age', 'u1')])
var dataset_geo_transform
var dataset_height
var dataset_prefix
var dataset_width
var gdal_driver
var no_data_value
var print_info
var rio_kwargs
var tile_size
var time_format
var workflow : Optional[Workflow]
Methods
def close_current_dataset(self)
-
Close the current dataset
Expand source code
def close_current_dataset(self): ''' Close the current dataset ''' if self.current_dataset: del self.current_dataset self.current_dataset = None
def create_tiling_geo_transform_width_height(self, query: QueryRectangle)
-
Create the tiling geo transform, width and height for the current query.
Expand source code
def create_tiling_geo_transform_width_height(self, query: QueryRectangle): ''' Create the tiling geo transform, width and height for the current query.''' ul_x = query.spatial_bounds.xmin ul_y = query.spatial_bounds.ymax lr_x = query.spatial_bounds.xmax lr_y = query.spatial_bounds.ymin res_x = query.spatial_resolution.x_resolution res_y = query.spatial_resolution.y_resolution * -1 # honor the fact that the y axis is flipped assert res_y < 0, "The y resolution must be negative" assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate" assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate" ul_pixel_x = ul_x / res_x # we can assume that the global origin is 0,0 ul_pixel_y = ul_y / res_y lr_pixel_x = lr_x / res_x lr_pixel_y = lr_y / res_y assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x" assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y" tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size if ul_pixel_x % self.tile_size != 0: tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size if ul_pixel_y % self.tile_size != 0: tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate" assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate" width = int((lr_pixel_x - tiling_ul_pixel_x)) if width % self.tile_size != 0: width = int((width // self.tile_size + 1) * self.tile_size) assert width > 0, "The width must be greater than 0" height = int((lr_pixel_y - tiling_ul_pixel_y)) if height % self.tile_size != 0: height = int((height // self.tile_size + 1) * self.tile_size) assert height > 0, "The height must be greater than 0" assert width % self.tile_size == 0, "The width must be a multiple of the tile size" assert height % self.tile_size == 0, "The height must be a multiple of the tile size" tiling_ul_x_coord = tiling_ul_pixel_x * res_x tiling_ul_y_coord = tiling_ul_pixel_y * res_y assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate" assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate" geo_transform = [tiling_ul_x_coord, res_x, 0., tiling_ul_y_coord, 0., res_y] if self.dataset_geo_transform is None: self.dataset_geo_transform = geo_transform else: assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset" if self.dataset_width is None: self.dataset_width = width else: assert self.dataset_width == width, "The width of the current dataset does not match the new one" if self.dataset_height is None: self.dataset_height = height else: assert self.dataset_height == height, "The height of the current dataset does not match the new one"
async def query_and_write(self, query: QueryRectangle)
-
Query the raster workflow and write the tiles to the dataset.
Expand source code
async def query_and_write(self, query: QueryRectangle): ''' Query the raster workflow and write the tiles to the dataset.''' self.create_tiling_geo_transform_width_height(query) assert self.workflow is not None, "The workflow must be set" try: async for tile in self.workflow.raster_stream(query): if self.current_time != tile.time: self.close_current_dataset() self.current_time = tile.time self.__create_new_dataset(query) assert self.current_time == tile.time, "The time of the current dataset does not match the tile" assert self.dataset_geo_transform is not None, "The geo transform must be set" tile_ul_x = int( (tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1] ) tile_ul_y = int( (tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5] ) band_index = tile.band + 1 data = tile.to_numpy_data_array(self.no_data_value) assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size" window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y) assert self.current_dataset is not None, "Dataset must be open." self.current_dataset.write(data, window=window, indexes=band_index) except Exception as inner_e: raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e finally: self.close_current_dataset()