Skip to content

Commit c544fff

Browse files
Bump kr8s to v0.20.* (#938)
* Bump kr8s to v0.20.*
* Switch to `async for` syntax
* Move more things to `async for`
* Revert one accidental `async for`
1 parent ba90177 commit c544fff

File tree

6 files changed

+102
-78
lines changed

6 files changed

+102
-78
lines changed

dask_kubernetes/operator/_objects.py

+81-60
Original file line numberDiff line numberDiff line change
@@ -10,53 +10,65 @@ class DaskCluster(new_class("DaskCluster", "kubernetes.dask.org/v1")):
1010
scalable_spec = "worker.replicas"
1111

1212
async def worker_groups(self) -> List[DaskWorkerGroup]:
13-
return await DaskWorkerGroup.list(
14-
label_selector=f"dask.org/cluster-name={self.name}",
15-
namespace=self.namespace,
16-
)
13+
return [
14+
wg
15+
async for wg in DaskWorkerGroup.list(
16+
label_selector=f"dask.org/cluster-name={self.name}",
17+
namespace=self.namespace,
18+
)
19+
]
1720

1821
async def scheduler_pod(self) -> Pod:
1922
pods = []
2023
while not pods:
21-
pods = await Pod.list(
22-
label_selector=",".join(
23-
[
24-
f"dask.org/cluster-name={self.name}",
25-
"dask.org/component=scheduler",
26-
]
27-
),
28-
namespace=self.namespace,
29-
)
24+
pods = [
25+
pod
26+
async for pod in Pod.list(
27+
label_selector=",".join(
28+
[
29+
f"dask.org/cluster-name={self.name}",
30+
"dask.org/component=scheduler",
31+
]
32+
),
33+
namespace=self.namespace,
34+
)
35+
]
3036
assert len(pods) == 1
3137
return pods[0]
3238

3339
async def scheduler_deployment(self) -> Deployment:
3440
deployments = []
3541
while not deployments:
36-
deployments = await Deployment.list(
37-
label_selector=",".join(
38-
[
39-
f"dask.org/cluster-name={self.name}",
40-
"dask.org/component=scheduler",
41-
]
42-
),
43-
namespace=self.namespace,
44-
)
42+
deployments = [
43+
deployment
44+
async for deployment in Deployment.list(
45+
label_selector=",".join(
46+
[
47+
f"dask.org/cluster-name={self.name}",
48+
"dask.org/component=scheduler",
49+
]
50+
),
51+
namespace=self.namespace,
52+
)
53+
]
4554
assert len(deployments) == 1
4655
return deployments[0]
4756

4857
async def scheduler_service(self) -> Service:
4958
services = []
5059
while not services:
51-
services = await Service.list(
52-
label_selector=",".join(
53-
[
54-
f"dask.org/cluster-name={self.name}",
55-
"dask.org/component=scheduler",
56-
]
57-
),
58-
namespace=self.namespace,
59-
)
60+
services = [
61+
service
62+
async for service in Service.list(
63+
label_selector=",".join(
64+
[
65+
f"dask.org/cluster-name={self.name}",
66+
"dask.org/component=scheduler",
67+
]
68+
),
69+
namespace=self.namespace,
70+
)
71+
]
6072
assert len(services) == 1
6173
return services[0]
6274

@@ -74,28 +86,34 @@ class DaskWorkerGroup(new_class("DaskWorkerGroup", "kubernetes.dask.org/v1")):
7486
scalable_spec = "worker.replicas"
7587

7688
async def pods(self) -> List[Pod]:
77-
return await Pod.list(
78-
label_selector=",".join(
79-
[
80-
f"dask.org/cluster-name={self.spec.cluster}",
81-
"dask.org/component=worker",
82-
f"dask.org/workergroup-name={self.name}",
83-
]
84-
),
85-
namespace=self.namespace,
86-
)
89+
return [
90+
pod
91+
async for pod in Pod.list(
92+
label_selector=",".join(
93+
[
94+
f"dask.org/cluster-name={self.spec.cluster}",
95+
"dask.org/component=worker",
96+
f"dask.org/workergroup-name={self.name}",
97+
]
98+
),
99+
namespace=self.namespace,
100+
)
101+
]
87102

88103
async def deployments(self) -> List[Deployment]:
89-
return await Deployment.list(
90-
label_selector=",".join(
91-
[
92-
f"dask.org/cluster-name={self.spec.cluster}",
93-
"dask.org/component=worker",
94-
f"dask.org/workergroup-name={self.name}",
95-
]
96-
),
97-
namespace=self.namespace,
98-
)
104+
return [
105+
deployment
106+
async for deployment in Deployment.list(
107+
label_selector=",".join(
108+
[
109+
f"dask.org/cluster-name={self.spec.cluster}",
110+
"dask.org/component=worker",
111+
f"dask.org/workergroup-name={self.name}",
112+
]
113+
),
114+
namespace=self.namespace,
115+
)
116+
]
99117

100118
async def cluster(self) -> DaskCluster:
101119
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)
@@ -113,14 +131,17 @@ async def cluster(self) -> DaskCluster:
113131
async def pod(self) -> Pod:
114132
pods = []
115133
while not pods:
116-
pods = await Pod.list(
117-
label_selector=",".join(
118-
[
119-
f"dask.org/cluster-name={self.name}",
120-
"dask.org/component=job-runner",
121-
]
122-
),
123-
namespace=self.namespace,
124-
)
134+
pods = [
135+
pod
136+
async for pod in Pod.list(
137+
label_selector=",".join(
138+
[
139+
f"dask.org/cluster-name={self.name}",
140+
"dask.org/component=job-runner",
141+
]
142+
),
143+
namespace=self.namespace,
144+
)
145+
]
125146
assert len(pods) == 1
126147
return pods[0]

