Skip to content

Improve network error handling for batch jobs (HTTP retries) #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
gmfmi opened this issue Nov 21, 2024 · 4 comments
Open

Improve network error handling for batch jobs (HTTP retries) #216

gmfmi opened this issue Nov 21, 2024 · 4 comments

Comments

@gmfmi
Copy link

gmfmi commented Nov 21, 2024

Context

In our project, we have to reprocess a large amount of Elastic data in a single batch. The elasticsearch input plugin might run for multiple hours to handle billions of events. But we are facing a problem: when a network issue occurs, the entire job is restarted (not only last HTTP request).

I guess this issue rarely occurs when Logstash is deployed in the Elasticsearch subnet but we got a cloud hybrid configuration, and it makes it impossible to use in production as of now.

The plugin configuration:

input {
 elasticsearch {
 hosts => "https://<cluster_id>.francecentral.azure.elastic-cloud.com:443"
 api_key => "<api_key>"
 ssl_enabled => "true"
 index => "<index_name>"
 search_api => "search_after"
 retries => 5
 scroll => "5m"
 response_type => "hits"
 size => 1000
 query => '{"query":{"bool":{"filter":[ ... ]}},"sort":[ ... ]}'
 }
}

Log sample of the restarting job when a network error occurred:

[2024-11-21T09:29:03,645][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress
[2024-11-21T09:29:03,806][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress
[2024-11-21T09:29:03,815][WARN ][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Attempt to search_after paginated search but failed. Sleeping for 0.02 {:fail_count=>1, :exception=>"<cluster_id>.francecentral.azure.elastic-cloud.com:443 failed to respond"}
[2024-11-21T09:29:03,835][INFO ][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query start
[2024-11-21T09:29:03,835][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress
[2024-11-21T09:29:04,222][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress

Feature proposal

First, for future documentation readers, it would be nice to improve the retries section of the documentation to explain that it is at the "job" level and not "http request" level.

Then, adding a retry mechanism at the HTTP request level with an exponential backoff (or similar) would be a good option.

I had a quick look at the code base, I think we could add a wrapper around the next_page() function to handle the network error and implement the retries properly

Contribution

If it can help, we can contribute and develop this feature.

@gmfmi
Copy link
Author

gmfmi commented Nov 22, 2024

If that can help, I just patched the search_after section of the code by adding a retry logic. Here is the code of the next_page function that I modified:

def next_page(pit_id: , search_after: nil, slice_id: nil)
  options = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
  logger.trace("search options", options)
  
  max_retries = 5        # Maximum number of retries
  initial_delay = 1      # Initial delay (1 second)
  retries = 0
  begin
    @client.search(options)
  rescue => err
    if retries < max_retries
      delay = initial_delay * (2 ** retries)
      retries += 1
      logger.warn("Retrying search request, attempt #{retries} of #{max_retries}, waiting #{delay}s. Details: #{err.message}")
      sleep(delay)
      retry
    else
      logger.error("Max retries reached. Failing with error: #{err.message}")
      raise
    end
  end
end

That works as expected but I didn't take the time to implement generic logic for both "search_after" and "scoll" methods. Also, I have not added any tests for now.

@flexitrev
Copy link
Contributor

Thanks for raising this. I think this would be a useful addition. Would you like to open a pull request?

Additionally, we are testing out sincedb like functionality in the plugin, would you be interested in participating in the tech. preview?

#205

@gmfmi
Copy link
Author

gmfmi commented Feb 24, 2025

Yes, I will open a pull request. I was just wondering if implementation details (i.e. params exposed to the end user) should be discussed here or in the PR?

About sincedb, sorry, but I don't think I'll have time to spend on it in the coming weeks.

@jsvd
Copy link
Member

jsvd commented Mar 18, 2025

I've looked at this a bit and I agree that, especially for search_after, we should support retries at the individual search level.

It may be necessary to keep the retries at the PIT level as well though, as errors can cause the PIT to not be available any more, so the plugin must be able to get out of the search-level loop and go back to PIT-level.

The retry at next_page can should use in this case the same value of @retries, perhaps reusing the retryable(JOB_ID) { } capability, though it may not be possible at is needs to capture the exception and re-raise it on the final retry, or next_page must be aware that not getting at data after the retryable means that there was a failure that must bubble up.

Your code suggestion gets us nearly there as it tackles the retry in the right place (next_page) and keeps the existing retryable. My suggestion is for you to open the PR and we can work from there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants