1
1
from datetime import datetime , timedelta
2
2
import mimetypes
3
3
import os
4
- from pathlib import Path , PurePosixPath
4
+ from pathlib import Path
5
5
from typing import Any , Callable , Dict , Iterable , Optional , Tuple , Union
6
6
7
7
15
15
try :
16
16
from azure .core .exceptions import ResourceNotFoundError
17
17
from azure .storage .blob import (
18
+ BlobPrefix ,
18
19
BlobSasPermissions ,
19
20
BlobServiceClient ,
20
21
BlobProperties ,
21
22
ContentSettings ,
22
23
generate_blob_sas ,
23
24
)
25
+
26
+ from azure .storage .filedatalake import DataLakeServiceClient , FileProperties
24
27
except ModuleNotFoundError :
25
28
implementation_registry ["azure" ].dependencies_loaded = False
26
29
@@ -39,6 +42,7 @@ def __init__(
39
42
credential : Optional [Any ] = None ,
40
43
connection_string : Optional [str ] = None ,
41
44
blob_service_client : Optional ["BlobServiceClient" ] = None ,
45
+ data_lake_client : Optional ["DataLakeServiceClient" ] = None ,
42
46
file_cache_mode : Optional [Union [str , FileCacheMode ]] = None ,
43
47
local_cache_dir : Optional [Union [str , os .PathLike ]] = None ,
44
48
content_type_method : Optional [Callable ] = mimetypes .guess_type ,
@@ -76,6 +80,10 @@ def __init__(
76
80
https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python#copy-your-credentials-from-the-azure-portal).
77
81
blob_service_client (Optional[BlobServiceClient]): Instantiated [`BlobServiceClient`](
78
82
https://docs.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python).
83
+ data_lake_client (Optional[DataLakeServiceClient]): Instantiated [`DataLakeServiceClient`](
84
+ https://learn.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.datalakeserviceclient).
85
+ If None and `blob_service_client` is passed, we will create based on that.
86
+ Otherwise, will create based on passed credential, account_url, connection_string, or AZURE_STORAGE_CONNECTION_STRING env var
79
87
file_cache_mode (Optional[Union[str, FileCacheMode]]): How often to clear the file cache; see
80
88
[the caching docs](https://cloudpathlib.drivendata.org/stable/caching/) for more information
81
89
about the options in cloudpathlib.enums.FileCacheMode.
@@ -94,27 +102,73 @@ def __init__(
94
102
if connection_string is None :
95
103
connection_string = os .getenv ("AZURE_STORAGE_CONNECTION_STRING" , None )
96
104
105
+ self .blob_service_client = None
106
+ self .data_lake_client = None
107
+
97
108
if blob_service_client is not None :
98
109
self .service_client = blob_service_client
110
+
111
+ # create from blob service client if not passed
112
+ if data_lake_client is None :
113
+ self .data_lake_client = DataLakeServiceClient (
114
+ account_url = f"https://{ self .service_client .account_name } .dfs.core.windows.net" ,
115
+ credential = blob_service_client .credential ,
116
+ )
117
+
118
+ if data_lake_client is not None :
119
+ self .data_lake_client = data_lake_client
120
+
99
121
elif connection_string is not None :
100
122
self .service_client = BlobServiceClient .from_connection_string (
101
123
conn_str = connection_string , credential = credential
102
124
)
125
+ self .data_lake_client = DataLakeServiceClient .from_connection_string (
126
+ conn_str = connection_string , credential = credential
127
+ )
103
128
elif account_url is not None :
104
129
self .service_client = BlobServiceClient (account_url = account_url , credential = credential )
130
+ self .data_lake_client = DataLakeServiceClient (
131
+ account_url = account_url , credential = credential
132
+ )
105
133
else :
106
134
raise MissingCredentialsError (
107
135
"AzureBlobClient does not support anonymous instantiation. "
108
136
"Credentials are required; see docs for options."
109
137
)
110
138
111
- def _get_metadata (self , cloud_path : AzureBlobPath ) -> Union ["BlobProperties" , Dict [str , Any ]]:
112
- blob = self .service_client .get_blob_client (
113
- container = cloud_path .container , blob = cloud_path .blob
114
- )
115
- properties = blob .get_blob_properties ()
139
+ self .hns_cache : Dict [str , bool ] = {}
140
+
141
+ def _check_hns (self , cloud_path : AzureBlobPath ) -> bool :
142
+ if cloud_path .container not in self .hns_cache :
143
+ hns_enabled : bool = self .service_client .get_account_information ().get (
144
+ "is_hns_enabled" , False
145
+ ) # type: ignore
146
+ self .hns_cache [cloud_path .container ] = hns_enabled
147
+
148
+ return self .hns_cache [cloud_path .container ]
149
+
150
+ def _get_metadata (
151
+ self , cloud_path : AzureBlobPath
152
+ ) -> Union ["BlobProperties" , "FileProperties" , Dict [str , Any ]]:
153
+ if self ._check_hns (cloud_path ):
154
+ # works on both files and directories
155
+ fsc = self .data_lake_client .get_file_system_client (cloud_path .container ) # type: ignore
156
+
157
+ if fsc is not None :
158
+ properties = fsc .get_file_client (cloud_path .blob ).get_file_properties ()
159
+
160
+ # no content settings on directory
161
+ properties ["content_type" ] = properties .get (
162
+ "content_settings" , {"content_type" : None }
163
+ ).get ("content_type" )
116
164
117
- properties ["content_type" ] = properties .content_settings .content_type
165
+ else :
166
+ blob = self .service_client .get_blob_client (
167
+ container = cloud_path .container , blob = cloud_path .blob
168
+ )
169
+ properties = blob .get_blob_properties ()
170
+
171
+ properties ["content_type" ] = properties .content_settings .content_type
118
172
119
173
return properties
120
174
@@ -155,8 +209,17 @@ def _is_file_or_dir(self, cloud_path: AzureBlobPath) -> Optional[str]:
155
209
return "dir"
156
210
157
211
try :
158
- self ._get_metadata (cloud_path )
159
- return "file"
212
+ meta = self ._get_metadata (cloud_path )
213
+
214
+ # if hns, has is_directory property; else if not hns, _get_metadata will raise if not a file
215
+ return (
216
+ "dir"
217
+ if meta .get ("is_directory" , False )
218
+ or meta .get ("metadata" , {}).get ("hdi_isfolder" , False )
219
+ else "file"
220
+ )
221
+
222
+ # thrown if not HNS and file does not exist _or_ is dir; check if is dir instead
160
223
except ResourceNotFoundError :
161
224
prefix = cloud_path .blob
162
225
if prefix and not prefix .endswith ("/" ):
@@ -181,17 +244,14 @@ def _exists(self, cloud_path: AzureBlobPath) -> bool:
181
244
def _list_dir (
182
245
self , cloud_path : AzureBlobPath , recursive : bool = False
183
246
) -> Iterable [Tuple [AzureBlobPath , bool ]]:
184
- # shortcut if listing all available containers
185
247
if not cloud_path .container :
186
- if recursive :
187
- raise NotImplementedError (
188
- "Cannot recursively list all containers and contents; you can get all the containers then recursively list each separately."
189
- )
248
+ for container in self .service_client .list_containers ():
249
+ yield self .CloudPath (f"az://{ container .name } " ), True
190
250
191
- yield from (
192
- ( self . CloudPath ( f"az:// { c . name } " ), True )
193
- for c in self . service_client . list_containers ()
194
- )
251
+ if not recursive :
252
+ continue
253
+
254
+ yield from self . _list_dir ( self . CloudPath ( f"az:// { container . name } " ), recursive = True )
195
255
return
196
256
197
257
container_client = self .service_client .get_container_client (cloud_path .container )
@@ -200,30 +260,24 @@ def _list_dir(
200
260
if prefix and not prefix .endswith ("/" ):
201
261
prefix += "/"
202
262
203
- yielded_dirs = set ()
204
-
205
- # NOTE: Not recursive may be slower than necessary since it just filters
206
- # the recursive implementation
207
- for o in container_client .list_blobs (name_starts_with = prefix ):
208
- # get directory from this path
209
- for parent in PurePosixPath (o .name [len (prefix ) :]).parents :
210
- # if we haven't surfaced this directory already
211
- if parent not in yielded_dirs and str (parent ) != "." :
212
- # skip if not recursive and this is beyond our depth
213
- if not recursive and "/" in str (parent ):
214
- continue
263
+ if self ._check_hns (cloud_path ):
264
+ file_system_client = self .data_lake_client .get_file_system_client (cloud_path .container ) # type: ignore
265
+ paths = file_system_client .get_paths (path = cloud_path .blob , recursive = recursive )
215
266
216
- yield (
217
- self .CloudPath (f"az://{ cloud_path .container } /{ prefix } { parent } " ),
218
- True , # is a directory
219
- )
220
- yielded_dirs .add (parent )
267
+ for path in paths :
268
+ yield self .CloudPath (f"az://{ cloud_path .container } /{ path .name } " ), path .is_directory
221
269
222
- # skip file if not recursive and this is beyond our depth
223
- if not recursive and "/" in o .name [len (prefix ) :]:
224
- continue
270
+ else :
271
+ if not recursive :
272
+ blobs = container_client .walk_blobs (name_starts_with = prefix )
273
+ else :
274
+ blobs = container_client .list_blobs (name_starts_with = prefix )
225
275
226
- yield (self .CloudPath (f"az://{ cloud_path .container } /{ o .name } " ), False ) # is a file
276
+ for blob in blobs :
277
+ # walk_blobs returns folders with a trailing slash
278
+ blob_path = blob .name .rstrip ("/" )
279
+ blob_cloud_path = self .CloudPath (f"az://{ cloud_path .container } /{ blob_path } " )
280
+ yield blob_cloud_path , isinstance (blob , BlobPrefix )
227
281
228
282
def _move_file (
229
283
self , src : AzureBlobPath , dst : AzureBlobPath , remove_src : bool = True
@@ -253,6 +307,10 @@ def _move_file(
253
307
def _remove (self , cloud_path : AzureBlobPath , missing_ok : bool = True ) -> None :
254
308
file_or_dir = self ._is_file_or_dir (cloud_path )
255
309
if file_or_dir == "dir" :
310
+ if self ._check_hns (cloud_path ):
311
+ _hns_rmtree (self .data_lake_client , cloud_path .container , cloud_path .blob )
312
+ return
313
+
256
314
blobs = [
257
315
b .blob for b , is_dir in self ._list_dir (cloud_path , recursive = True ) if not is_dir
258
316
]
@@ -313,4 +371,15 @@ def _generate_presigned_url(
313
371
return url
314
372
315
373
374
+ def _hns_rmtree (data_lake_client , container , directory ):
375
+ """Stateless implementation so can be used in test suite cleanup as well.
376
+
377
+ If hierarchical namespace is enabled, delete the directory and all its contents.
378
+ (The non-HNS version is implemented in `_remove`, but will leave empty folders in HNS).
379
+ """
380
+ file_system_client = data_lake_client .get_file_system_client (container )
381
+ directory_client = file_system_client .get_directory_client (directory )
382
+ directory_client .delete_directory ()
383
+
384
+
316
385
AzureBlobClient .AzureBlobPath = AzureBlobClient .CloudPath # type: ignore
0 commit comments