# Source code for zen3geo.datapipes.stackstac

"""
DataPipes for :doc:`stackstac <stackstac:index>`.
"""
from typing import Any, Dict, Iterator, Optional

import xarray as xr

try:
    import stackstac
except ImportError:
    stackstac = None
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe


@functional_datapipe("mosaic_dataarray")
class StackSTACMosaickerIterDataPipe(IterDataPipe[xr.DataArray]):
    """
    Takes :py:class:`xarray.DataArray` objects, flattens a dimension by picking
    the first valid pixel, to yield mosaicked :py:class:`xarray.DataArray`
    objects (functional name: ``mosaic_dataarray``).

    Parameters
    ----------
    source_datapipe : IterDataPipe[xarray.DataArray]
        A DataPipe that contains :py:class:`xarray.DataArray` objects, with
        e.g. dimensions ("time", "band", "y", "x").

    kwargs : Optional
        Extra keyword arguments to pass to :py:func:`stackstac.mosaic`.

    Yields
    ------
    dataarray : xarray.DataArray
        An :py:class:`xarray.DataArray` that has been mosaicked with e.g.
        dimensions ("band", "y", "x").

    Raises
    ------
    ModuleNotFoundError
        If ``stackstac`` is not installed. See
        :doc:`install instructions for stackstac <stackstac:index>`, (e.g. via
        ``pip install stackstac``) before using this class.

    Example
    -------
    >>> import pytest
    >>> import xarray as xr
    >>> pystac = pytest.importorskip("pystac")
    >>> stackstac = pytest.importorskip("stackstac")
    ...
    >>> from torchdata.datapipes.iter import IterableWrapper
    >>> from zen3geo.datapipes import StackSTACMosaicker
    ...
    >>> # Get list of ALOS DEM tiles to mosaic together later
    >>> item_urls = [
    ...     "https://planetarycomputer.microsoft.com/api/stac/v1/collections/alos-dem/items/ALPSMLC30_N022E113_DSM",
    ...     "https://planetarycomputer.microsoft.com/api/stac/v1/collections/alos-dem/items/ALPSMLC30_N022E114_DSM",
    ... ]
    >>> stac_items = [pystac.Item.from_file(href=url) for url in item_urls]
    >>> dataarray = stackstac.stack(items=stac_items)
    >>> assert dataarray.sizes == {'time': 2, 'band': 1, 'y': 3600, 'x': 7200}
    ...
    >>> # Mosaic different tiles in an xarray.DataArray using DataPipe
    >>> dp = IterableWrapper(iterable=[dataarray])
    >>> dp_mosaic = dp.mosaic_dataarray()
    ...
    >>> # Loop or iterate over the DataPipe stream
    >>> it = iter(dp_mosaic)
    >>> dataarray = next(it)
    >>> print(dataarray.sizes)
    Frozen({'band': 1, 'y': 3600, 'x': 7200})
    >>> print(dataarray.coords)
    Coordinates:
      * band     (band) <U4 'data'
      * x        (x) float64 113.0 113.0 113.0 113.0 ... 115.0 115.0 115.0 115.0
      * y        (y) float64 23.0 23.0 23.0 23.0 23.0 ... 22.0 22.0 22.0 22.0
    ...
    >>> print(dataarray.attrs["spec"])
    RasterSpec(epsg=4326, bounds=(113.0, 22.0, 115.0, 23.0), resolutions_xy=(0.0002777777777777778, 0.0002777777777777778))
    """

    def __init__(
        self, source_datapipe: IterDataPipe[xr.DataArray], **kwargs: Any
    ) -> None:
        # ``stackstac`` is an optional dependency: the module-level import
        # falls back to ``None`` on ImportError, so fail early and loudly
        # here rather than with an AttributeError during iteration.
        if stackstac is None:
            raise ModuleNotFoundError(
                "Package `stackstac` is required to be installed to use this datapipe. "
                "Please use `pip install stackstac` or "
                "`conda install -c conda-forge stackstac` "
                "to install the package"
            )
        self.source_datapipe: IterDataPipe = source_datapipe
        # Stored verbatim and forwarded to ``stackstac.mosaic`` on every item.
        self.kwargs = kwargs

    def __iter__(self) -> Iterator[xr.DataArray]:
        for dataarray in self.source_datapipe:
            yield stackstac.mosaic(arr=dataarray, **self.kwargs)

    def __len__(self) -> int:
        # Exactly one output is yielded per input, so the length is that of
        # the source datapipe.
        return len(self.source_datapipe)