dask_kubernetes/operator/controller/controller.py

+16-10
Original file line numberDiff line numberDiff line change
@@ -487,11 +487,14 @@ async def retire_workers(
487487
f"Scaling {worker_group_name} failed via the HTTP API and the Dask RPC, falling back to LIFO scaling. "
488488
"This can result in lost data, see https://kubernetes.dask.org/en/latest/operator_troubleshooting.html."
489489
)
490-
workers = await kr8s.asyncio.get(
491-
"deployments",
492-
namespace=namespace,
493-
label_selector={"dask.org/workergroup-name": worker_group_name},
494-
)
490+
workers = [
491+
deployment
492+
async for deployment in kr8s.asyncio.get(
493+
"deployments",
494+
namespace=namespace,
495+
label_selector={"dask.org/workergroup-name": worker_group_name},
496+
)
497+
]
495498
return retire_workers_lifo(workers, n_workers)
496499

497500

@@ -651,11 +654,14 @@ async def daskworkergroup_replica_update(
651654
# the number of workers ends in the correct state
652655
async with worker_group_scale_locks[f"{namespace}/{name}"]:
653656
current_workers = len(
654-
await kr8s.asyncio.get(
655-
"deployments",
656-
namespace=namespace,
657-
label_selector={"dask.org/workergroup-name": name},
658-
)
657+
[
658+
deployment
659+
async for deployment in kr8s.asyncio.get(
660+
"deployments",
661+
namespace=namespace,
662+
label_selector={"dask.org/workergroup-name": name},
663+
)
664+
]
659665
)
660666
assert isinstance(new, int)
661667
desired_workers = new

dask_kubernetes/operator/kubecluster/discovery.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88

99
async def discover() -> AsyncIterator[Tuple[str, Callable]]:
1010
try:
11-
clusters = await kr8s.asyncio.get("daskclusters", namespace=kr8s.ALL)
12-
for cluster in clusters:
11+
async for cluster in kr8s.asyncio.get("daskclusters", namespace=kr8s.ALL):
1312
yield (cluster.name, KubeCluster)
1413
except Exception:
1514
return

dask_kubernetes/operator/kubecluster/kubecluster.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -574,13 +574,11 @@ def get_logs(self):
574574
async def _get_logs(self):
575575
logs = Logs()
576576

577-
pods = await kr8s.asyncio.get(
577+
async for pod in kr8s.asyncio.get(
578578
"pods",
579579
namespace=self.namespace,
580580
label_selector=f"dask.org/cluster-name={self.name}",
581-
)
582-
583-
for pod in pods:
581+
):
584582
if "scheduler" in pod.name or "worker" in pod.name:
585583
try:
586584
if pod.status.phase != "Running":

dask_kubernetes/operator/networking.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async def get_external_address_for_scheduler_service(
5454
host = lb.get("hostname", None) or lb.ip
5555
elif service.spec.type == "NodePort":
5656
port = _get_port(service, port_name, is_node_port=True)
57-
nodes = await kr8s.asyncio.get("nodes")
57+
nodes = [node async for node in kr8s.asyncio.get("nodes")]
5858
host = nodes[0].status.addresses[0].address
5959
elif service.spec.type == "ClusterIP":
6060
port = _get_port(service, port_name)

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ dependencies = [
1515
"dask>=2022.08.1",
1616
"distributed>=2022.08.1",
1717
"kopf>=1.35.3",
18-
"kr8s==0.17.*",
18+
"kr8s==0.20.*",
1919
"kubernetes-asyncio>=12.0.1",
2020
"kubernetes>=12.0.1",
2121
"pykube-ng>=22.9.0",

0 commit comments

Comments (0)