Skip to content

Commit 8bd5083

Browse files
authored
Add Autoscaling to Operator (#451)
1 parent 4a69e3a commit 8bd5083

File tree

16 files changed

+422
-11
lines changed

16 files changed

+422
-11
lines changed

.github/workflows/helmcluster.yaml

+6-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ jobs:
2222
matrix:
2323
python-version: ["3.8", "3.9", "3.10"]
2424

25+
env:
26+
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
27+
2528
steps:
2629
- uses: actions/checkout@v2
2730
- uses: actions/setup-python@v2
@@ -30,6 +33,7 @@ jobs:
3033
- name: Install deps
3134
run: ./ci/install-deps.sh
3235
- name: Run tests
33-
env:
34-
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
3536
run: pytest dask_kubernetes/common/tests dask_kubernetes/helm/tests
37+
- name: Debug k8s resources
38+
if: success() || failure()
39+
run: kubectl get all -A

.github/workflows/kubecluster.yaml

+6-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ jobs:
2222
matrix:
2323
python-version: ["3.8", "3.9", "3.10"]
2424

25+
env:
26+
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
27+
2528
steps:
2629
- uses: actions/checkout@v2
2730
- uses: actions/setup-python@v2
@@ -30,6 +33,7 @@ jobs:
3033
- name: Install deps
3134
run: ./ci/install-deps.sh
3235
- name: Run tests
33-
env:
34-
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
3536
run: pytest dask_kubernetes/common/tests dask_kubernetes/classic/tests
37+
- name: Debug k8s resources
38+
if: success() || failure()
39+
run: kubectl get all -A

.github/workflows/operator.yaml

+6-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ jobs:
2424
matrix:
2525
python-version: ["3.8", "3.9", "3.10"]
2626

27+
env:
28+
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
29+
2730
steps:
2831
- uses: actions/checkout@v2
2932
- uses: actions/setup-python@v2
@@ -33,6 +36,8 @@ jobs:
3336
run: ./ci/install-deps.sh
3437
- name: Run tests
3538
env:
36-
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
3739
TEST_ISTIO: "true"
3840
run: pytest dask_kubernetes/common/tests dask_kubernetes/operator/tests dask_kubernetes/experimental/tests
41+
- name: Debug k8s resources
42+
if: success() || failure()
43+
run: kubectl get all -A

ci/pre-commit-crd.py

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def main(version, *args):
8383
run_action("daskcluster.yaml", temp_dir, crd_path, output_paths)
8484
run_action("daskworkergroup.yaml", temp_dir, crd_path, output_paths)
8585
run_action("daskjob.yaml", temp_dir, crd_path, output_paths)
86+
run_action("daskautoscaler.yaml", temp_dir, crd_path, output_paths)
8687

8788
else:
8889
run_action(changed_file, temp_dir, crd_path, output_paths)

dask_kubernetes/conftest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def customresources(k8s_cluster):
6767
temp_dir = tempfile.TemporaryDirectory()
6868
crd_path = os.path.join(DIR, "operator", "customresources")
6969

70-
for crd in ["daskcluster", "daskworkergroup", "daskjob"]:
70+
for crd in ["daskcluster", "daskworkergroup", "daskjob", "daskautoscaler"]:
7171
run_generate(
7272
os.path.join(crd_path, f"{crd}.yaml"),
7373
os.path.join(crd_path, f"{crd}.patch.yaml"),

dask_kubernetes/experimental/kubecluster.py

+62-5
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,15 @@ async def _scale(self, n, worker_group="default"):
491491
custom_objects_api.api_client.set_default_header(
492492
"content-type", "application/merge-patch+json"
493493
)
494+
# Disable adaptivity if enabled
495+
with suppress(kubernetes.client.ApiException):
496+
await custom_objects_api.delete_namespaced_custom_object(
497+
group="kubernetes.dask.org",
498+
version="v1",
499+
plural="daskautoscalers",
500+
namespace=self.namespace,
501+
name=self.name,
502+
)
494503
await custom_objects_api.patch_namespaced_custom_object_scale(
495504
group="kubernetes.dask.org",
496505
version="v1",
@@ -500,11 +509,59 @@ async def _scale(self, n, worker_group="default"):
500509
body={"spec": {"replicas": n}},
501510
)
502511

503-
def adapt(self, *args, **kwargs):
504-
"""Turn on adaptivity"""
505-
raise NotImplementedError(
506-
"Adaptive mode is not supported yet for this KubeCluster."
507-
)
512+
def adapt(self, minimum=None, maximum=None):
513+
"""Turn on adaptivity
514+
515+
Parameters
516+
----------
517+
minimum : int
518+
Minimum number of workers
519+
minimum : int
520+
Maximum number of workers
521+
522+
Examples
523+
--------
524+
>>> cluster.adapt() # Allow scheduler to add/remove workers within k8s cluster resource limits
525+
>>> cluster.adapt(minimum=1, maximum=10) # Allow scheduler to add/remove workers within 1-10 range
526+
"""
527+
return self.sync(self._adapt, minimum, maximum)
528+
529+
async def _adapt(self, minimum=None, maximum=None):
530+
async with kubernetes.client.api_client.ApiClient() as api_client:
531+
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
532+
custom_objects_api.api_client.set_default_header(
533+
"content-type", "application/merge-patch+json"
534+
)
535+
try:
536+
await custom_objects_api.patch_namespaced_custom_object_scale(
537+
group="kubernetes.dask.org",
538+
version="v1",
539+
plural="daskautoscalers",
540+
namespace=self.namespace,
541+
name=self.name,
542+
body={"spec": {"minimum": minimum, "maximum": maximum}},
543+
)
544+
except kubernetes.client.ApiException:
545+
await custom_objects_api.create_namespaced_custom_object(
546+
group="kubernetes.dask.org",
547+
version="v1",
548+
plural="daskautoscalers",
549+
namespace=self.namespace,
550+
body={
551+
"apiVersion": "kubernetes.dask.org/v1",
552+
"kind": "DaskAutoscaler",
553+
"metadata": {
554+
"name": self.name,
555+
"dask.org/cluster-name": self.cluster_name,
556+
"dask.org/component": "autoscaler",
557+
},
558+
"spec": {
559+
"cluster": self.cluster_name,
560+
"minimum": minimum,
561+
"maximum": maximum,
562+
},
563+
},
564+
)
508565

509566
def _build_scheduler_spec(self, cluster_name):
510567
# TODO: Take the values provided in the current class constructor

dask_kubernetes/experimental/tests/test_kubecluster.py

+16
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,19 @@ def test_additional_worker_groups(kopf_runner, docker_image):
8484
def test_cluster_without_operator(docker_image):
8585
with pytest.raises(TimeoutError, match="is the Dask Operator running"):
8686
KubeCluster(name="noop", n_workers=1, image=docker_image, resource_timeout=1)
87+
88+
89+
def test_adapt(kopf_runner, docker_image):
90+
with kopf_runner:
91+
with KubeCluster(
92+
name="adaptive",
93+
image=docker_image,
94+
n_workers=0,
95+
) as cluster:
96+
cluster.adapt(minimum=0, maximum=1)
97+
with Client(cluster) as client:
98+
assert client.submit(lambda x: x + 1, 10).result() == 11
99+
100+
# Need to clean up the DaskAutoscaler object
101+
# See https://github.com/dask/dask-kubernetes/issues/546
102+
cluster.scale(0)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
apiVersion: apiextensions.k8s.io/v1
2+
kind: CustomResourceDefinition
3+
metadata:
4+
name: daskautoscalers.kubernetes.dask.org
5+
spec:
6+
scope: Namespaced
7+
group: kubernetes.dask.org
8+
names:
9+
kind: DaskAutoscaler
10+
plural: daskautoscalers
11+
singular: daskautoscaler
12+
versions:
13+
- name: v1
14+
served: true
15+
storage: true
16+
schema:
17+
openAPIV3Schema:
18+
type: object
19+
properties:
20+
spec:
21+
type: object
22+
required:
23+
- cluster
24+
- minimum
25+
- maximum
26+
properties:
27+
cluster:
28+
type: string
29+
description: Name of the cluster to associate this worker group with
30+
minimum:
31+
type: integer
32+
description: Minimum number of workers
33+
maximum:
34+
type: integer
35+
description: Maximum number of workers
36+
status:
37+
type: object
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
apiVersion: apiextensions.k8s.io/v1
2+
kind: CustomResourceDefinition
3+
metadata:
4+
name: daskautoscalers.kubernetes.dask.org
5+
spec:
6+
group: kubernetes.dask.org
7+
names:
8+
kind: DaskAutoscaler
9+
plural: daskautoscalers
10+
singular: daskautoscaler
11+
scope: Namespaced
12+
versions:
13+
- name: v1
14+
schema:
15+
openAPIV3Schema:
16+
properties:
17+
spec:
18+
properties:
19+
cluster:
20+
description: Name of the cluster to associate this worker group with
21+
type: string
22+
maximum:
23+
description: Maximum number of workers
24+
type: integer
25+
minimum:
26+
description: Minimum number of workers
27+
type: integer
28+
required:
29+
- cluster
30+
- minimum
31+
- maximum
32+
type: object
33+
status:
34+
type: object
35+
type: object
36+
served: true
37+
storage: true
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
apiVersion: apiextensions.k8s.io/v1
2+
kind: CustomResourceDefinition
3+
metadata:
4+
name: daskautoscalers.kubernetes.dask.org
5+
spec:
6+
group: kubernetes.dask.org
7+
names:
8+
kind: DaskAutoscaler
9+
plural: daskautoscalers
10+
singular: daskautoscaler
11+
scope: Namespaced
12+
versions:
13+
- name: v1
14+
schema:
15+
openAPIV3Schema:
16+
properties:
17+
spec:
18+
properties:
19+
cluster:
20+
description: Name of the cluster to associate this worker group with
21+
type: string
22+
maximum:
23+
description: Maximum number of workers
24+
type: integer
25+
minimum:
26+
description: Minimum number of workers
27+
type: integer
28+
required:
29+
- cluster
30+
- minimum
31+
- maximum
32+
type: object
33+
status:
34+
type: object
35+
type: object
36+
served: true
37+
storage: true

0 commit comments

Comments
 (0)