Commit b85ec7a

Author: Phil Varner
replace use of fsspec for downloading Item Assets with stac-asset (#120)
* replace use of fsspec for downloading Item Assets with stac-asset
* changelog
1 parent 7e66cf5 commit b85ec7a

8 files changed, +251 -156 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

 ## Changed

+- Replaced the use of fsspec with stac-asset for downloading Item Assets
 - `--local` flag no longer turns off validation
 - The `processing:software` field is no longer added to Items by default. This is
   because the intention of the STAC Processing Extension is to add metadata about the

README.md

Lines changed: 118 additions & 23 deletions
@@ -1,3 +1,4 @@
+<!-- omit from toc -->
 # STAC Task (stac-task)

 [![Build Status](https://github.com/stac-utils/stac-task/workflows/CI/badge.svg?branch=main)](https://github.com/stac-utils/stac-task/actions/workflows/continuous-integration.yml)
@@ -6,6 +7,20 @@
 [![codecov](https://codecov.io/gh/stac-utils/stac-task/branch/main/graph/badge.svg)](https://codecov.io/gh/stac-utils/stac-task)
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

+- [Quickstart for Creating New Tasks](#quickstart-for-creating-new-tasks)
+- [Task Input](#task-input)
+- [ProcessDefinition Object](#processdefinition-object)
+- [UploadOptions Object](#uploadoptions-object)
+- [path\_template](#path_template)
+- [collections](#collections)
+- [tasks](#tasks)
+- [TaskConfig Object](#taskconfig-object)
+- [Full Process Definition Example](#full-process-definition-example)
+- [Migration](#migration)
+- [0.4.x -\> 0.5.x](#04x---05x)
+- [Development](#development)
+- [Contributing](#contributing)
+
 This Python library consists of the Task class, which is used to create custom tasks based
 on a "STAC In, STAC Out" approach. The Task class acts as wrapper around custom code and provides
 several convenience methods for modifying STAC Items, creating derived Items, and providing a CLI.
@@ -17,7 +32,7 @@ This library is based on a [branch of cirrus-lib](https://github.com/cirrus-geo/
 ```python
 from typing import Any

-from stactask import Task
+from stactask import Task, DownloadConfig

 class MyTask(Task):
     name = "my-task"
@@ -30,7 +45,10 @@ class MyTask(Task):
         item = self.items[0]

         # download a datafile
-        item = self.download_item_assets(item, assets=['data'])
+        item = self.download_item_assets(
+            item,
+            config=DownloadConfig(include=['data'])
+        )

         # operate on the local file to create a new asset
         item = self.upload_item_assets_to_s3(item)
@@ -41,32 +59,32 @@ class MyTask(Task):

 ## Task Input

-| Field Name | Type | Description |
-| ------------- | ---- | ----------- |
-| type | string | Must be FeatureCollection |
-| features | [Item] | A list of STAC `Item` |
-| process | ProcessDefinition | A Process Definition |
+| Field Name | Type | Description |
+| ---------- | ----------------- | ------------------------- |
+| type | string | Must be FeatureCollection |
+| features | [Item] | A list of STAC `Item` |
+| process | ProcessDefinition | A Process Definition |

 ### ProcessDefinition Object

 A STAC task can be provided additional configuration via the 'process' field in the input
 ItemCollection.

-| Field Name | Type | Description |
-| ------------- | ---- | ----------- |
-| description | string | Optional description of the process configuration |
-| upload_options | UploadOptions | Options used when uploading assets to a remote server |
-| tasks | Map<str, Map> | Dictionary of task configurations. A List of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |
+| Field Name | Type | Description |
+| -------------- | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| description | string | Optional description of the process configuration |
+| upload_options | UploadOptions | Options used when uploading assets to a remote server |
+| tasks | Map<str, Map> | Dictionary of task configurations. A list of [task configurations](#taskconfig-object) is supported for backwards compatibility reasons, but a dictionary should be preferred. |

 #### UploadOptions Object

-| Field Name | Type | Description |
-| ------------- | ---- | ----------- |
-| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
-| public_assets | [str] | A list of asset keys that should be marked as public when uploaded |
-| headers | Map<str, str> | A set of key, value headers to send when uploading data to s3 |
-| collections | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items) |
-| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL |
+| Field Name | Type | Description |
+| ------------- | ------------- | --------------------------------------------------------------------------------------- |
+| path_template | string | **REQUIRED** A string template for specifying the location of uploaded assets |
+| public_assets | [str] | A list of asset keys that should be marked as public when uploaded |
+| headers | Map<str, str> | A set of key, value headers to send when uploading data to s3 |
+| collections | Map<str, str> | A mapping of output collection name to a JSONPath pattern (for matching Items) |
+| s3_urls | bool | Controls if the final published URLs should be an s3 (s3://*bucket*/*key*) or https URL |

 ##### path_template

@@ -121,10 +139,10 @@ would have `param2=value2` passed. If there were a `task-b` to be run it would n

 A Task Configuration contains information for running a specific task.

-| Field Name | Type | Description |
-| ------------- | ---- | ----------- |
-| name | str | **REQUIRED** Name of the task |
-| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |
+| Field Name | Type | Description |
+| ---------- | ------------- | ------------------------------------------------------------------------------------ |
+| name | str | **REQUIRED** Name of the task |
+| parameters | Map<str, str> | Dictionary of keyword parameters that will be passed to the Tasks `process` function |

 ## Full Process Definition Example

@@ -147,6 +165,83 @@ Process definitions are sometimes called "Payloads":
 }
 ```

+## Migration
+
+### 0.4.x -> 0.5.x
+
+In 0.5.0, the previous use of fsspec to download Item Assets has been replaced with
+the stac-asset library. This has necessitated a change in the parameters
+that the download methods accept.
+
+The primary change is that the Task methods `download_item_assets` and
+`download_items_assets` (items plural) now accept fewer explicit and implicit
+(kwargs) parameters.
+
+Previously, the methods looked like:
+
+```python
+def download_item_assets(
+    self,
+    item: Item,
+    path_template: str = "${collection}/${id}",
+    keep_original_filenames: bool = False,
+    **kwargs: Any,
+) -> Item:
+```
+
+but now look like:
+
+```python
+def download_item_assets(
+    self,
+    item: Item,
+    path_template: str = "${collection}/${id}",
+    config: Optional[DownloadConfig] = None,
+) -> Item:
+```
+
+Similarly, the `asset_io` package methods were previously:
+
+```python
+async def download_item_assets(
+    item: Item,
+    assets: Optional[list[str]] = None,
+    save_item: bool = True,
+    overwrite: bool = False,
+    path_template: str = "${collection}/${id}",
+    absolute_path: bool = False,
+    keep_original_filenames: bool = False,
+    **kwargs: Any,
+) -> Item:
+```
+
+and are now:
+
+```python
+async def download_item_assets(
+    item: Item,
+    path_template: str = "${collection}/${id}",
+    config: Optional[DownloadConfig] = None,
+) -> Item:
+```
+
+Additionally, `kwargs` keys were set to pass configuration through to fsspec. The most common
+parameter was `requester_pays`, to set the Requester Pays flag in AWS S3 requests.
+
+Many of these parameters can be directly translated into configuration passed in a
+`DownloadConfig` object, which is just a wrapper over the `stac_asset.Config` object.
+
+Migration of these various parameters to `DownloadConfig` are as follows:
+
+- `assets`: set `include`
+- `requester_pays`: set `s3_requester_pays` = True
+- `keep_original_filenames`: set `file_name_strategy` to
+  `FileNameStrategy.FILE_NAME` if True or `FileNameStrategy.KEY` if False
+- `overwrite`: set `overwrite`
+- `save_item`: none, Item is always saved
+- `absolute_path`: none. To create or retrieve the Asset hrefs as absolute paths, use either
+  `Item#make_all_asset_hrefs_absolute()` or `Asset#get_absolute_href()`
+
 ## Development

 Clone, install in editable mode with development requirements, and install the **pre-commit** hooks:
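
Note on the Migration list in the README changes above: the parameter mapping can be written down as a small translation function. The sketch below is illustrative only. The helper name `legacy_download_kwargs_to_config_fields` is hypothetical and not part of stac-task, and the file name strategy is returned as the enum member's name rather than the `FileNameStrategy` member itself so that the block runs without stac-asset installed.

```python
from typing import Any, Optional


def legacy_download_kwargs_to_config_fields(
    assets: Optional[list[str]] = None,
    requester_pays: bool = False,
    keep_original_filenames: bool = False,
    overwrite: bool = False,
) -> dict[str, Any]:
    """Translate pre-0.5 download arguments into DownloadConfig field values.

    Hypothetical helper that follows the README's Migration list.
    `save_item` and `absolute_path` have no equivalent and are omitted.
    """
    fields: dict[str, Any] = {
        # requester_pays -> s3_requester_pays
        "s3_requester_pays": requester_pays,
        # overwrite -> overwrite
        "overwrite": overwrite,
        # keep_original_filenames -> file_name_strategy (member name only;
        # real code would pass the FileNameStrategy member)
        "file_name_strategy": "FILE_NAME" if keep_original_filenames else "KEY",
    }
    # assets -> include (left unset when no asset filter was given)
    if assets is not None:
        fields["include"] = list(assets)
    return fields


fields = legacy_download_kwargs_to_config_fields(assets=["data"], requester_pays=True)
```

In real code, and assuming `FileNameStrategy` behaves as a standard `Enum`, the name would be swapped for the member (for example `FileNameStrategy[fields["file_name_strategy"]]`) before calling `DownloadConfig(**fields)`.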

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,7 @@ dependencies = [
     "python-dateutil>=2.7.0",
     "boto3-utils>=0.3.2",
     "fsspec>=2022.8.2",
+    "stac-asset>=0.3.0",
     "jsonpath_ng>=1.5.3",
     "requests>=2.28.1",
     "s3fs>=2022.8.2",
@@ -37,6 +38,7 @@ dev = [
     "pre-commit~=3.7",
     "ruff~=0.4.1",
     "types-setuptools~=69.0",
+    "boto3-stubs",
 ]
 test = [
     "pytest~=8.0",

stactask/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -6,6 +6,7 @@
     # package is not installed
     pass

+from .config import DownloadConfig
 from .task import Task

-__all__ = ["Task"]
+__all__ = ["Task", "DownloadConfig"]

stactask/asset_io.py

Lines changed: 32 additions & 79 deletions
@@ -1,102 +1,55 @@
 import asyncio
 import logging
-import os
 from os import path as op
 from typing import Any, Iterable, Optional, Union
-from urllib.parse import urlparse

-import fsspec
+import stac_asset
 from boto3utils import s3
-from fsspec import AbstractFileSystem
 from pystac import Item
 from pystac.layout import LayoutTemplate

+from .config import DownloadConfig
+
 logger = logging.getLogger(__name__)

 # global dictionary of sessions per bucket
 global_s3_client = s3()

-SIMULTANEOUS_DOWNLOADS = int(os.getenv("STAC_SIMULTANEOUS_DOWNLOADS", 3))
-sem = asyncio.Semaphore(SIMULTANEOUS_DOWNLOADS)
-
-
-async def download_file(fs: AbstractFileSystem, src: str, dest: str) -> None:
-    async with sem:
-        logger.debug(f"{src} start")
-        if hasattr(fs, "_get_file"):
-            await fs._get_file(src, dest)
-        elif hasattr(fs, "get_file"):
-            fs.get_file(src, dest)
-        else:
-            raise NotImplementedError(
-                "stac-task only supports filesystems providing"
-                " `get_file` or `_get_file` interface"
-            )
-        logger.debug(f"{src} completed")
-

 async def download_item_assets(
     item: Item,
-    assets: Optional[list[str]] = None,
-    save_item: bool = True,
-    overwrite: bool = False,
     path_template: str = "${collection}/${id}",
-    absolute_path: bool = False,
-    keep_original_filenames: bool = False,
-    **kwargs: Any,
+    config: Optional[DownloadConfig] = None,
+    keep_non_downloaded: bool = True,
 ) -> Item:
-    _assets = item.assets.keys() if assets is None else assets
-
-    # determine path from template and item
-    layout = LayoutTemplate(path_template)
-    path = layout.substitute(item)
-
-    # make necessary directories
-    os.makedirs(path, exist_ok=True)
+    return await stac_asset.download_item(
+        item=item.clone(),
+        directory=LayoutTemplate(path_template).substitute(item),
+        file_name="item.json",
+        config=config,
+        keep_non_downloaded=keep_non_downloaded,
+    )

-    new_item = item.clone()
-
-    tasks = []
-    for a in _assets:
-        if a not in item.assets:
-            continue
-        href = item.assets[a].href

-        # local filename
-        url_path = urlparse(href).path
-        if keep_original_filenames:
-            basename = os.path.basename(url_path)
-        else:
-            basename = a + os.path.splitext(url_path)[1]
-        new_href = os.path.join(path, basename)
-        if absolute_path:
-            new_href = os.path.abspath(new_href)
-
-        # save file
-        if not os.path.exists(new_href) or overwrite:
-            fs = fsspec.core.url_to_fs(href, asynchronous=True, **kwargs)[0]
-            task = asyncio.create_task(download_file(fs, href, new_href))
-            tasks.append(task)
-
-        # update
-        new_item.assets[a].href = new_href
-
-    await asyncio.gather(*tasks)
-
-    # save Item metadata alongside saved assets
-    if save_item:
-        new_item.remove_links("root")
-        new_item.save_object(dest_href=os.path.join(path, "item.json"))
-
-    return new_item
-
-
-async def download_items_assets(items: Iterable[Item], **kwargs: Any) -> list[Item]:
-    tasks = []
-    for item in items:
-        tasks.append(asyncio.create_task(download_item_assets(item, **kwargs)))
-    new_items: list[Item] = await asyncio.gather(*tasks)
-    return new_items
+async def download_items_assets(
+    items: Iterable[Item],
+    path_template: str = "${collection}/${id}",
+    config: Optional[DownloadConfig] = None,
+    keep_non_downloaded: bool = True,
+) -> list[Item]:
+    return await asyncio.gather(
+        *[
+            asyncio.create_task(
+                download_item_assets(
+                    item=item,
+                    path_template=path_template,
+                    config=config,
+                    keep_non_downloaded=keep_non_downloaded,
+                )
+            )
+            for item in items
+        ]
+    )


 def upload_item_assets_to_s3(
@@ -107,7 +60,7 @@ def upload_item_assets_to_s3(
     s3_urls: bool = False,
     headers: Optional[dict[str, Any]] = None,
     s3_client: Optional[s3] = None,
-    **kwargs: Any,
+    **kwargs: Any,  # unused, but retain to permit unused attributes from upload_options
 ) -> Item:
     """Upload Item assets to an S3 bucket
     Args:
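
Note on the new `download_items_assets` above: it creates one task per Item and awaits them all with `asyncio.gather`. The sketch below shows that fan-out pattern in isolation. `fake_download` and `download_all` are hypothetical stand-ins, and a sleep replaces the real call to `download_item_assets`, so nothing here exercises stac-asset itself.

```python
import asyncio


async def fake_download(item_id: str, delay: float) -> str:
    """Stand-in for download_item_assets: sleeps instead of fetching anything."""
    await asyncio.sleep(delay)
    return f"{item_id}:done"


async def download_all(jobs: list[tuple[str, float]]) -> list[str]:
    # One task per job, all awaited together. gather returns results in the
    # order the awaitables were passed, not the order in which they finish.
    return await asyncio.gather(
        *[asyncio.create_task(fake_download(item_id, delay)) for item_id, delay in jobs]
    )


# "a" takes longest but still comes first in the result list.
results = asyncio.run(download_all([("a", 0.02), ("b", 0.0), ("c", 0.01)]))
```

Because `gather` preserves input order, callers get the returned Items in the same order as the `items` iterable they passed in.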

stactask/config.py

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+from dataclasses import dataclass
+
+from stac_asset import Config
+
+
+@dataclass
+class DownloadConfig(Config):  # type: ignore
+    pass
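
Note on `DownloadConfig` above: an empty `@dataclass` subclass inherits every field of its parent, which is why it can act as a drop-in wrapper. The sketch below uses hypothetical stand-in classes (`BaseConfig`, `WrapperConfig`) rather than `stac_asset.Config`, and it assumes that `Config` is itself a dataclass; the two fields are illustrative and are not the full set.

```python
from dataclasses import dataclass, field, fields


@dataclass
class BaseConfig:
    """Stand-in for stac_asset.Config with two illustrative fields."""

    include: list[str] = field(default_factory=list)
    overwrite: bool = False


@dataclass
class WrapperConfig(BaseConfig):
    """Empty subclass, mirroring how DownloadConfig wraps Config."""


# The subclass accepts the parent's fields and is usable wherever the parent is.
cfg = WrapperConfig(include=["data"])
field_names = [f.name for f in fields(WrapperConfig)]
```

Re-exporting the wrapper from the package lets callers import the configuration type from `stactask` instead of importing it from the underlying library.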