@functional_datapipe("stack_stac_items")
class StackSTACStackerIterDataPipe(IterDataPipe[xr.DataArray]):
    """
    Takes :py:class:`pystac.Item` objects, reprojects them to the same grid
    and stacks them along time, to yield :py:class:`xarray.DataArray` objects
    (functional name: ``stack_stac_items``).

    Parameters
    ----------
    source_datapipe : IterDataPipe[pystac.Item]
        A DataPipe that contains :py:class:`pystac.Item` objects.

    kwargs : Optional
        Extra keyword arguments to pass to :py:func:`stackstac.stack`.

    Yields
    ------
    datacube : xarray.DataArray
        An :py:class:`xarray.DataArray` backed by a
        :py:class:`dask.array.Array` containing the time-series datacube. The
        dimensions will be ("time", "band", "y", "x").

    Raises
    ------
    ModuleNotFoundError
        If ``stackstac`` is not installed. See
        :doc:`install instructions for stackstac <stackstac:index>`, (e.g. via
        ``pip install stackstac``) before using this class.

    Example
    -------
    >>> import pytest
    >>> pystac = pytest.importorskip("pystac")
    >>> stackstac = pytest.importorskip("stackstac")
    ...
    >>> from torchdata.datapipes.iter import IterableWrapper
    >>> from zen3geo.datapipes import StackSTACStacker
    ...
    >>> # Stack different bands in a STAC Item using DataPipe
    >>> item_url: str = "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-1-grd/items/S1A_IW_GRDH_1SDV_20220914T093226_20220914T093252_044999_056053"
    >>> stac_item = pystac.Item.from_file(href=item_url)
    >>> dp = IterableWrapper(iterable=[stac_item])
    >>> dp_stackstac = dp.stack_stac_items(
    ...     assets=["vh", "vv"], epsg=32652, resolution=10
    ... )
    ...
    >>> # Loop or iterate over the DataPipe stream
    >>> it = iter(dp_stackstac)
    >>> dataarray = next(it)
    >>> print(dataarray.sizes)
    Frozen({'time': 1, 'band': 2, 'y': 20686, 'x': 28043})
    >>> print(dataarray.coords)
    Coordinates:
      * time     (time) datetime64[ns] 2022-09-14T0...
        id       (time) <U62 'S1A_IW_GRDH_1SDV_2022...
      * band     (band) <U2 'vh' 'vv'
      * x        (x) float64 1.354e+05 ... 4.158e+05
      * y        (y) float64 4.305e+06 ... 4.098e+06
    ...
    >>> print(dataarray.attrs["spec"])
    RasterSpec(epsg=32652, bounds=(135370, 4098080, 415800, 4304940), resolutions_xy=(10, 10))
    """

    def __init__(self, source_datapipe: IterDataPipe, **kwargs: Any) -> None:
        # ``stackstac`` is an optional dependency: the module-level import
        # falls back to ``None`` on ImportError, so fail early and loudly
        # here rather than with an AttributeError during iteration.
        if stackstac is None:
            raise ModuleNotFoundError(
                "Package `stackstac` is required to be installed to use this datapipe. "
                "Please use `pip install stackstac` or "
                "`conda install -c conda-forge stackstac` "
                "to install the package"
            )
        self.source_datapipe: IterDataPipe = source_datapipe
        # Stored verbatim and forwarded to ``stackstac.stack`` on every item.
        self.kwargs = kwargs

    def __iter__(self) -> Iterator[xr.DataArray]:
        # Each element of the source datapipe is passed as-is to the
        # ``items`` argument of ``stackstac.stack``.
        for stac_item in self.source_datapipe:
            yield stackstac.stack(items=stac_item, **self.kwargs)

    def __len__(self) -> int:
        # Exactly one datacube is yielded per input, so the length is that of
        # the source datapipe.
        return len(self.source_datapipe)