HADOOP-19531. [ABFS][FnsOverBlob] Streaming List Path Result Should Happen Inside Retry Loop #7582
Changes from 9 commits
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -22,6 +22,7 @@
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.parsers.SAXParser;
 import javax.xml.parsers.SAXParserFactory;
+import java.io.ByteArrayInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -1605,12 +1606,9 @@
   @Override
   public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri)
       throws AzureBlobFileSystemException {
-    BlobListResultSchema listResultSchema;
-    try (InputStream stream = result.getListResultStream()) {
-      if (stream == null) {
-        return null;
-      }
+    try (InputStream stream = new ByteArrayInputStream(result.getListResultData())) {
       try {
+        BlobListResultSchema listResultSchema;
         final SAXParser saxParser = saxParserThreadLocal.get();
         saxParser.reset();
         listResultSchema = new BlobListResultSchema();
@@ -1620,19 +1618,17 @@
         LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
             listResultSchema.paths().size(),
             listResultSchema.getNextMarker());
-      } catch (SAXException | IOException e) {
-        throw new AbfsDriverException(e);
+        return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+      } catch (SAXException | IOException ex) {
+        throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
       }
-    } catch (IOException e) {
-      LOG.error("Unable to deserialize list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
-    }
-
-    try {
-      return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
-    } catch (IOException e) {
-      LOG.error("Unable to filter list results for uri {}", uri.toString(), e);
-      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e);
+    } catch (AbfsDriverException ex) {
+      // Throw as it is to avoid multiple wrapping.
+      LOG.error("Unable to deserialize list results for Uri {}", uri != null ? uri.toString() : "NULL", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Unable to get stream for list results for uri {}", uri != null ? uri.toString() : "NULL", ex);
+      throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
     }
   }

Review comment: NIT: We can add URI in the error message for better visibility.
Reply: URI is already in log messages.

Review comment: uri should not be null if we are reaching till this part of the code.
Reply: uri can be null for the cases where this API is called by internal operations, like in the create flow.
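The crux of the change is visible in this hunk: parsing now runs against an in-memory copy of the response (getListResultData()) rather than the live network stream (the removed getListResultStream()), so connection failures surface while the response body is being read inside the retry loop, not later during SAX parsing. A minimal, self-contained sketch of the buffer-then-parse pattern; the class and method names here (BufferThenParse, fetchAndBuffer, parseBody) are illustrative stand-ins, not the ABFS API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public final class BufferThenParse {

  // Step 1, inside the retry loop: drain the network stream into memory.
  // A connection reset or timeout throws IOException here, where the
  // caller's retry policy can still observe and act on it.
  static byte[] fetchAndBuffer(InputStream network) throws IOException {
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      byte[] chunk = new byte[64 * 1024];
      int n;
      while ((n = network.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
      return out.toByteArray();
    }
  }

  // Step 2, after the retry loop: parse the in-memory copy. No network
  // I/O can fail here; only genuine parsing errors remain.
  static void parseBody(byte[] body) throws IOException {
    try (InputStream in = new ByteArrayInputStream(body)) {
      // hand `in` to a SAX parser, as parseListPathResults does
    }
  }

  private BufferThenParse() {
  }
}
```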
@@ -1930,7 +1926,8 @@
    * @param uri URI to be used for path conversion.
    * @return List of entries after removing duplicates.
    */
-  private ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+  @VisibleForTesting
+  public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
       BlobListResultSchema listResultSchema, URI uri) throws IOException {
     List<FileStatus> fileStatuses = new ArrayList<>();
     Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();

Check failure on line 1930 in hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
@@ -18,6 +18,7 @@

 package org.apache.hadoop.fs.azurebfs.services;

+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -75,7 +76,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
   private String requestId = "";
   private String expectedAppendPos = "";
   private ListResultSchema listResultSchema = null;
-  private InputStream listResultStream = null;
+  private byte[] listResultData = null;
   private List<String> blockIdList = null;

   // metrics
@@ -221,8 +222,20 @@ public ListResultSchema getListResultSchema() {
     return listResultSchema;
   }

-  public final InputStream getListResultStream() {
-    return listResultStream;
+  public final byte[] getListResultData() {
+    return listResultData;
   }

+  @VisibleForTesting
+  public byte[] readDataFromStream(InputStream stream) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
+      byte[] tempBuffer = new byte[CLEAN_UP_BUFFER_SIZE];
+      int bytesRead;
+      while ((bytesRead = stream.read(tempBuffer, 0, CLEAN_UP_BUFFER_SIZE)) != -1) {
+        buffer.write(tempBuffer, 0, bytesRead);
+      }
+      return buffer.toByteArray();
+    }
+  }

   /**
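For context, the copy loop in readDataFromStream is the classic pre-Java-9 idiom for draining a stream; on Java 9+ the body could simply be `return stream.readAllBytes();`. Reusing the existing CLEAN_UP_BUFFER_SIZE constant keeps the chunk size consistent with the class's other stream-draining code (that motivation is an assumption on my part, not stated in the PR).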
@@ -396,8 +409,7 @@ final void parseResponse(final byte[] buffer,
     // consume the input stream to release resources
     int totalBytesRead = 0;

-    try {
-      InputStream stream = getContentInputStream();
+    try (InputStream stream = getContentInputStream()) {
       if (isNullInputStream(stream)) {
         return;
       }
@@ -409,7 +421,7 @@ final void parseResponse(final byte[] buffer,
       if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) {
         parseBlockListResponse(stream);
       } else {
-        listResultStream = stream;
+        parseListPathResponse(stream);
       }
     } else {
       if (buffer != null) {
@@ -438,6 +450,11 @@ final void parseResponse(final byte[] buffer,
           method, getMaskedUrl(), ex.getMessage());
       log.debug("IO Error: ", ex);
       throw ex;
+    } catch (Exception ex) {
+      log.warn("Unexpected error: {} {}: {}",
+          method, getMaskedUrl(), ex.getMessage());
+      log.debug("Unexpected Error: ", ex);
+      throw new IOException(ex);
     } finally {
       this.recvResponseTimeMs += elapsedTimeMs(startTime);
       this.bytesReceived = totalBytesRead;

Review comment: Why do we need both log.warn and log.debug here?
Reply: We were doing the same for other types of exceptions.
@@ -500,6 +517,18 @@ private void parseBlockListResponse(final InputStream stream) throws IOException
     blockIdList = client.parseBlockListResponse(stream);
   }

+  /**
+   * Parse the list path response from the network stream and save response into a buffer.
+   * @param stream Network InputStream.
+   * @throws IOException if an error occurs while reading the stream.
+   */
+  private void parseListPathResponse(final InputStream stream) throws IOException {
+    if (stream == null || listResultData != null) {
+      return;
+    }
+    listResultData = readDataFromStream(stream);
+  }

   public List<String> getBlockIdList() {
     return blockIdList;
   }

Review comment: add javadocs
Reply: Taken.
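Two details of parseListPathResponse are easy to miss: the `stream == null` check tolerates responses without a body, and the `listResultData != null` check makes buffering a one-shot operation even if the parse path were entered more than once for the same operation. A compact, self-contained sketch of the same guard (simplified names; `InputStream.readAllBytes()`, available since Java 9, stands in for readDataFromStream):

```java
import java.io.IOException;
import java.io.InputStream;

// Simplified illustration of the buffer-once guard used by parseListPathResponse.
final class ListResultBuffer {
  private byte[] listResultData;

  void bufferOnce(InputStream stream) throws IOException {
    if (stream == null || listResultData != null) {
      // no body to read, or an earlier call already buffered the result
      return;
    }
    listResultData = stream.readAllBytes(); // stand-in for readDataFromStream
  }

  byte[] get() {
    return listResultData;
  }
}
```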
ITestAzureBlobFileSystemListStatus.java
@@ -20,6 +20,7 @@

 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
@@ -63,7 +65,10 @@
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
+import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
 import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
@@ -130,7 +135,9 @@ public Void call() throws Exception {

   /**
    * Test to verify that each paginated call to ListBlobs uses a new tracing context.
-   * @throws Exception
+   * Test also verifies that the retry policy is called when a SocketTimeoutException occurs.
+   * Test also verifies that an empty list with a valid continuation token is handled.
+   * @throws Exception if there is an error or a test assertion fails.
    */
   @Test
   public void testListPathTracingContext() throws Exception {
@@ -160,6 +167,10 @@ public void testListPathTracingContext() throws Exception {
     List<FileStatus> fileStatuses = new ArrayList<>();
     spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);

+    // Assert that there were retries due to SocketTimeoutException
+    Mockito.verify(spiedClient, Mockito.times(1))
+        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
+
     // Assert that there were 2 paginated ListPath calls were made 1 and 2.
     // 1. Without continuation token
     Mockito.verify(spiedClient, times(1)).listPath(
@@ -176,6 +187,31 @@ public void testListPathTracingContext() throws Exception {
     Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
   }

+  @Test
+  public void testListPathParsingFailure() throws Exception {
+    assumeBlobServiceType();
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler()
+        .getBlobClient());
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+
+    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient)
+        .filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    AbfsDriverException ex = intercept(AbfsDriverException.class,
+        () -> {
+          spiedStore.listStatus(new Path("/"), "", fileStatuses,
+              true, null, getTestTracingContext(spiedFs, true));
+        });
+    Assertions.assertThat(ex.getStatusCode())
+        .describedAs("Expecting Network Error status code")
+        .isEqualTo(-1);
+    Assertions.assertThat(ex.getErrorMessage())
+        .describedAs("Expecting list parsing error message")
+        .contains(ERR_BLOB_LIST_PARSING);
+  }

   /**
    * Creates a file, verifies that listStatus returns it,
    * even while the file is still open for writing.

Review comment: Can we reduce the retry count and verify that the request was retried?
Reply: This particular test is to make sure retry does not happen, as here the exception is injected while parsing the list result from the local stream. I have modified the test.
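For readers unfamiliar with it, `intercept` in the new test comes from org.apache.hadoop.test.LambdaTestUtils: it runs a lambda, fails the test if the expected exception type is not thrown, and returns the caught exception so the caller can make further assertions on it. A minimal usage sketch (the failing operation and its message are made up for illustration):

```java
import java.io.IOException;

import org.apache.hadoop.test.LambdaTestUtils;

public class InterceptExample {

  // A stand-in operation that always fails, just for the demo.
  static String failingOperation() throws IOException {
    throw new IOException("simulated parse failure");
  }

  public static void main(String[] args) throws Exception {
    // intercept() fails if the lambda does not throw IOException, and
    // returns the caught exception for follow-up assertions.
    IOException ex = LambdaTestUtils.intercept(IOException.class,
        () -> failingOperation());
    System.out.println(ex.getMessage());
  }
}
```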
ITestApacheClientConnectionPool.java
@@ -45,6 +45,7 @@

 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KEEP_ALIVE_CACHE_CLOSED;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
@@ -67,6 +68,7 @@ public ITestApacheClientConnectionPool() throws Exception {
 public void testKacIsClosed() throws Throwable {
   Configuration configuration = new Configuration(getRawConfiguration());
   configuration.set(FS_AZURE_NETWORKING_LIBRARY, APACHE_HTTP_CLIENT.name());
+  configuration.unset(FS_AZURE_METRIC_FORMAT);
   try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
       configuration)) {
     KeepAliveCache kac = fs.getAbfsStore().getClientHandler().getIngressClient()

Review comment: Why do we need to unset this config here?
Reply: This test was failing when this config was present in the test xml files.
Review comment: Will there be any issue here in case result.getListResultData() is null?
Reply: Yes, it can lead to an NPE. Modified to catch all exceptions, wrap them in AbfsDriverException, and throw back to the user.