Skip to content

Commit 0b01a41

Browse files
author
Joan Fontanals
authored
Merge branch 'master' into check-grpcio
2 parents 9623393 + fbdde03 commit 0b01a41

File tree

14 files changed

+190
-102
lines changed

14 files changed

+190
-102
lines changed

jina/clients/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def Client(
3030
prefetch: Optional[int] = 1000,
3131
protocol: Optional[Union[str, List[str]]] = 'GRPC',
3232
proxy: Optional[bool] = False,
33+
reuse_session: Optional[bool] = False,
3334
suppress_root_logging: Optional[bool] = False,
3435
tls: Optional[bool] = False,
3536
traces_exporter_host: Optional[str] = None,
@@ -59,6 +60,7 @@ def Client(
5960
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
6061
:param protocol: Communication protocol between server and client.
6162
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
63+
:param reuse_session: True if HTTPClient should reuse ClientSession. If true, user will be responsible to close it
6264
:param suppress_root_logging: If set, then no root handlers will be suppressed from logging.
6365
:param tls: If set, connect to gateway using tls encryption
6466
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.
@@ -113,6 +115,7 @@ def Client(args: Optional['argparse.Namespace'] = None, **kwargs) -> Union[
113115
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
114116
:param protocol: Communication protocol between server and client.
115117
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
118+
:param reuse_session: True if HTTPClient should reuse ClientSession. If true, user will be responsible to close it
116119
:param suppress_root_logging: If set, then no root handlers will be suppressed from logging.
117120
:param tls: If set, connect to gateway using tls encryption
118121
:param traces_exporter_host: If tracing is enabled, this hostname will be used to configure the trace exporter agent.

jina/clients/base/__init__.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ class BaseClient(InstrumentationMixin, ABC):
2929
"""
3030

3131
def __init__(
32-
self,
33-
args: Optional['argparse.Namespace'] = None,
34-
**kwargs,
32+
self,
33+
args: Optional['argparse.Namespace'] = None,
34+
**kwargs,
3535
):
3636
if args and isinstance(args, argparse.Namespace):
3737
self.args = args
@@ -63,6 +63,12 @@ def __init__(
6363
)
6464
send_telemetry_event(event='start', obj_cls_name=self.__class__.__name__)
6565

66+
async def close(self):
67+
"""Closes the potential resources of the Client.
68+
:return: Return whatever a close method may return
69+
"""
70+
return self.teardown_instrumentation()
71+
6672
def teardown_instrumentation(self):
6773
"""Shut down the OpenTelemetry tracer and meter if available. This ensures that the daemon threads for
6874
exporting metrics data is properly cleaned up.
@@ -118,7 +124,7 @@ def check_input(inputs: Optional['InputType'] = None, **kwargs) -> None:
118124
raise BadClientInput from ex
119125

120126
def _get_requests(
121-
self, **kwargs
127+
self, **kwargs
122128
) -> Union[Iterator['Request'], AsyncIterator['Request']]:
123129
"""
124130
Get request in generator.
@@ -177,13 +183,14 @@ def inputs(self, bytes_gen: 'InputType') -> None:
177183

178184
@abc.abstractmethod
179185
async def _get_results(
180-
self,
181-
inputs: 'InputType',
182-
on_done: 'CallbackFnType',
183-
on_error: Optional['CallbackFnType'] = None,
184-
on_always: Optional['CallbackFnType'] = None,
185-
**kwargs,
186-
): ...
186+
self,
187+
inputs: 'InputType',
188+
on_done: 'CallbackFnType',
189+
on_error: Optional['CallbackFnType'] = None,
190+
on_always: Optional['CallbackFnType'] = None,
191+
**kwargs,
192+
):
193+
...
187194

188195
@abc.abstractmethod
189196
def _is_flow_ready(self, **kwargs) -> bool:

jina/clients/base/helper.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ class AioHttpClientlet(ABC):
4848

4949
def __init__(
5050
self,
51-
url: str,
5251
logger: 'JinaLogger',
5352
max_attempts: int = 1,
5453
initial_backoff: float = 0.5,
@@ -59,7 +58,6 @@ def __init__(
5958
) -> None:
6059
"""HTTP Client to be used with the streamer
6160
62-
:param url: url to send http/websocket request to
6361
:param logger: jina logger
6462
:param max_attempts: Number of sending attempts, including the original request.
6563
:param initial_backoff: The first retry will happen with a delay of random(0, initial_backoff)
@@ -68,7 +66,6 @@ def __init__(
6866
:param tracer_provider: Optional tracer_provider that will be used to configure aiohttp tracing.
6967
:param kwargs: kwargs which will be forwarded to the `aiohttp.Session` instance. Used to pass headers to requests
7068
"""
71-
self.url = url
7269
self.logger = logger
7370
self.msg_recv = 0
7471
self.msg_sent = 0
@@ -131,7 +128,6 @@ async def start(self):
131128
"""
132129
with ImportExtensions(required=True):
133130
import aiohttp
134-
135131
self.session = aiohttp.ClientSession(
136132
**self._session_kwargs, trace_configs=self._trace_config
137133
)
@@ -154,9 +150,10 @@ class HTTPClientlet(AioHttpClientlet):
154150

155151
UPDATE_EVENT_PREFIX = 14 # the update event has the following format: "event: update: {document_json}"
156152

157-
async def send_message(self, request: 'Request'):
153+
async def send_message(self, url, request: 'Request'):
158154
"""Sends a POST request to the server
159155
156+
:param url: the URL where to send the message
160157
:param request: request as dict
161158
:return: send post message
162159
"""
@@ -166,23 +163,24 @@ async def send_message(self, request: 'Request'):
166163
req_dict['target_executor'] = req_dict['header']['target_executor']
167164
for attempt in range(1, self.max_attempts + 1):
168165
try:
169-
request_kwargs = {'url': self.url}
166+
request_kwargs = {'url': url}
170167
if not docarray_v2:
171168
request_kwargs['json'] = req_dict
172169
else:
173170
from docarray.base_doc.io.json import orjson_dumps
174171

175172
request_kwargs['data'] = JinaJsonPayload(value=req_dict)
173+
176174
async with self.session.post(**request_kwargs) as response:
177175
try:
178176
r_str = await response.json()
179177
except aiohttp.ContentTypeError:
180178
r_str = await response.text()
181179
r_status = response.status
182-
handle_response_status(response.status, r_str, self.url)
183-
return r_status, r_str
180+
handle_response_status(r_status, r_str, url)
181+
return r_status, r_str
184182
except (ValueError, ConnectionError, BadClient, aiohttp.ClientError, aiohttp.ClientConnectionError) as err:
185-
self.logger.debug(f'Got an error: {err} sending POST to {self.url} in attempt {attempt}/{self.max_attempts}')
183+
self.logger.debug(f'Got an error: {err} sending POST to {url} in attempt {attempt}/{self.max_attempts}')
186184
await retry.wait_or_raise_err(
187185
attempt=attempt,
188186
err=err,
@@ -193,19 +191,20 @@ async def send_message(self, request: 'Request'):
193191
)
194192
except Exception as exc:
195193
self.logger.debug(
196-
f'Got a non-retried error: {exc} sending POST to {self.url}')
194+
f'Got a non-retried error: {exc} sending POST to {url}')
197195
raise exc
198196

199-
async def send_streaming_message(self, doc: 'Document', on: str):
197+
async def send_streaming_message(self, url, doc: 'Document', on: str):
200198
"""Sends a GET SSE request to the server
201199
200+
:param url: the URL where to send the message
202201
:param doc: Request Document
203202
:param on: Request endpoint
204203
:yields: responses
205204
"""
206205
req_dict = doc.to_dict() if hasattr(doc, "to_dict") else doc.dict()
207206
request_kwargs = {
208-
'url': self.url,
207+
'url': url,
209208
'headers': {'Accept': 'text/event-stream'},
210209
'json': req_dict,
211210
}
@@ -219,13 +218,14 @@ async def send_streaming_message(self, doc: 'Document', on: str):
219218
elif event.startswith(b'end'):
220219
pass
221220

222-
async def send_dry_run(self, **kwargs):
221+
async def send_dry_run(self, url, **kwargs):
223222
"""Query the dry_run endpoint from Gateway
223+
:param url: the URL where to send the message
224224
:param kwargs: keyword arguments to make sure compatible API with other clients
225225
:return: send get message
226226
"""
227227
return await self.session.get(
228-
url=self.url, timeout=kwargs.get('timeout', None)
228+
url=url, timeout=kwargs.get('timeout', None)
229229
).__aenter__()
230230

231231
async def recv_message(self):
@@ -267,8 +267,9 @@ async def __anext__(self):
267267
class WebsocketClientlet(AioHttpClientlet):
268268
"""Websocket Client to be used with the streamer"""
269269

270-
def __init__(self, *args, **kwargs) -> None:
270+
def __init__(self, url, *args, **kwargs) -> None:
271271
super().__init__(*args, **kwargs)
272+
self.url = url
272273
self.websocket = None
273274
self.response_iter = None
274275

0 commit comments

Comments
 (0)