-
-
Notifications
You must be signed in to change notification settings - Fork 153
Use SpecCluster #162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use SpecCluster #162
Conversation
I'm currently facing issues with the way distributed handles asyncio. from dask_kubernetes import KubeCluster
cluster = KubeCluster() ---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-4-3751171c0c19> in <module>
----> 1 cluster = KubeCluster()
~/Projects/dask/dask-kubernetes/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, auth, **kwargs)
334 }
335
--> 336 super().__init__({}, scheduler, worker, **kwargs)
337
338 @classmethod
~/miniconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, silence_logs)
142 if not self.asynchronous:
143 self._loop_runner.start()
--> 144 self.sync(self._start)
145 self.sync(self._correct_state)
146 self.sync(self._wait_for_workers)
~/miniconda3/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
243 return future
244 else:
--> 245 return sync(self.loop, func, *args, **kwargs)
~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
330 e.wait(10)
331 if error[0]:
--> 332 six.reraise(*error[0])
333 else:
334 return result[0]
~/miniconda3/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in f()
315 if callback_timeout is not None:
316 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317 result[0] = yield future
318 except Exception as exc:
319 error[0] = sys.exc_info()
~/miniconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
~/miniconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in _start(self)
156 self._lock = asyncio.Lock()
157 self.status = "starting"
--> 158 self.scheduler = await self.scheduler
159 self.status = "running"
160
~/Projects/dask/dask-kubernetes/dask_kubernetes/core.py in _()
46 async def _():
47 async with self.lock:
---> 48 await self.start()
49 return self
50
~/Projects/dask/dask-kubernetes/dask_kubernetes/core.py in start(self)
54 async with async_timeout.timeout(1):
55 self.pod = await self.core_api.create_namespaced_pod(
---> 56 self.namespace, self.pod_template
57 )
58 self.status = "running"
~/miniconda3/lib/python3.7/site-packages/kubernetes_asyncio/client/api_client.py in __call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout)
164 post_params=post_params, body=body,
165 _preload_content=_preload_content,
--> 166 _request_timeout=_request_timeout)
167
168 self.last_response = response_data
~/miniconda3/lib/python3.7/site-packages/kubernetes_asyncio/client/rest.py in POST(self, url, headers, query_params, post_params, body, _preload_content, _request_timeout)
228 _preload_content=_preload_content,
229 _request_timeout=_request_timeout,
--> 230 body=body))
231
232 async def PUT(self, url, headers=None, query_params=None, post_params=None,
~/miniconda3/lib/python3.7/site-packages/kubernetes_asyncio/client/rest.py in request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
169 raise ApiException(status=0, reason=msg)
170
--> 171 r = await self.pool_manager.request(**args)
172 if _preload_content:
173
~/miniconda3/lib/python3.7/site-packages/aiohttp/client.py in _request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx)
415 timer = tm.timer()
416 try:
--> 417 with timer:
418 while True:
419 url, auth_from_url = strip_auth_from_url(url)
~/miniconda3/lib/python3.7/site-packages/aiohttp/helpers.py in __enter__(self)
566
567 if task is None:
--> 568 raise RuntimeError('Timeout context manager should be used '
569 'inside a task')
570
RuntimeError: Timeout context manager should be used inside a task |
I'm not very familiar with |
Hmm possibly. It's a bit of a pain that the library is swagger generated, which makes it harder to make changes to. I think the error I'm getting could be a bit misleading. It's within |
Hey, I should be able to help test it, if there would be a need for that. |
@retrry see the checklist in the original comment here. There are also a bunch of changes in Thanks for your offer to test this out. I'm aiming to finish this in the next few weeks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm excited to see this!
I know that this is still WIP, but I couldn't hold myself back from writing a bunch of tiny feedback comments :)
dask_kubernetes/core.py
Outdated
connect_kwargs: dict | ||
kwargs to be passed to asyncssh connections | ||
kwargs: | ||
TODO Document Scheduler kwargs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we defer users to the dask.distributed.Scheduler
docstring?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a neat way of doing this in Python? Or should I just provide a link to the dask docs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably, but I would just link, and maybe add an entry in See Also
Ok I think this is ready for final review and hopefully a merge. I've done some more manual testing on the Pangeo Binder with the switch to a local scheduler by default and everything seems to be working ok. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments
In dask-yarn we call this |
I'm more than happy to be uniform. Consistency is good! I wasn't 100% happy with |
@jacobtomlinson thoughts on merging? |
Yep I think I'm ready! |
Woo!
…On Fri, Oct 11, 2019 at 2:58 AM Jacob Tomlinson ***@***.***> wrote:
Merged #162 <#162> into
master.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#162?email_source=notifications&email_token=AACKZTDUK7W6A5B2AGPB4GTQOAW2JA5CNFSM4H7QJNNKYY3PNVWWK3TUL52HS4DFWZEXG43VMVCXMZLOORHG65DJMZUWGYLUNFXW5KTDN5WW2ZLOORPWSZGOUE7S46A#event-2705272440>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AACKZTGUX5TRGYPX77IDICTQOAW2JANCNFSM4H7QJNNA>
.
|
Nice work @jacobtomlinson. Glad to see this come together! |
This PR contains a rather large overhaul of the library to use the new
SpecCluster
. I've taken this opportunity to wrap up changes from other PRs adding asyncio support.Changes:
Pod
,Worker
andScheduler
LocalCluster
KubeCluster
onSpecCluster
instead ofCluster
KubeCluster
Currently this relies on changes to
SpecCluster
in dask/distributed#2827.Still to do: