1
1
"""All the heleper methods to interface Google Earthengine with sepal-ui."""
2
2
3
+ import asyncio
4
+ from concurrent .futures import ThreadPoolExecutor
3
5
import time
4
6
from pathlib import Path
5
7
from typing import List , Union
@@ -83,8 +85,59 @@ def is_running(task_descripsion: str) -> ee.batch.Task:
83
85
return current_task
84
86
85
87
88
+ async def list_assets_concurrent (folders ):
89
+ with ThreadPoolExecutor () as executor :
90
+ loop = asyncio .get_running_loop ()
91
+ tasks = [
92
+ loop .run_in_executor (executor , ee .data .listAssets , {"parent" : folder })
93
+ for folder in folders
94
+ ]
95
+ results = await asyncio .gather (* tasks )
96
+ return results
97
+
98
+
99
+ async def get_assets_async_concurrent (folder : str ) -> list :
100
+ folder_queue = asyncio .Queue ()
101
+ await folder_queue .put (folder )
102
+ asset_list = []
103
+
104
+ while not folder_queue .empty ():
105
+ current_folders = [await folder_queue .get () for _ in range (folder_queue .qsize ())]
106
+ assets_groups = await list_assets_concurrent (current_folders )
107
+
108
+ for assets in assets_groups :
109
+ for asset in assets .get ("assets" , []):
110
+ asset_list .append ({"type" : asset ["type" ], "name" : asset ["name" ], "id" : asset ["id" ]})
111
+ if asset ["type" ] == "FOLDER" :
112
+ await folder_queue .put (asset ["name" ])
113
+
114
+ return asset_list
115
+
116
+
86
117
@sd .need_ee
87
- def get_assets (folder : Union [str , Path ] = "" ) -> List [dict ]:
118
+ def get_assets (folder : Union [str , Path ] = "" , async_ = True ) -> List [dict ]:
119
+ """Get all the assets from the parameter folder. every nested asset will be displayed.
120
+
121
+ Args:
122
+
123
+ folder: the initial GEE folder
124
+ async_: whether or not the function should be executed asynchronously
125
+
126
+ Returns:
127
+ the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'
128
+
129
+ """
130
+
131
+ folder = str (folder ) or f"projects/{ ee .data ._cloud_api_user_project } /assets/"
132
+
133
+ if async_ :
134
+ return asyncio .run (get_assets_async_concurrent (folder ))
135
+
136
+ return get_assets_sync (folder )
137
+
138
+
139
+ @sd .need_ee
140
+ def get_assets_sync (folder : Union [str , Path ] = "" ) -> List [dict ]:
88
141
"""Get all the assets from the parameter folder. every nested asset will be displayed.
89
142
90
143
Args:
@@ -95,7 +148,6 @@ def get_assets(folder: Union[str, Path] = "") -> List[dict]:
95
148
"""
96
149
# set the folder and init the list
97
150
asset_list = []
98
- folder = str (folder ) or f"projects/{ ee .data ._cloud_api_user_project } /assets/"
99
151
100
152
def _recursive_get (folder , asset_list ):
101
153
@@ -151,6 +203,7 @@ def delete_assets(asset_id: str, dry_run: bool = True) -> None:
151
203
asset_id: the Id of the asset or a folder
152
204
dry_run: whether or not a dry run should be launched. dry run will only display the files name without deleting them.
153
205
"""
206
+
154
207
# define the action to execute for each asset based on the dry run mode
155
208
def delete (id : str ) -> None :
156
209
if dry_run is True :
0 commit comments