Source code for zen3geo.datapipes.pystac_client

"""
DataPipes for :doc:`pystac-client <pystac_client:index>`.
"""
from typing import Any, Dict, Iterator, Optional

try:
    import pystac_client
except ImportError:
    pystac_client = None
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe


[docs]@functional_datapipe("search_for_pystac_item") class PySTACAPISearcherIterDataPipe(IterDataPipe): """ Takes dictionaries containing a STAC API query (as long as the parameters are understood by :py:meth:`pystac_client.Client.search`) and yields :py:class:`pystac_client.ItemSearch` objects (functional name: ``search_for_pystac_item``). Parameters ---------- source_datapipe : IterDataPipe[dict] A DataPipe that contains STAC API query parameters in the form of a Python dictionary to pass to :py:meth:`pystac_client.Client.search`. For example: - **bbox** - A list, tuple, or iterator representing a bounding box of 2D or 3D coordinates. Results will be filtered to only those intersecting the bounding box. - **datetime** - Either a single datetime or datetime range used to filter results. You may express a single datetime using a :py:class:`datetime.datetime` instance, a `RFC 3339-compliant <https://tools.ietf.org/html/rfc3339>`_ timestamp, or a simple date string. - **collections** - List of one or more Collection IDs or :py:class:`pystac.Collection` instances. Only Items in one of the provided Collections will be searched. catalog_url : str The URL of a STAC Catalog. If not specified, this will use the ``STAC_URL`` environment variable. kwargs : Optional Extra keyword arguments to pass to :py:meth:`pystac_client.Client.open`. For example: - **headers** - A dictionary of additional headers to use in all requests made to any part of this Catalog/API. - **parameters** - Optional dictionary of query string parameters to include in all requests. - **modifier** - A callable that modifies the children collection and items returned by this Client. This can be useful for injecting authentication parameters into child assets to access data from non-public sources. Yields ------ item_search : pystac_client.ItemSearch A :py:class:`pystac_client.ItemSearch` object instance that represents a deferred query to a STAC search endpoint as described in the `STAC API - Item Search spec <https://github.com/radiantearth/stac-api-spec/tree/main/item-search>`_. Raises ------ ModuleNotFoundError If ``pystac_client`` is not installed. See :doc:`install instructions for pystac-client <pystac_client:index>`, (e.g. via ``pip install pystac-client``) before using this class. Example ------- >>> import pytest >>> pystac_client = pytest.importorskip("pystac_client") ... >>> from torchdata.datapipes.iter import IterableWrapper >>> from zen3geo.datapipes import PySTACAPISearcher ... >>> # Peform STAC API query using DataPipe >>> query = dict( ... bbox=[174.5, -41.37, 174.9, -41.19], ... datetime=["2012-02-20T00:00:00Z", "2022-12-22T00:00:00Z"], ... collections=["cop-dem-glo-30"], ... ) >>> dp = IterableWrapper(iterable=[query]) >>> dp_pystac_client = dp.search_for_pystac_item( ... catalog_url="https://planetarycomputer.microsoft.com/api/stac/v1", ... # modifier=planetary_computer.sign_inplace, ... ) >>> # Loop or iterate over the DataPipe stream >>> it = iter(dp_pystac_client) >>> stac_item_search = next(it) >>> stac_items = list(stac_item_search.items()) >>> stac_items [<Item id=Copernicus_DSM_COG_10_S42_00_E174_00_DEM>] >>> stac_items[0].properties # doctest: +NORMALIZE_WHITESPACE {'gsd': 30, 'datetime': '2021-04-22T00:00:00Z', 'platform': 'TanDEM-X', 'proj:epsg': 4326, 'proj:shape': [3600, 3600], 'proj:transform': [0.0002777777777777778, 0.0, 173.9998611111111, 0.0, -0.0002777777777777778, -40.99986111111111]} """ def __init__( self, source_datapipe: IterDataPipe[dict], catalog_url: str, **kwargs: Optional[Dict[str, Any]] ) -> None: if pystac_client is None: raise ModuleNotFoundError( "Package `pystac_client` is required to be installed to use this datapipe. " "Please use `pip install pystac-client` or " "`conda install -c conda-forge pystac-client` " "to install the package" ) self.source_datapipe: IterDataPipe[dict] = source_datapipe self.catalog_url: str = catalog_url self.kwargs = kwargs def __iter__(self) -> Iterator: catalog = pystac_client.Client.open(url=self.catalog_url, **self.kwargs) for query in self.source_datapipe: search = catalog.search(**query) yield search def __len__(self) -> int: return len(self.source_datapipe)