Source code for zen3geo.datapipes.pystac_client

"""
DataPipes for :doc:`pystac-client <pystac_client:index>`.
"""
from typing import Any, Dict, Iterator, Optional

try:
    import pystac_client
except ImportError:
    pystac_client = None
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe


@functional_datapipe("search_for_pystac_item")
class PySTACAPISearcherIterDataPipe(IterDataPipe):
    """
    Takes dictionaries containing a STAC API query (as long as the parameters
    are understood by :py:meth:`pystac_client.Client.search`) and yields
    :py:class:`pystac_client.ItemSearch` objects (functional name:
    ``search_for_pystac_item``).

    Parameters
    ----------
    source_datapipe : IterDataPipe[dict]
        A DataPipe that contains STAC API query parameters in the form of a
        Python dictionary to pass to :py:meth:`pystac_client.Client.search`.
        For example:

        - **bbox** - A list, tuple, or iterator representing a bounding box of
          2D or 3D coordinates. Results will be filtered to only those
          intersecting the bounding box.
        - **datetime** - Either a single datetime or datetime range used to
          filter results. You may express a single datetime using a
          :py:class:`datetime.datetime` instance, a
          `RFC 3339-compliant <https://tools.ietf.org/html/rfc3339>`_
          timestamp, or a simple date string.
        - **collections** - List of one or more Collection IDs or
          :py:class:`pystac.Collection` instances. Only Items in one of the
          provided Collections will be searched.

    catalog_url : str
        The URL of a STAC Catalog.

    kwargs : Optional
        Extra keyword arguments to pass to
        :py:meth:`pystac_client.Client.open`. For example:

        - **headers** - A dictionary of additional headers to use in all
          requests made to any part of this Catalog/API.
        - **parameters** - Optional dictionary of query string parameters to
          include in all requests.
        - **modifier** - A callable that modifies the children collection and
          items returned by this Client. This can be useful for injecting
          authentication parameters into child assets to access data from
          non-public sources.

    Yields
    ------
    item_search : pystac_client.ItemSearch
        A :py:class:`pystac_client.ItemSearch` object instance that represents
        a deferred query to a STAC search endpoint as described in the
        `STAC API - Item Search spec <https://github.com/radiantearth/stac-api-spec/tree/main/item-search>`_.

    Raises
    ------
    ModuleNotFoundError
        If ``pystac_client`` is not installed. See
        :doc:`install instructions for pystac-client <pystac_client:index>`,
        (e.g. via ``pip install pystac-client``) before using this class.

    Example
    -------
    >>> import pytest
    >>> pystac_client = pytest.importorskip("pystac_client")
    ...
    >>> from torchdata.datapipes.iter import IterableWrapper
    >>> from zen3geo.datapipes import PySTACAPISearcher
    ...
    >>> # Perform STAC API query using DataPipe
    >>> query = dict(
    ...     bbox=[174.5, -41.37, 174.9, -41.19],  # xmin, ymin, xmax, ymax
    ...     datetime=["2012-02-20T00:00:00Z", "2022-12-22T00:00:00Z"],
    ...     collections=["cop-dem-glo-30"],
    ... )
    >>> dp = IterableWrapper(iterable=[query])
    >>> dp_pystac_client = dp.search_for_pystac_item(
    ...     catalog_url="https://planetarycomputer.microsoft.com/api/stac/v1",
    ...     # modifier=planetary_computer.sign_inplace,
    ... )
    >>> # Loop or iterate over the DataPipe stream
    >>> it = iter(dp_pystac_client)
    >>> stac_item_search = next(it)
    >>> stac_items = list(stac_item_search.items())
    >>> stac_items
    [<Item id=Copernicus_DSM_COG_10_S42_00_E174_00_DEM>]
    >>> stac_items[0].properties  # doctest: +NORMALIZE_WHITESPACE
    {'gsd': 30,
     'datetime': '2021-04-22T00:00:00Z',
     'platform': 'TanDEM-X',
     'proj:epsg': 4326,
     'proj:shape': [3600, 3600],
     'proj:transform': [0.0002777777777777778,
      0.0,
      173.9998611111111,
      0.0,
      -0.0002777777777777778,
      -40.99986111111111]}
    """

    def __init__(
        self,
        source_datapipe: IterDataPipe[dict],
        catalog_url: str,
        # The annotation on ``**kwargs`` describes each individual value, not
        # the collected mapping, hence ``Any`` rather than a dict type.
        **kwargs: Any,
    ) -> None:
        # ``pystac_client`` is bound to None at module import time when the
        # optional dependency is missing; fail early with install guidance.
        if pystac_client is None:
            raise ModuleNotFoundError(
                "Package `pystac_client` is required to be installed to use this datapipe. "
                "Please use `pip install pystac-client` or "
                "`conda install -c conda-forge pystac-client` "
                "to install the package"
            )
        self.source_datapipe: IterDataPipe[dict] = source_datapipe
        self.catalog_url: str = catalog_url
        self.kwargs = kwargs

    def __iter__(self) -> Iterator:
        """Open the catalog, then yield one search object per input query."""
        # The client is opened once per iteration pass and reused for every
        # query dictionary coming from the source DataPipe.
        catalog = pystac_client.Client.open(url=self.catalog_url, **self.kwargs)
        for query in self.source_datapipe:
            search = catalog.search(**query)
            yield search

    def __len__(self) -> int:
        """Return the length of the source DataPipe (one search per query)."""
        return len(self.source_datapipe)
