
Commit 636dc52

refactor: Remove redundant retries from batch_add_requests (#391)

### Description

Remove retries from `batch_add_requests`; such retries already exist at the HTTP client level. Add deprecation warnings when the retry-related arguments are used. Align the sync and async versions so that exceptions from the HTTP client are no longer ignored. Add tests.

### Issues

- Closes: #390

1 parent 8908006 commit 636dc52
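
For callers, the visible effect of the change can be sketched as follows: passing the deprecated arguments now only logs a warning, and an HTTP-level failure surfaces as an `ApifyApiError` instead of feeding a second retry loop. A hedged sketch; the token and queue ID below are placeholders, not values from the commit:

```python
# Hedged sketch of the caller-visible behavior after this commit; the token
# and queue ID are placeholders.
import asyncio

from apify_client import ApifyClientAsync


async def main() -> None:
    client = ApifyClientAsync(token='<APIFY_TOKEN>')
    rq_client = client.request_queue(request_queue_id='<QUEUE_ID>')

    result = await rq_client.batch_add_requests(
        requests=[
            {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
        ],
        # Still accepted, but now only emits a deprecation warning; retrying
        # is handled inside the HTTP client itself.
        max_unprocessed_requests_retries=3,
    )
    # Requests the API could not store are reported back, not silently retried.
    print(result['processedRequests'], result['unprocessedRequests'])


asyncio.run(main())
```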

File tree

2 files changed: +129 −67 lines changed


src/apify_client/clients/resource_clients/request_queue.py

Lines changed: 36 additions & 67 deletions
```diff
@@ -3,10 +3,8 @@
 import asyncio
 import logging
 import math
-from dataclasses import dataclass
-from datetime import timedelta
+from collections.abc import Iterable
 from queue import Queue
-from time import sleep
 from typing import TYPE_CHECKING, Any, TypedDict
 
 from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
```
```diff
@@ -17,7 +15,7 @@
 from apify_client.clients.base import ResourceClient, ResourceClientAsync
 
 if TYPE_CHECKING:
-    from collections.abc import Iterable
+    from datetime import timedelta
 
     from apify_shared.consts import StorageGeneralAccess
 
```

```diff
@@ -43,19 +41,6 @@ class BatchAddRequestsResult(TypedDict):
     unprocessedRequests: list[dict]
 
 
-@dataclass
-class AddRequestsBatch:
-    """Batch of requests to add to the request queue.
-
-    Args:
-        requests: List of requests to be added to the request queue.
-        num_of_retries: Number of times this batch has been retried.
-    """
-
-    requests: Iterable[dict]
-    num_of_retries: int = 0
-
-
 class RequestQueueClient(ResourceClient):
     """Sub-client for manipulating a single request queue."""
```

```diff
@@ -301,8 +286,8 @@ def batch_add_requests(
         *,
         forefront: bool = False,
         max_parallel: int = 1,
-        max_unprocessed_requests_retries: int = 3,
-        min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500),
+        max_unprocessed_requests_retries: int | None = None,
+        min_delay_between_unprocessed_requests_retries: timedelta | None = None,
     ) -> BatchAddRequestsResult:
         """Add requests to the request queue in batches.
 
```
```diff
@@ -316,13 +301,17 @@ def batch_add_requests(
             max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
                 to the async client. For the sync client, this value must be set to 1, as parallel execution
                 is not supported.
-            max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests.
-            min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed
-                requests.
+            max_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
+            min_delay_between_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
 
         Returns:
             Result containing lists of processed and unprocessed requests.
         """
+        if max_unprocessed_requests_retries:
+            logger.warning('`max_unprocessed_requests_retries` is deprecated and not used anymore.')
+        if min_delay_between_unprocessed_requests_retries:
+            logger.warning('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.')
+
         if max_parallel != 1:
             raise NotImplementedError('max_parallel is only supported in async client')
 
```
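
Two details of the new guard are worth noting: it warns via `logger.warning` rather than the stdlib `warnings` machinery, and it tests truthiness, so an explicit `0` or `timedelta(0)` will not trigger the message. For comparison, a hedged sketch of the stdlib alternative (not what this commit does):

```python
# Sketch only; the commit itself uses logger.warning, as shown above.
import warnings


def _warn_if_deprecated(max_unprocessed_requests_retries: int | None = None) -> None:
    # An `is not None` check also catches explicit zero values,
    # unlike the truthy check used in the commit.
    if max_unprocessed_requests_retries is not None:
        warnings.warn(
            '`max_unprocessed_requests_retries` is deprecated and not used anymore.',
            DeprecationWarning,
            stacklevel=2,
        )
```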

```diff
@@ -339,38 +328,30 @@ def batch_add_requests(
         )
 
         # Put the batches into the queue for processing.
-        queue = Queue[AddRequestsBatch]()
+        queue = Queue[Iterable[dict]]()
 
-        for b in batches:
-            queue.put(AddRequestsBatch(b))
+        for batch in batches:
+            queue.put(batch)
 
         processed_requests = list[dict]()
         unprocessed_requests = list[dict]()
 
         # Process all batches in the queue sequentially.
         while not queue.empty():
-            batch = queue.get()
+            request_batch = queue.get()
 
             # Send the batch to the API.
             response = self.http_client.call(
                 url=self._url('requests/batch'),
                 method='POST',
                 params=request_params,
-                json=list(batch.requests),
+                json=list(request_batch),
                 timeout_secs=_MEDIUM_TIMEOUT,
             )
 
-            # Retry if the request failed and the retry limit has not been reached.
-            if not response.is_success and batch.num_of_retries < max_unprocessed_requests_retries:
-                batch.num_of_retries += 1
-                sleep(min_delay_between_unprocessed_requests_retries.total_seconds())
-                queue.put(batch)
-
-            # Otherwise, add the processed/unprocessed requests to their respective lists.
-            else:
-                response_parsed = parse_date_fields(pluck_data(response.json()))
-                processed_requests.extend(response_parsed.get('processedRequests', []))
-                unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
+            response_parsed = parse_date_fields(pluck_data(response.json()))
+            processed_requests.extend(response_parsed.get('processedRequests', []))
+            unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
 
         return {
             'processedRequests': processed_requests,
```
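
The loop above consumes `batches`, which is produced earlier from `requests` under a payload size cap; that helper lies outside this diff. A hypothetical illustration of size-capped batching might look like this (the names and limits are made up, not the client's actual code):

```python
# Hypothetical illustration only; the real helper and its constants are not
# part of this diff.
import json
from collections.abc import Iterator


def to_batches(requests: list[dict], max_bytes: int, max_count: int) -> Iterator[list[dict]]:
    batch: list[dict] = []
    batch_size = 0
    for request in requests:
        request_size = len(json.dumps(request).encode('utf-8'))
        # Start a new batch when adding this request would exceed either cap.
        if batch and (batch_size + request_size > max_bytes or len(batch) == max_count):
            yield batch
            batch, batch_size = [], 0
        batch.append(request)
        batch_size += request_size
    if batch:
        yield batch
```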
```diff
@@ -667,14 +648,12 @@ async def delete_request_lock(
 
     async def _batch_add_requests_worker(
         self,
-        queue: asyncio.Queue[AddRequestsBatch],
+        queue: asyncio.Queue[Iterable[dict]],
         request_params: dict,
-        max_unprocessed_requests_retries: int,
-        min_delay_between_unprocessed_requests_retries: timedelta,
     ) -> BatchAddRequestsResult:
         """Worker function to process a batch of requests.
 
-        This worker will process batches from the queue, retrying requests that fail until the retry limit is reached.
+        This worker will process batches from the queue.
 
         Return result containing lists of processed and unprocessed requests by the worker.
         """
```
```diff
@@ -684,7 +663,7 @@ async def _batch_add_requests_worker(
         while True:
             # Get the next batch from the queue.
             try:
-                batch = await queue.get()
+                request_batch = await queue.get()
             except asyncio.CancelledError:
                 break
 
```

```diff
@@ -694,25 +673,13 @@ async def _batch_add_requests_worker(
                     url=self._url('requests/batch'),
                     method='POST',
                     params=request_params,
-                    json=list(batch.requests),
+                    json=list(request_batch),
                     timeout_secs=_MEDIUM_TIMEOUT,
                 )
 
                 response_parsed = parse_date_fields(pluck_data(response.json()))
-
-                # Retry if the request failed and the retry limit has not been reached.
-                if not response.is_success and batch.num_of_retries < max_unprocessed_requests_retries:
-                    batch.num_of_retries += 1
-                    await asyncio.sleep(min_delay_between_unprocessed_requests_retries.total_seconds())
-                    await queue.put(batch)
-
-                # Otherwise, add the processed/unprocessed requests to their respective lists.
-                else:
-                    processed_requests.extend(response_parsed.get('processedRequests', []))
-                    unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
-
-            except Exception as exc:
-                logger.warning(f'Error occurred while processing a batch of requests: {exc}')
+                processed_requests.extend(response_parsed.get('processedRequests', []))
+                unprocessed_requests.extend(response_parsed.get('unprocessedRequests', []))
 
             finally:
                 # Mark the batch as done whether it succeeded or failed.
```
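
With the blanket `except Exception` gone, an error raised by the HTTP call now propagates out of the worker to the caller, while the `try`/`finally` still guarantees `queue.task_done()` runs for every dequeued batch. A self-contained sketch of that pattern, with a stand-in for the HTTP call:

```python
# Self-contained sketch; call_api stands in for self.http_client_async.call().
import asyncio


async def call_api(batch: list[dict]) -> list[dict]:
    # Stand-in for the real HTTP call; may raise on an error response.
    return batch


async def worker(queue: asyncio.Queue) -> list[dict]:
    processed: list[dict] = []
    while True:
        try:
            batch = await queue.get()
        except asyncio.CancelledError:
            break  # the caller cancels idle workers once the queue is drained
        try:
            # No blanket `except Exception` any more: failures propagate.
            processed.extend(await call_api(batch))
        finally:
            queue.task_done()  # runs whether the call succeeded or raised
    return processed
```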
```diff
@@ -729,8 +696,8 @@ async def batch_add_requests(
         *,
         forefront: bool = False,
         max_parallel: int = 5,
-        max_unprocessed_requests_retries: int = 3,
-        min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500),
+        max_unprocessed_requests_retries: int | None = None,
+        min_delay_between_unprocessed_requests_retries: timedelta | None = None,
     ) -> BatchAddRequestsResult:
         """Add requests to the request queue in batches.
 
```
```diff
@@ -744,15 +711,19 @@ async def batch_add_requests(
             max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
                 to the async client. For the sync client, this value must be set to 1, as parallel execution
                 is not supported.
-            max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests.
-            min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed
-                requests.
+            max_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
+            min_delay_between_unprocessed_requests_retries: Deprecated argument. Will be removed in next major release.
 
         Returns:
             Result containing lists of processed and unprocessed requests.
         """
+        if max_unprocessed_requests_retries:
+            logger.warning('`max_unprocessed_requests_retries` is deprecated and not used anymore.')
+        if min_delay_between_unprocessed_requests_retries:
+            logger.warning('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.')
+
         tasks = set[asyncio.Task]()
-        queue: asyncio.Queue[AddRequestsBatch] = asyncio.Queue()
+        queue: asyncio.Queue[Iterable[dict]] = asyncio.Queue()
         request_params = self._params(clientKey=self.client_key, forefront=forefront)
 
         # Compute the payload size limit to ensure it doesn't exceed the maximum allowed size.
```
```diff
@@ -766,15 +737,13 @@ async def batch_add_requests(
         )
 
         for batch in batches:
-            await queue.put(AddRequestsBatch(batch))
+            await queue.put(batch)
 
         # Start a required number of worker tasks to process the batches.
         for i in range(max_parallel):
             coro = self._batch_add_requests_worker(
                 queue,
                 request_params,
-                max_unprocessed_requests_retries,
-                min_delay_between_unprocessed_requests_retries,
             )
             task = asyncio.create_task(coro, name=f'batch_add_requests_worker_{i}')
             tasks.add(task)
```
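
Putting it together, the async client fans batches out to `max_parallel` workers over a shared queue and merges their results. A hedged sketch of that orchestration, reusing the `worker` stand-in from the previous sketch (not the client's private method):

```python
# Hedged sketch of the fan-out; `worker` is the stand-in defined in the
# previous sketch.
import asyncio


async def fan_out(batches: list[list[dict]], max_parallel: int = 5) -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    for batch in batches:
        await queue.put(batch)

    tasks = [
        asyncio.create_task(worker(queue), name=f'batch_add_requests_worker_{i}')
        for i in range(max_parallel)
    ]

    await queue.join()      # wait until every batch has been marked done
    for task in tasks:
        task.cancel()       # idle workers exit via CancelledError in get()
    results = await asyncio.gather(*tasks)
    return [item for worker_result in results for item in worker_result]
```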
Lines changed: 93 additions & 0 deletions (new file)
@@ -0,0 +1,93 @@

```python
import pytest
import respx

import apify_client
from apify_client import ApifyClient, ApifyClientAsync

_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT = """{
    "data": {
        "processedRequests": [
            {
                "requestId": "YiKoxjkaS9gjGTqhF",
                "uniqueKey": "http://example.com/1",
                "wasAlreadyPresent": true,
                "wasAlreadyHandled": false
            }
        ],
        "unprocessedRequests": [
            {
                "uniqueKey": "http://example.com/2",
                "url": "http://example.com/2",
                "method": "GET"
            }
        ]
    }
}"""


@respx.mock
async def test_batch_not_processed_raises_exception_async() -> None:
    """Test that client exceptions are not silently ignored."""
    client = ApifyClientAsync(token='')

    respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401))
    requests = [
        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
    ]
    rq_client = client.request_queue(request_queue_id='whatever')

    with pytest.raises(apify_client._errors.ApifyApiError):
        await rq_client.batch_add_requests(requests=requests)


@respx.mock
async def test_batch_processed_partially_async() -> None:
    client = ApifyClientAsync(token='')

    respx.route(method='POST', host='api.apify.com').mock(
        return_value=respx.MockResponse(200, content=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT)
    )
    requests = [
        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
    ]
    rq_client = client.request_queue(request_queue_id='whatever')

    response = await rq_client.batch_add_requests(requests=requests)
    assert requests[0]['uniqueKey'] in {request['uniqueKey'] for request in response['processedRequests']}
    assert response['unprocessedRequests'] == [requests[1]]


@respx.mock
def test_batch_not_processed_raises_exception_sync() -> None:
    """Test that client exceptions are not silently ignored."""
    client = ApifyClient(token='')

    respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401))
    requests = [
        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
    ]
    rq_client = client.request_queue(request_queue_id='whatever')

    with pytest.raises(apify_client._errors.ApifyApiError):
        rq_client.batch_add_requests(requests=requests)


@respx.mock
def test_batch_processed_partially_sync() -> None:
    client = ApifyClient(token='')

    respx.route(method='POST', host='api.apify.com').mock(
        return_value=respx.MockResponse(200, content=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT)
    )
    requests = [
        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
    ]
    rq_client = client.request_queue(request_queue_id='whatever')

    response = rq_client.batch_add_requests(requests=requests)
    assert requests[0]['uniqueKey'] in {request['uniqueKey'] for request in response['processedRequests']}
    assert response['unprocessedRequests'] == [requests[1]]
```
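
The tests rely on respx to intercept HTTPX traffic at the transport level, which is how both the 401 failure and the partial-success payload are simulated without touching the real API. The core pattern they build on, reduced to a minimal self-contained example (the URL path here is arbitrary):

```python
# Minimal respx pattern the tests above build on; the URL path is arbitrary.
import httpx
import respx


@respx.mock
def test_post_is_intercepted() -> None:
    respx.route(method='POST', host='api.apify.com').mock(
        return_value=respx.MockResponse(401)
    )
    response = httpx.post('https://api.apify.com/v2/request-queues/whatever/requests/batch')
    assert response.status_code == 401
```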
