Skip to content

Commit 3cae72d

Browse files
authored
[plugin] repository-azure is not working properly hangs on basic operations (#1740)
* [plugin] repository-azure is not working properly hangs on basic operations Signed-off-by: Andriy Redko <[email protected]> * Added tests cases and TODO items, addressing code review comments Signed-off-by: Andriy Redko <[email protected]>
1 parent 210f473 commit 3cae72d

File tree

3 files changed

+142
-61
lines changed

3 files changed

+142
-61
lines changed

plugins/repository-azure/build.gradle

+20-1
Original file line numberDiff line numberDiff line change
@@ -283,4 +283,23 @@ task azureThirdPartyTest(type: Test) {
283283
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
284284
}
285285
}
286-
check.dependsOn(azureThirdPartyTest)
286+
287+
task azureThirdPartyDefaultXmlTest(type: Test) {
288+
SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class);
289+
SourceSet internalTestSourceSet = sourceSets.getByName(InternalClusterTestPlugin.SOURCE_SET_NAME)
290+
setTestClassesDirs(internalTestSourceSet.getOutput().getClassesDirs())
291+
setClasspath(internalTestSourceSet.getRuntimeClasspath())
292+
dependsOn tasks.internalClusterTest
293+
include '**/AzureStorageCleanupThirdPartyTests.class'
294+
systemProperty 'javax.xml.stream.XMLInputFactory', "com.sun.xml.internal.stream.XMLInputFactoryImpl"
295+
systemProperty 'test.azure.account', azureAccount ? azureAccount : ""
296+
systemProperty 'test.azure.key', azureKey ? azureKey : ""
297+
systemProperty 'test.azure.sas_token', azureSasToken ? azureSasToken : ""
298+
systemProperty 'test.azure.container', azureContainer ? azureContainer : ""
299+
systemProperty 'test.azure.base', (azureBasePath ? azureBasePath : "") + "_third_party_tests_" + BuildParams.testSeed
300+
if (useFixture) {
301+
nonInputProperties.systemProperty 'test.azure.endpoint_suffix', "${-> azureAddress.call() }"
302+
}
303+
}
304+
305+
check.dependsOn(azureThirdPartyTest, azureThirdPartyDefaultXmlTest)

plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobStore.java

+121-60
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.azure.core.http.HttpMethod;
3636
import com.azure.core.http.HttpRequest;
3737
import com.azure.core.http.HttpResponse;
38+
import com.azure.core.http.rest.PagedResponse;
3839
import com.azure.core.http.rest.Response;
3940
import com.azure.core.util.Context;
4041
import com.azure.storage.blob.BlobClient;
@@ -51,6 +52,7 @@
5152
import com.azure.storage.blob.options.BlobParallelUploadOptions;
5253
import com.azure.storage.common.implementation.Constants;
5354

55+
import org.apache.commons.lang3.StringUtils;
5456
import org.apache.logging.log4j.LogManager;
5557
import org.apache.logging.log4j.Logger;
5658
import org.apache.logging.log4j.core.util.Throwables;
@@ -82,6 +84,7 @@
8284
import java.util.HashMap;
8385
import java.util.HashSet;
8486
import java.util.Map;
87+
import java.util.Optional;
8588
import java.util.Set;
8689
import java.util.concurrent.Executor;
8790
import java.util.concurrent.atomic.AtomicLong;
@@ -217,50 +220,71 @@ public DeleteResult deleteBlobDirectory(String path, Executor executor) throws U
217220
final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(path);
218221

