Source code for zen3geo.datapipes.rioxarray
"""
DataPipes for :doc:`rioxarray <rioxarray:index>`.
"""
from typing import Any, Dict, Iterator, Optional
import rioxarray
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
from torchdata.datapipes.utils import StreamWrapper
[docs]
@functional_datapipe("read_from_rioxarray")
class RioXarrayReaderIterDataPipe(IterDataPipe[StreamWrapper]):
"""
Takes raster files (e.g. GeoTIFFs) from local disk or URLs
(as long as they can be read by rioxarray and/or rasterio)
and yields :py:class:`xarray.DataArray` objects (functional name:
``read_from_rioxarray``).
Based on
https://github.com/pytorch/data/blob/v0.4.0/torchdata/datapipes/iter/load/online.py#L55-L96
Parameters
----------
source_datapipe : IterDataPipe[str]
A DataPipe that contains filepaths or URL links to raster files such as
GeoTIFFs.
kwargs : Optional
Extra keyword arguments to pass to :py:func:`rioxarray.open_rasterio`
and/or :py:func:`rasterio.open`.
Yields
------
stream_obj : xarray.DataArray
An :py:class:`xarray.DataArray` object containing the raster data.
Example
-------
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from zen3geo.datapipes import RioXarrayReader
...
>>> # Read in GeoTIFF data using DataPipe
>>> file_url: str = "https://github.com/GenericMappingTools/gmtserver-admin/raw/master/cache/earth_day_HD.tif"
>>> dp = IterableWrapper(iterable=[file_url])
>>> dp_rioxarray = dp.read_from_rioxarray()
...
>>> # Loop or iterate over the DataPipe stream
>>> it = iter(dp_rioxarray)
>>> dataarray = next(it)
>>> dataarray.encoding["source"]
'https://github.com/GenericMappingTools/gmtserver-admin/raw/master/cache/earth_day_HD.tif'
>>> dataarray
StreamWrapper<<xarray.DataArray (band: 1, y: 960, x: 1920)>
[1843200 values with dtype=uint8]
Coordinates:
* band (band) int64 1
* x (x) float64 -179.9 -179.7 -179.5 -179.3 ... 179.5 179.7 179.9
* y (y) float64 89.91 89.72 89.53 89.34 ... -89.53 -89.72 -89.91
spatial_ref int64 0
...
"""
def __init__(
self, source_datapipe: IterDataPipe[str], **kwargs: Optional[Dict[str, Any]]
) -> None:
self.source_datapipe: IterDataPipe[str] = source_datapipe
self.kwargs = kwargs
def __iter__(self) -> Iterator[StreamWrapper]:
for filename in self.source_datapipe:
yield StreamWrapper(
rioxarray.open_rasterio(filename=filename, **self.kwargs)
)
def __len__(self) -> int:
return len(self.source_datapipe)