Module geoengine.raster_workflow_rio_writer

A module that contains classes to write raster data from a Geo Engine raster workflow.

Expand source code
''' A module that contains classes to write raster data from a Geo Engine raster workflow. '''

from typing import Optional, cast
from datetime import datetime
import rasterio as rio
import numpy as np
from geoengine.workflow import Workflow, QueryRectangle
from geoengine.types import RasterResultDescriptor, TimeInterval
from geoengine.raster import ge_type_to_np


# pylint: disable=too-many-instance-attributes
class RasterWorkflowRioWriter:
    '''
    A class to write raster data from a Geo Engine raster workflow to a GDAL dataset.
    It creates a new dataset for each time interval and writes the tiles to the dataset.
    Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor.
    '''
    current_dataset: Optional[rio.io.DatasetWriter] = None
    current_time: Optional[TimeInterval] = None
    dataset_geo_transform = None
    dataset_width = None
    dataset_height = None
    dataset_data_type = np.dtype
    print_info = False

    dataset_prefix = None
    workflow: Optional[Workflow] = None
    bands = None
    no_data_value = 0
    time_format = "%Y-%m-%d_%H-%M-%S"

    gdal_driver = "GTiff"
    rio_kwargs = {"tiled": True, "compress": "DEFLATE", "zlevel": 6}
    tile_size = 512

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self,
        dataset_prefix,
        workflow: Workflow,
        no_data_value=0,
        data_type=None,
        print_info=False,
        rio_kwargs=None
    ):
        ''' Create a new RasterWorkflowGdalWriter instance.'''
        self.dataset_prefix = dataset_prefix
        self.workflow = workflow
        self.no_data_value = no_data_value
        self.print_info = print_info

        ras_res = cast(RasterResultDescriptor, self.workflow.get_result_descriptor())
        dt = ge_type_to_np(ras_res.data_type)
        self.dataset_data_type = dt if data_type is None else data_type
        self.bands = ras_res.bands
        if rio_kwargs:
            for (key, value) in rio_kwargs.items():
                self.rio_kwargs[key] = value

    def close_current_dataset(self):
        ''' Close the current dataset '''
        if self.current_dataset:
            del self.current_dataset
            self.current_dataset = None

    # pylint: disable=too-many-locals, too-many-statements
    def create_tiling_geo_transform_width_height(self, query: QueryRectangle):
        ''' Create the tiling geo transform, width and height for the current query.'''

        ul_x = query.spatial_bounds.xmin
        ul_y = query.spatial_bounds.ymax
        lr_x = query.spatial_bounds.xmax
        lr_y = query.spatial_bounds.ymin
        res_x = query.spatial_resolution.x_resolution
        res_y = query.spatial_resolution.y_resolution * -1  # honor the fact that the y axis is flipped

        assert res_y < 0, "The y resolution must be negative"

        assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate"
        assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate"

        ul_pixel_x = ul_x / res_x  # we can assume that the global origin is 0,0
        ul_pixel_y = ul_y / res_y
        lr_pixel_x = lr_x / res_x
        lr_pixel_y = lr_y / res_y

        assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x"
        assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y"

        tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size
        if ul_pixel_x % self.tile_size != 0:
            tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size

        tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size
        if ul_pixel_y % self.tile_size != 0:
            tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size

        assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate"
        assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate"

        width = int((lr_pixel_x - tiling_ul_pixel_x))
        if width % self.tile_size != 0:
            width = int((width // self.tile_size + 1) * self.tile_size)
        assert width > 0, "The width must be greater than 0"

        height = int((lr_pixel_y - tiling_ul_pixel_y))
        if height % self.tile_size != 0:
            height = int((height // self.tile_size + 1) * self.tile_size)
        assert height > 0, "The height must be greater than 0"

        assert width % self.tile_size == 0, "The width must be a multiple of the tile size"
        assert height % self.tile_size == 0, "The height must be a multiple of the tile size"

        tiling_ul_x_coord = tiling_ul_pixel_x * res_x
        tiling_ul_y_coord = tiling_ul_pixel_y * res_y
        assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate"
        assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate"

        geo_transform = [tiling_ul_x_coord, res_x, 0., tiling_ul_y_coord, 0., res_y]

        if self.dataset_geo_transform is None:
            self.dataset_geo_transform = geo_transform
        else:
            assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset"

        if self.dataset_width is None:
            self.dataset_width = width
        else:
            assert self.dataset_width == width, "The width of the current dataset does not match the new one"

        if self.dataset_height is None:
            self.dataset_height = height
        else:
            assert self.dataset_height == height, "The height of the current dataset does not match the new one"

    def __create_new_dataset(self, query: QueryRectangle):
        ''' Create a new dataset for the current query.'''
        assert self.current_time is not None, "The current time must be set"
        time_formated_start = self.current_time.start.astype(datetime).strftime(self.time_format)
        width = self.dataset_width
        height = self.dataset_height
        geo_transform = self.dataset_geo_transform
        assert geo_transform is not None
        affine_transform = rio.Affine.from_gdal(
            geo_transform[0], geo_transform[1], geo_transform[2], geo_transform[3], geo_transform[4], geo_transform[5]
        )
        if self.print_info:
            print(f"Creating dataset {self.dataset_prefix}{time_formated_start}.tif"
                  f" with width {width}, height {height}, geo_transform {geo_transform}"
                  f" rio kwargs: {self.rio_kwargs}"
                  )
        assert self.bands is not None, "The bands must be set"
        number_of_bands = len(self.bands)
        dataset_data_type = self.dataset_data_type
        file_path = f"{self.dataset_prefix}{time_formated_start}.tif"
        rio_dataset = rio.open(
            file_path,
            'w',
            driver=self.gdal_driver,
            width=width,
            height=height,
            count=number_of_bands,
            crs=query.srs,
            transform=affine_transform,
            dtype=dataset_data_type,
            nodata=self.no_data_value,
            **self.rio_kwargs
        )

        self.current_dataset = rio_dataset

    async def query_and_write(self, query: QueryRectangle):
        ''' Query the raster workflow and write the tiles to the dataset.'''

        self.create_tiling_geo_transform_width_height(query)

        assert self.workflow is not None, "The workflow must be set"

        try:
            async for tile in self.workflow.raster_stream(query):
                if self.current_time != tile.time:
                    self.close_current_dataset()
                    self.current_time = tile.time
                    self.__create_new_dataset(query)

                assert self.current_time == tile.time, "The time of the current dataset does not match the tile"
                assert self.dataset_geo_transform is not None, "The geo transform must be set"

                tile_ul_x = int(
                    (tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1]
                )
                tile_ul_y = int(
                    (tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5]
                )

                band_index = tile.band + 1
                data = tile.to_numpy_data_array(self.no_data_value)

                assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size"
                window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y)
                assert self.current_dataset is not None, "Dataset must be open."
                self.current_dataset.write(data, window=window, indexes=band_index)
        except Exception as inner_e:
            raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e

        finally:
            self.close_current_dataset()

Classes

class RasterWorkflowRioWriter (dataset_prefix, workflow: Workflow, no_data_value=0, data_type=None, print_info=False, rio_kwargs=None)

A class to write raster data from a Geo Engine raster workflow to a GDAL dataset. It creates a new dataset for each time interval and writes the tiles to the dataset. Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor.

Create a new RasterWorkflowGdalWriter instance.

Expand source code
class RasterWorkflowRioWriter:
    '''
    A class to write raster data from a Geo Engine raster workflow to a GDAL dataset.
    It creates a new dataset for each time interval and writes the tiles to the dataset.
    Multiple bands are supported and the bands are written to the dataset in the order of the result descriptor.
    '''
    current_dataset: Optional[rio.io.DatasetWriter] = None
    current_time: Optional[TimeInterval] = None
    dataset_geo_transform = None
    dataset_width = None
    dataset_height = None
    dataset_data_type = np.dtype
    print_info = False

    dataset_prefix = None
    workflow: Optional[Workflow] = None
    bands = None
    no_data_value = 0
    time_format = "%Y-%m-%d_%H-%M-%S"

    gdal_driver = "GTiff"
    rio_kwargs = {"tiled": True, "compress": "DEFLATE", "zlevel": 6}
    tile_size = 512

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def __init__(
        self,
        dataset_prefix,
        workflow: Workflow,
        no_data_value=0,
        data_type=None,
        print_info=False,
        rio_kwargs=None
    ):
        ''' Create a new RasterWorkflowGdalWriter instance.'''
        self.dataset_prefix = dataset_prefix
        self.workflow = workflow
        self.no_data_value = no_data_value
        self.print_info = print_info

        ras_res = cast(RasterResultDescriptor, self.workflow.get_result_descriptor())
        dt = ge_type_to_np(ras_res.data_type)
        self.dataset_data_type = dt if data_type is None else data_type
        self.bands = ras_res.bands
        if rio_kwargs:
            for (key, value) in rio_kwargs.items():
                self.rio_kwargs[key] = value

    def close_current_dataset(self):
        ''' Close the current dataset '''
        if self.current_dataset:
            del self.current_dataset
            self.current_dataset = None

    # pylint: disable=too-many-locals, too-many-statements
    def create_tiling_geo_transform_width_height(self, query: QueryRectangle):
        ''' Create the tiling geo transform, width and height for the current query.'''

        ul_x = query.spatial_bounds.xmin
        ul_y = query.spatial_bounds.ymax
        lr_x = query.spatial_bounds.xmax
        lr_y = query.spatial_bounds.ymin
        res_x = query.spatial_resolution.x_resolution
        res_y = query.spatial_resolution.y_resolution * -1  # honor the fact that the y axis is flipped

        assert res_y < 0, "The y resolution must be negative"

        assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate"
        assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate"

        ul_pixel_x = ul_x / res_x  # we can assume that the global origin is 0,0
        ul_pixel_y = ul_y / res_y
        lr_pixel_x = lr_x / res_x
        lr_pixel_y = lr_y / res_y

        assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x"
        assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y"

        tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size
        if ul_pixel_x % self.tile_size != 0:
            tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size

        tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size
        if ul_pixel_y % self.tile_size != 0:
            tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size

        assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate"
        assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate"

        width = int((lr_pixel_x - tiling_ul_pixel_x))
        if width % self.tile_size != 0:
            width = int((width // self.tile_size + 1) * self.tile_size)
        assert width > 0, "The width must be greater than 0"

        height = int((lr_pixel_y - tiling_ul_pixel_y))
        if height % self.tile_size != 0:
            height = int((height // self.tile_size + 1) * self.tile_size)
        assert height > 0, "The height must be greater than 0"

        assert width % self.tile_size == 0, "The width must be a multiple of the tile size"
        assert height % self.tile_size == 0, "The height must be a multiple of the tile size"

        tiling_ul_x_coord = tiling_ul_pixel_x * res_x
        tiling_ul_y_coord = tiling_ul_pixel_y * res_y
        assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate"
        assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate"

        geo_transform = [tiling_ul_x_coord, res_x, 0., tiling_ul_y_coord, 0., res_y]

        if self.dataset_geo_transform is None:
            self.dataset_geo_transform = geo_transform
        else:
            assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset"

        if self.dataset_width is None:
            self.dataset_width = width
        else:
            assert self.dataset_width == width, "The width of the current dataset does not match the new one"

        if self.dataset_height is None:
            self.dataset_height = height
        else:
            assert self.dataset_height == height, "The height of the current dataset does not match the new one"

    def __create_new_dataset(self, query: QueryRectangle):
        ''' Create a new dataset for the current query.'''
        assert self.current_time is not None, "The current time must be set"
        time_formated_start = self.current_time.start.astype(datetime).strftime(self.time_format)
        width = self.dataset_width
        height = self.dataset_height
        geo_transform = self.dataset_geo_transform
        assert geo_transform is not None
        affine_transform = rio.Affine.from_gdal(
            geo_transform[0], geo_transform[1], geo_transform[2], geo_transform[3], geo_transform[4], geo_transform[5]
        )
        if self.print_info:
            print(f"Creating dataset {self.dataset_prefix}{time_formated_start}.tif"
                  f" with width {width}, height {height}, geo_transform {geo_transform}"
                  f" rio kwargs: {self.rio_kwargs}"
                  )
        assert self.bands is not None, "The bands must be set"
        number_of_bands = len(self.bands)
        dataset_data_type = self.dataset_data_type
        file_path = f"{self.dataset_prefix}{time_formated_start}.tif"
        rio_dataset = rio.open(
            file_path,
            'w',
            driver=self.gdal_driver,
            width=width,
            height=height,
            count=number_of_bands,
            crs=query.srs,
            transform=affine_transform,
            dtype=dataset_data_type,
            nodata=self.no_data_value,
            **self.rio_kwargs
        )

        self.current_dataset = rio_dataset

    async def query_and_write(self, query: QueryRectangle):
        ''' Query the raster workflow and write the tiles to the dataset.'''

        self.create_tiling_geo_transform_width_height(query)

        assert self.workflow is not None, "The workflow must be set"

        try:
            async for tile in self.workflow.raster_stream(query):
                if self.current_time != tile.time:
                    self.close_current_dataset()
                    self.current_time = tile.time
                    self.__create_new_dataset(query)

                assert self.current_time == tile.time, "The time of the current dataset does not match the tile"
                assert self.dataset_geo_transform is not None, "The geo transform must be set"

                tile_ul_x = int(
                    (tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1]
                )
                tile_ul_y = int(
                    (tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5]
                )

                band_index = tile.band + 1
                data = tile.to_numpy_data_array(self.no_data_value)

                assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size"
                window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y)
                assert self.current_dataset is not None, "Dataset must be open."
                self.current_dataset.write(data, window=window, indexes=band_index)
        except Exception as inner_e:
            raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e

        finally:
            self.close_current_dataset()

Class variables

var bands
var current_dataset : Optional[rasterio.io.DatasetWriter]
var current_time : Optional[TimeInterval]
var dataset_data_type

dtype(dtype, align=False, copy=False, [metadata])

Create a data type object.

A numpy array is homogeneous, and contains elements described by a dtype object. A dtype object can be constructed from different combinations of fundamental numeric types.

Parameters

dtype
Object to be converted to a data type object.
align : bool, optional
Add padding to the fields to match what a C compiler would output for a similar C-struct. Can be True only if obj is a dictionary or a comma-separated string. If a struct dtype is being created, this also sets a sticky alignment flag isalignedstruct.
copy : bool, optional
Make a new copy of the data-type object. If False, the result may just be a reference to a built-in data-type object.
metadata : dict, optional
An optional dictionary with dtype metadata.

See Also

result_type

Examples

Using array-scalar type:

>>> np.dtype(np.int16)
dtype('int16')

Structured type, one field name 'f1', containing int16:

>>> np.dtype([('f1', np.int16)])
dtype([('f1', '<i2')])

Structured type, one field named 'f1', in itself containing a structured type with one field:

>>> np.dtype([('f1', [('f1', np.int16)])])
dtype([('f1', [('f1', '<i2')])])

Structured type, two fields: the first field contains an unsigned int, the second an int32:

>>> np.dtype([('f1', np.uint64), ('f2', np.int32)])
dtype([('f1', '<u8'), ('f2', '<i4')])

Using array-protocol type strings:

>>> np.dtype([('a','f8'),('b','S10')])
dtype([('a', '<f8'), ('b', 'S10')])

Using comma-separated field formats. The shape is (2,3):

>>> np.dtype("i4, (2,3)f8")
dtype([('f0', '<i4'), ('f1', '<f8', (2, 3))])

Using tuples. int is a fixed type, 3 the field's shape. void is a flexible type, here of size 10:

>>> np.dtype([('hello',(np.int64,3)),('world',np.void,10)])
dtype([('hello', '<i8', (3,)), ('world', 'V10')])

Subdivide int16 into 2 int8's, called x and y. 0 and 1 are the offsets in bytes:

>>> np.dtype((np.int16, {'x':(np.int8,0), 'y':(np.int8,1)}))
dtype((numpy.int16, [('x', 'i1'), ('y', 'i1')]))

Using dictionaries. Two fields named 'gender' and 'age':

>>> np.dtype({'names':['gender','age'], 'formats':['S1',np.uint8]})
dtype([('gender', 'S1'), ('age', 'u1')])

Offsets in bytes, here 0 and 25:

>>> np.dtype({'surname':('S25',0),'age':(np.uint8,25)})
dtype([('surname', 'S25'), ('age', 'u1')])
var dataset_geo_transform
var dataset_height
var dataset_prefix
var dataset_width
var gdal_driver
var no_data_value
var print_info
var rio_kwargs
var tile_size
var time_format
var workflow : Optional[Workflow]

Methods

def close_current_dataset(self)

Close the current dataset

Expand source code
def close_current_dataset(self):
    ''' Close the current dataset '''
    if self.current_dataset:
        del self.current_dataset
        self.current_dataset = None
def create_tiling_geo_transform_width_height(self, query: QueryRectangle)

Create the tiling geo transform, width and height for the current query.

Expand source code
def create_tiling_geo_transform_width_height(self, query: QueryRectangle):
    ''' Create the tiling geo transform, width and height for the current query.'''

    ul_x = query.spatial_bounds.xmin
    ul_y = query.spatial_bounds.ymax
    lr_x = query.spatial_bounds.xmax
    lr_y = query.spatial_bounds.ymin
    res_x = query.spatial_resolution.x_resolution
    res_y = query.spatial_resolution.y_resolution * -1  # honor the fact that the y axis is flipped

    assert res_y < 0, "The y resolution must be negative"

    assert ul_x < lr_x, "The upper left x coordinate must be smaller than the lower right x coordinate"
    assert ul_y > lr_y, "The upper left y coordinate must be greater than the lower right y coordinate"

    ul_pixel_x = ul_x / res_x  # we can assume that the global origin is 0,0
    ul_pixel_y = ul_y / res_y
    lr_pixel_x = lr_x / res_x
    lr_pixel_y = lr_y / res_y

    assert ul_pixel_x < lr_pixel_x, "The upper left pixel x must be smaller than the lower right pixel x"
    assert ul_pixel_y < lr_pixel_y, "The upper left pixel y must be smaller than the lower right pixel y"

    tiling_ul_pixel_x = (ul_pixel_x // self.tile_size) * self.tile_size
    if ul_pixel_x % self.tile_size != 0:
        tiling_ul_pixel_x = ((ul_pixel_x // self.tile_size) - 1) * self.tile_size

    tiling_ul_pixel_y = (ul_pixel_y // self.tile_size) * self.tile_size
    if ul_pixel_y % self.tile_size != 0:
        tiling_ul_pixel_y = ((ul_pixel_y // self.tile_size) - 1) * self.tile_size

    assert tiling_ul_pixel_x <= ul_pixel_x, "Tiling upper left x pixel must be smaller than upper left x coordinate"
    assert tiling_ul_pixel_y <= ul_pixel_y, "Tiling upper left y pixel must be smaller than upper left y coordinate"

    width = int((lr_pixel_x - tiling_ul_pixel_x))
    if width % self.tile_size != 0:
        width = int((width // self.tile_size + 1) * self.tile_size)
    assert width > 0, "The width must be greater than 0"

    height = int((lr_pixel_y - tiling_ul_pixel_y))
    if height % self.tile_size != 0:
        height = int((height // self.tile_size + 1) * self.tile_size)
    assert height > 0, "The height must be greater than 0"

    assert width % self.tile_size == 0, "The width must be a multiple of the tile size"
    assert height % self.tile_size == 0, "The height must be a multiple of the tile size"

    tiling_ul_x_coord = tiling_ul_pixel_x * res_x
    tiling_ul_y_coord = tiling_ul_pixel_y * res_y
    assert tiling_ul_x_coord <= ul_x, "Tiling upper left x coordinate must be smaller than upper left x coordinate"
    assert tiling_ul_y_coord >= ul_y, "Tiling upper left y coordinate must be greater than upper left y coordinate"

    geo_transform = [tiling_ul_x_coord, res_x, 0., tiling_ul_y_coord, 0., res_y]

    if self.dataset_geo_transform is None:
        self.dataset_geo_transform = geo_transform
    else:
        assert self.dataset_geo_transform == geo_transform, "Can not change the geo transform of the dataset"

    if self.dataset_width is None:
        self.dataset_width = width
    else:
        assert self.dataset_width == width, "The width of the current dataset does not match the new one"

    if self.dataset_height is None:
        self.dataset_height = height
    else:
        assert self.dataset_height == height, "The height of the current dataset does not match the new one"
async def query_and_write(self, query: QueryRectangle)

Query the raster workflow and write the tiles to the dataset.

Expand source code
async def query_and_write(self, query: QueryRectangle):
    ''' Query the raster workflow and write the tiles to the dataset.'''

    self.create_tiling_geo_transform_width_height(query)

    assert self.workflow is not None, "The workflow must be set"

    try:
        async for tile in self.workflow.raster_stream(query):
            if self.current_time != tile.time:
                self.close_current_dataset()
                self.current_time = tile.time
                self.__create_new_dataset(query)

            assert self.current_time == tile.time, "The time of the current dataset does not match the tile"
            assert self.dataset_geo_transform is not None, "The geo transform must be set"

            tile_ul_x = int(
                (tile.geo_transform.x_min - self.dataset_geo_transform[0]) / self.dataset_geo_transform[1]
            )
            tile_ul_y = int(
                (tile.geo_transform.y_max - self.dataset_geo_transform[3]) / self.dataset_geo_transform[5]
            )

            band_index = tile.band + 1
            data = tile.to_numpy_data_array(self.no_data_value)

            assert self.tile_size == tile.size_x == tile.size_y, "Tile size does not match the expected size"
            window = rio.windows.Window(tile_ul_x, tile_ul_y, tile.size_x, tile.size_y)
            assert self.current_dataset is not None, "Dataset must be open."
            self.current_dataset.write(data, window=window, indexes=band_index)
    except Exception as inner_e:
        raise RuntimeError(f"Tile at {tile.spatial_partition().as_bbox_str()} with {tile.time}") from inner_e

    finally:
        self.close_current_dataset()