219222
SocketAccess.doPrivilegedVoidException(() -> {
220-
for (final BlobItem blobItem : blobContainer.listBlobs(listBlobsOptions, timeout())) {
221-
// Skipping prefixes as those are not deletable and should not be there
222-
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
223-
224-
outstanding.incrementAndGet();
225-
executor.execute(new AbstractRunnable() {
226-
@Override
227-
protected void doRun() throws Exception {
228-
final long len = blobItem.getProperties().getContentLength();
229-
230-
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
231-
logger.trace(
232-
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
233-
);
234-
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
235-
logger.trace(
236-
() -> new ParameterizedMessage(
237-
"container [{}]: blob [{}] deleted status [{}].",
238-
container,
239-
blobItem.getName(),
240-
response.getStatusCode()
241-
)
242-
);
243-
244-
blobsDeleted.incrementAndGet();
245-
if (len >= 0) {
246-
bytesDeleted.addAndGet(len);
223+
String continuationToken = null;
224+
225+
do {
226+
// Fetch one page at a time, others are going to be fetched by continuation token
227+
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
228+
// gets addressed.
229+
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobs(listBlobsOptions, timeout())
230+
.streamByPage(continuationToken)
231+
.findFirst();
232+
233+
if (!pageOpt.isPresent()) {
234+
// No more pages, should never happen
235+
break;
236+
}
237+
238+
final PagedResponse<BlobItem> page = pageOpt.get();
239+
for (final BlobItem blobItem : page.getValue()) {
240+
// Skipping prefixes as those are not deletable and should not be there
241+
assert (blobItem.isPrefix() == null || !blobItem.isPrefix()) : "Only blobs (not prefixes) are expected";
242+
243+
outstanding.incrementAndGet();
244+
executor.execute(new AbstractRunnable() {
245+
@Override
246+
protected void doRun() throws Exception {
247+
final long len = blobItem.getProperties().getContentLength();
248+
249+
final BlobClient azureBlob = blobContainer.getBlobClient(blobItem.getName());
250+
logger.trace(
251+
() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blobItem.getName())
252+
);
253+
final Response<Void> response = azureBlob.deleteWithResponse(null, null, timeout(), client.v2().get());
254+
logger.trace(
255+
() -> new ParameterizedMessage(
256+
"container [{}]: blob [{}] deleted status [{}].",
257+
container,
258+
blobItem.getName(),
259+
response.getStatusCode()
260+
)
261+
);
262+
263+
blobsDeleted.incrementAndGet();
264+
if (len >= 0) {
265+
bytesDeleted.addAndGet(len);
266+
}
247267
}
248-
}
249268

250-
@Override
251-
public void onFailure(Exception e) {
252-
exceptions.add(e);
253-
}
269+
@Override
270+
public void onFailure(Exception e) {
271+
exceptions.add(e);
272+
}
254273

255-
@Override
256-
public void onAfter() {
257-
if (outstanding.decrementAndGet() == 0) {
258-
result.onResponse(null);
274+
@Override
275+
public void onAfter() {
276+
if (outstanding.decrementAndGet() == 0) {
277+
result.onResponse(null);
278+
}
259279
}
260-
}
261-
});
262-
}
280+
});
281+
}
282+
283+
// Fetch next continuation token
284+
continuationToken = page.getContinuationToken();
285+
} while (StringUtils.isNotBlank(continuationToken));
263286
});
287+
264288
if (outstanding.decrementAndGet() == 0) {
265289
result.onResponse(null);
266290
}
@@ -301,20 +325,39 @@ public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix
301325
.setPrefix(keyPath + (prefix == null ? "" : prefix));
302326

303327
SocketAccess.doPrivilegedVoidException(() -> {
304-
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
305-
// Skipping over the prefixes, only look for the blobs
306-
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
307-
continue;
328+
String continuationToken = null;
329+
330+
do {
331+
// Fetch one page at a time, others are going to be fetched by continuation token
332+
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
333+
// gets addressed
334+
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
335+
.streamByPage(continuationToken)
336+
.findFirst();
337+
338+
if (!pageOpt.isPresent()) {
339+
// No more pages, should never happen
340+
break;
308341
}
309342

310-
final String name = getBlobName(blobItem.getName(), container, keyPath);
311-
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
343+
final PagedResponse<BlobItem> page = pageOpt.get();
344+
for (final BlobItem blobItem : page.getValue()) {
345+
// Skipping over the prefixes, only look for the blobs
346+
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
347+
continue;
348+
}
312349

313-
final BlobItemProperties properties = blobItem.getProperties();
314-
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
315-
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
316-
}
350+
final String name = getBlobName(blobItem.getName(), container, keyPath);
351+
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
352+
353+
final BlobItemProperties properties = blobItem.getProperties();
354+
logger.trace(() -> new ParameterizedMessage("blob name [{}], size [{}]", name, properties.getContentLength()));
355+
blobsBuilder.put(name, new PlainBlobMetadata(name, properties.getContentLength()));
356+
}
317357

358+
// Fetch next continuation token
359+
continuationToken = page.getContinuationToken();
360+
} while (StringUtils.isNotBlank(continuationToken));
318361
});
319362

320363
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
@@ -330,18 +373,36 @@ public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxExcept
330373
.setPrefix(keyPath);
331374

332375
SocketAccess.doPrivilegedVoidException(() -> {
333-
for (final BlobItem blobItem : blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())) {
334-
// Skipping over the blobs, only look for prefixes
335-
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
336-
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
337-
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
338-
// Lastly, we add the length of keyPath to the offset to strip this container's path.
339-
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
340-
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
341-
blobsBuilder.add(name);
376+
String continuationToken = null;
377+
378+
do {
379+
// Fetch one page at a time, others are going to be fetched by continuation token
380+
// TODO: reconsider reverting to simplified approach once https://github.com/Azure/azure-sdk-for-java/issues/26064
381+
// gets addressed
382+
final Optional<PagedResponse<BlobItem>> pageOpt = blobContainer.listBlobsByHierarchy("/", listBlobsOptions, timeout())
383+
.streamByPage(continuationToken)
384+
.findFirst();
385+
386+
if (!pageOpt.isPresent()) {
387+
// No more pages, should never happen
388+
break;
342389
}
343-
}
344-
;
390+
391+
final PagedResponse<BlobItem> page = pageOpt.get();
392+
for (final BlobItem blobItem : page.getValue()) {
393+
// Skipping over the blobs, only look for prefixes
394+
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
395+
// Expecting name in the form /container/keyPath.* and we want to strip off the /container/
396+
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /.
397+
// Lastly, we add the length of keyPath to the offset to strip this container's path.
398+
final String name = getBlobName(blobItem.getName(), container, keyPath).replaceAll("/$", "");
399+
logger.trace(() -> new ParameterizedMessage("blob name [{}]", name));
400+
blobsBuilder.add(name);
401+
}
402+
}
403+
// Fetch next continuation token
404+
continuationToken = page.getContinuationToken();
405+
} while (StringUtils.isNotBlank(continuationToken));
345406
});
346407

347408
return Collections.unmodifiableMap(

test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ public void handle(final HttpExchange exchange) throws IOException {
208208

209209
}
210210
list.append("</Blobs>");
211+
list.append("<NextMarker />");
211212
list.append("</EnumerationResults>");
212213

213214
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);

0 commit comments

Comments
 (0)