@functional_datapipe("list_pystac_items_by_search")
class PySTACAPIItemListerIterDataPipe(IterDataPipe):
    """
    Lists the :py:class:`pystac.Item` objects that match the provided STAC API
    search parameters (functional name: ``list_pystac_items_by_search``).

    Parameters
    ----------
    source_datapipe : IterDataPipe[pystac_client.ItemSearch]
        A DataPipe that contains :py:class:`pystac_client.ItemSearch` object
        instances that represents a deferred query to a STAC search endpoint as
        described in the
        `STAC API - Item Search spec <https://github.com/radiantearth/stac-api-spec/tree/main/item-search>`_.

    Yields
    ------
    stac_item : pystac.Item
        A :py:class:`pystac.Item` object containing the specific
        :py:class:`pystac.STACObject` implementation class represented in a
        JSON format.

    Raises
    ------
    ModuleNotFoundError
        If ``pystac_client`` is not installed. See
        :doc:`install instructions for pystac-client <pystac_client:index>`,
        (e.g. via ``pip install pystac-client``) before using this class.

    Example
    -------
    >>> import pytest
    >>> pystac_client = pytest.importorskip("pystac_client")
    ...
    >>> from torchdata.datapipes.iter import IterableWrapper
    >>> from zen3geo.datapipes import PySTACAPIItemLister
    ...
    >>> # List STAC Items from a STAC API query
    >>> catalog = pystac_client.Client.open(
    ...     url="https://explorer.digitalearth.africa/stac/"
    ... )
    >>> search = catalog.search(
    ...     bbox=[57.2, -20.6, 57.9, -19.9],  # xmin, ymin, xmax, ymax
    ...     datetime=["2023-01-01T00:00:00Z", "2023-01-31T00:00:00Z"],
    ...     collections=["s2_l2a"],
    ... )
    >>> dp = IterableWrapper(iterable=[search])
    >>> dp_pystac_item_list = dp.list_pystac_items_by_search()
    ...
    >>> # Loop or iterate over the DataPipe stream
    >>> it = iter(dp_pystac_item_list)
    >>> stac_item = next(it)
    >>> stac_item
    <Item id=ec16dbf6-9729-5a8f-9d72-5e83a8b9f30d>
    >>> stac_item.properties  # doctest: +NORMALIZE_WHITESPACE
    {'title': 'S2B_MSIL2A_20230103T062449_N0509_R091_T40KED_20230103T075000',
     'gsd': 10,
     'proj:epsg': 32740,
     'platform': 'sentinel-2b',
     'view:off_nadir': 0,
     'instruments': ['msi'],
     'eo:cloud_cover': 0.02,
     'odc:file_format': 'GeoTIFF',
     'odc:region_code': '40KED',
     'constellation': 'sentinel-2',
     'sentinel:sequence': '0',
     'sentinel:utm_zone': 40,
     'sentinel:product_id': 'S2B_MSIL2A_20230103T062449_N0509_R091_T40KED_20230103T075000',
     'sentinel:grid_square': 'ED',
     'sentinel:data_coverage': 28.61,
     'sentinel:latitude_band': 'K',
     'created': '2023-01-03T06:24:53Z',
     'sentinel:valid_cloud_cover': True,
     'sentinel:boa_offset_applied': True,
     'sentinel:processing_baseline': '05.09',
     'proj:shape': [10980, 10980],
     'proj:transform': [10.0,
      0.0,
      499980.0,
      0.0,
      -10.0,
      7900000.0,
      0.0,
      0.0,
      1.0],
     'cubedash:region_code': '40KED',
     'datetime': '2023-01-03T06:24:53Z'}
    """

    def __init__(self, source_datapipe: IterDataPipe) -> None:
        # ``pystac_client`` is bound to None at module import time when the
        # optional dependency is missing; fail early with install guidance.
        if pystac_client is None:
            raise ModuleNotFoundError(
                "Package `pystac_client` is required to be installed to use this datapipe. "
                "Please use `pip install pystac-client` or "
                "`conda install -c conda-forge pystac-client` "
                "to install the package"
            )
        self.source_datapipe = source_datapipe

    def __iter__(self) -> Iterator:
        """Yield every item of every search object, in source order."""
        for item_search in self.source_datapipe:
            yield from item_search.items()

    def __len__(self) -> int:
        """
        Return the sum of ``matched()`` counts over all search objects.

        Computing the length iterates over the source DataPipe and calls
        ``matched()`` on each element.

        Raises
        ------
        TypeError
            If any search object reports no count (``matched()`` returns
            ``None``), because the total length is then unknown.
        """
        total = 0
        for item_search in self.source_datapipe:
            matched = item_search.matched()
            # pystac-client documents ``matched()`` as returning None when the
            # server does not report a count. Summing None would already fail
            # with an opaque TypeError; raise the same type with a clear message.
            if matched is None:
                raise TypeError(
                    f"{type(self).__name__} instance doesn't have valid length "
                    "because a search did not report its number of matched items"
                )
            total += matched
        return total