Source code for zen3geo.datapipes.pystac
"""
DataPipes for :doc:`pystac <pystac:index>`.
"""
from typing import Any, Dict, Iterator, Optional
try:
import pystac
except ImportError:
pystac = None
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe
@functional_datapipe("read_to_pystac_item")
class PySTACItemReaderIterDataPipe(IterDataPipe):
    """
    Takes files from local disk or URLs (as long as they can be read by pystac)
    and yields :py:class:`pystac.Item` objects (functional name:
    ``read_to_pystac_item``).

    Parameters
    ----------
    source_datapipe : IterDataPipe[str]
        A DataPipe that contains filepaths or URL links to STAC items.

    kwargs : Optional
        Extra keyword arguments to pass to :py:meth:`pystac.Item.from_file`.

    Yields
    ------
    stac_item : pystac.Item
        A :py:class:`pystac.Item` object containing the specific
        :py:class:`pystac.STACObject` implementation class represented in a
        JSON format.

    Raises
    ------
    ModuleNotFoundError
        If ``pystac`` is not installed. See
        :doc:`install instructions for pystac <pystac:installation>`, (e.g. via
        ``pip install pystac``) before using this class.

    Example
    -------
    >>> import pytest
    >>> pystac = pytest.importorskip("pystac")
    ...
    >>> from torchdata.datapipes.iter import IterableWrapper
    >>> from zen3geo.datapipes import PySTACItemReader
    ...
    >>> # Read in STAC Item using DataPipe
    >>> item_url: str = "https://planetarycomputer.microsoft.com/api/stac/v1/collections/sentinel-2-l2a/items/S2A_MSIL2A_20220115T032101_R118_T48NUG_20220115T170435"
    >>> dp = IterableWrapper(iterable=[item_url])
    >>> dp_pystac = dp.read_to_pystac_item()
    ...
    >>> # Loop or iterate over the DataPipe stream
    >>> it = iter(dp_pystac)
    >>> stac_item = next(it)
    >>> stac_item.bbox
    [103.20205689, 0.81602476, 104.18934086, 1.8096362]
    >>> stac_item.properties  # doctest: +NORMALIZE_WHITESPACE
    {'datetime': '2022-01-15T03:21:01.024000Z',
     'platform': 'Sentinel-2A',
     'proj:epsg': 32648,
     'instruments': ['msi'],
     's2:mgrs_tile': '48NUG',
     'constellation': 'Sentinel 2',
     's2:granule_id': 'S2A_OPER_MSI_L2A_TL_ESRI_20220115T170436_A034292_T48NUG_N03.00',
     'eo:cloud_cover': 17.352597,
     's2:datatake_id': 'GS2A_20220115T032101_034292_N03.00',
     's2:product_uri': 'S2A_MSIL2A_20220115T032101_N0300_R118_T48NUG_20220115T170435.SAFE',
     's2:datastrip_id': 'S2A_OPER_MSI_L2A_DS_ESRI_20220115T170436_S20220115T033502_N03.00',
     's2:product_type': 'S2MSI2A',
     'sat:orbit_state': 'descending',
    ...
    """

    def __init__(self, source_datapipe: IterDataPipe[str], **kwargs: Any) -> None:
        """
        Store the upstream datapipe and the keyword arguments used when reading.

        The ``**kwargs`` annotation is the type of each individual keyword
        value (``Any``), not of the collected dictionary.
        """
        # pystac is an optional dependency; the module-level import sets the
        # name to None when it is missing, so fail at construction time with
        # an actionable message rather than with an AttributeError during
        # iteration.
        if pystac is None:
            raise ModuleNotFoundError(
                "Package `pystac` is required to be installed to use this datapipe. "
                "Please use `pip install pystac` or "
                "`conda install -c conda-forge pystac` "
                "to install the package"
            )
        self.source_datapipe: IterDataPipe[str] = source_datapipe
        # Forwarded unchanged to pystac.Item.from_file for every href.
        self.kwargs = kwargs

    def __iter__(self) -> Iterator:
        """
        Read each href from the source datapipe with
        ``pystac.Item.from_file`` and yield the result, one at a time.
        """
        for href in self.source_datapipe:
            yield pystac.Item.from_file(href=href, **self.kwargs)

    def __len__(self) -> int:
        """
        Return the length of the source datapipe; exactly one object is
        yielded per input href, so the two lengths are the same.
        """
        return len(self.source_datapipe)