1
1
"""All the heleper methods to interface Google Earthengine with sepal-ui."""
2
2
3
+ import asyncio
4
+ import os
3
5
import time
6
+ from concurrent .futures import ThreadPoolExecutor
4
7
from pathlib import Path
5
8
from typing import List , Union
6
9
7
10
import ee
8
11
import ipyvuetify as v
12
+ import nest_asyncio
13
+ import psutil
9
14
10
15
from sepal_ui .message import ms
11
16
from sepal_ui .scripts import decorator as sd
12
17
18
+ # Jupyter notebooks already run an event loop, so asyncio.run() fails there with "RuntimeError: This event loop is already running"; nest_asyncio is applied to allow the nested call.
19
+ nest_asyncio .apply ()
20
+
13
21
14
22
@sd .need_ee
15
23
def wait_for_completion (task_descripsion : str , widget_alert : v .Alert = None ) -> str :
@@ -83,8 +91,91 @@ def is_running(task_descripsion: str) -> ee.batch.Task:
83
91
return current_task
84
92
85
93
94
async def list_assets_concurrent(folders: list, semaphore: asyncio.Semaphore) -> list:
    """List the assets of several folders concurrently using a ThreadPoolExecutor.

    Each folder is listed with a blocking ``ee.data.listAssets`` call that is
    off-loaded to a worker thread. The semaphore is acquired once per folder
    (not once for the whole batch), so the number of requests in flight at the
    same time never exceeds the semaphore's value.

    Args:
        folders: list of folders to list assets from
        semaphore: semaphore bounding the number of simultaneous listing requests

    Returns:
        list of assets for each folder, in the same order as ``folders``
    """
    loop = asyncio.get_running_loop()

    async def _list_folder(executor: ThreadPoolExecutor, folder: str):
        # hold a semaphore slot only while this folder's request is running
        async with semaphore:
            return await loop.run_in_executor(
                executor, ee.data.listAssets, {"parent": folder}
            )

    with ThreadPoolExecutor() as executor:
        # gather keeps the results aligned with the order of ``folders``
        return await asyncio.gather(
            *(_list_folder(executor, folder) for folder in folders)
        )
112
+
113
+
114
async def get_assets_async_concurrent(folder: str) -> List[dict]:
    """Get all the assets from the parameter folder. every nested asset will be displayed.

    The folder tree is walked level by level: all the folders discovered at one
    depth are listed concurrently before moving on to the next depth.

    Args:
        folder: the initial GEE folder

    Returns:
        the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'

    """
    # Determine system resources. os.cpu_count() may return None when the
    # number of CPUs cannot be determined.
    cpu_count = os.cpu_count() or 1
    available_memory = psutil.virtual_memory().available

    # 50 MB per task
    memory_slots = available_memory // (50 * 1024 * 1024)

    # Always allow at least one task: a semaphore initialised to 0 could never
    # be acquired and the listing would wait forever.
    max_concurrent_tasks = max(1, min(30, cpu_count, memory_slots))

    # Create a semaphore to limit the number of concurrent tasks
    semaphore = asyncio.Semaphore(max_concurrent_tasks)

    asset_list = []
    pending_folders = [folder]

    while pending_folders:
        assets_groups = await list_assets_concurrent(pending_folders, semaphore)

        # folders found at this depth are listed on the next iteration
        pending_folders = []
        for assets in assets_groups:
            for asset in assets.get("assets", []):
                asset_list.append(
                    {"type": asset["type"], "name": asset["name"], "id": asset["id"]}
                )
                if asset["type"] == "FOLDER":
                    pending_folders.append(asset["name"])

    return asset_list
149
+
150
+
151
@sd.need_ee
def get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]:
    """Get all the assets from the parameter folder. every nested asset will be displayed.

    Args:
        folder: the initial GEE folder. When it is empty, the "assets" root of the current cloud project is used
        async_: whether or not the function should be executed asynchronously

    Returns:
        the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'

    """
    # no whitespace may follow the project id, otherwise the parent path is invalid
    folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/"

    if not async_:
        return get_assets_sync(folder)

    try:
        return asyncio.run(get_assets_async_concurrent(folder))
    except Exception as e:
        # Best effort: report the failure for future debugging, then fall back
        # to the synchronous method so the caller still gets a result.
        print(f"Error occurred in get_assets_async_concurrent: {e} ")
        return get_assets_sync(folder)
175
+
176
+
86
177
@sd .need_ee
87
- def get_assets (folder : Union [str , Path ] = "" ) -> List [dict ]:
178
+ def get_assets_sync (folder : Union [str , Path ] = "" ) -> List [dict ]:
88
179
"""Get all the assets from the parameter folder. every nested asset will be displayed.
89
180
90
181
Args:
@@ -95,7 +186,6 @@ def get_assets(folder: Union[str, Path] = "") -> List[dict]:
95
186
"""
96
187
# set the folder and init the list
97
188
asset_list = []
98
- folder = str (folder ) or f"projects/{ ee .data ._cloud_api_user_project } /assets/"
99
189
100
190
def _recursive_get (folder , asset_list ):
101
191
0 commit comments