|
| 1 | +import multiprocessing |
| 2 | +import pytest |
| 3 | +from unittest.mock import patch, Mock |
| 4 | + |
| 5 | +from typing import Optional |
| 6 | +from kubeflow.training import TrainingClient |
| 7 | +from kubeflow.training import KubeflowOrgV1ReplicaSpec |
| 8 | +from kubeflow.training import KubeflowOrgV1PyTorchJob |
| 9 | +from kubeflow.training import KubeflowOrgV1PyTorchJobSpec |
| 10 | +from kubeflow.training import KubeflowOrgV1RunPolicy |
| 11 | +from kubeflow.training import KubeflowOrgV1SchedulingPolicy |
| 12 | +from kubeflow.training import constants |
| 13 | + |
| 14 | +from kubernetes.client import V1PodTemplateSpec |
| 15 | +from kubernetes.client import V1ObjectMeta |
| 16 | +from kubernetes.client import V1PodSpec |
| 17 | +from kubernetes.client import V1Container |
| 18 | +from kubernetes.client import V1ResourceRequirements |
| 19 | + |
| 20 | + |
def create_namespaced_custom_object_response(*args, **kwargs):
    """Mock side effect for ``CustomObjectsApi.create_namespaced_custom_object``.

    The namespace (third positional argument of the real API call) selects the
    simulated outcome:

    * ``"timeout"`` -> raise ``multiprocessing.TimeoutError``
    * ``"runtime"`` -> raise ``RuntimeError``
    * anything else -> succeed silently (return ``None``)
    """
    namespace = args[2]
    if namespace == "timeout":
        raise multiprocessing.TimeoutError()
    if namespace == "runtime":
        raise RuntimeError()
| 26 | + |
| 27 | + |
def generate_container() -> V1Container:
    """Return the training container shared by both PyTorchJob replica roles."""
    resources = V1ResourceRequirements(limits={"memory": "1Gi", "cpu": "0.4"})
    container = V1Container(
        name="pytorch",
        image="gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0",
        args=["--backend", "gloo"],
        resources=resources,
    )
    return container
| 35 | + |
| 36 | + |
def generate_pytorchjob(
    job_namespace: str,
    master: KubeflowOrgV1ReplicaSpec,
    worker: KubeflowOrgV1ReplicaSpec,
    scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1PyTorchJob:
    """Assemble a PyTorchJob model object from the given replica specs.

    Args:
        job_namespace: Namespace written into the job's metadata.
        master: Replica spec for the ``Master`` role.
        worker: Replica spec for the ``Worker`` role.
        scheduling_policy: Optional gang-scheduling policy for the run policy.

    Returns:
        A fully populated ``KubeflowOrgV1PyTorchJob``.
    """
    run_policy = KubeflowOrgV1RunPolicy(
        clean_pod_policy="None",
        scheduling_policy=scheduling_policy,
    )
    job_spec = KubeflowOrgV1PyTorchJobSpec(
        run_policy=run_policy,
        pytorch_replica_specs={"Master": master, "Worker": worker},
    )
    metadata = V1ObjectMeta(name="pytorchjob-mnist-ci-test", namespace=job_namespace)
    return KubeflowOrgV1PyTorchJob(
        api_version=constants.API_VERSION,
        kind=constants.PYTORCHJOB_KIND,
        metadata=metadata,
        spec=job_spec,
    )
| 55 | + |
| 56 | + |
def _single_replica_spec(container: V1Container) -> KubeflowOrgV1ReplicaSpec:
    """One single-replica role spec (Istio sidecar injection disabled)."""
    return KubeflowOrgV1ReplicaSpec(
        replicas=1,
        restart_policy="OnFailure",
        template=V1PodTemplateSpec(
            metadata=V1ObjectMeta(
                annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
            ),
            spec=V1PodSpec(containers=[container]),
        ),
    )


def create_job():
    """Build the valid PyTorchJob fixture used throughout ``test_data``.

    Master and Worker use an identical single-replica spec, so the previously
    copy-pasted literal is built once per role via ``_single_replica_spec``.

    Returns:
        A ``KubeflowOrgV1PyTorchJob`` in namespace ``"test"``.
    """
    job_namespace = "test"
    container = generate_container()
    # Both roles reference the same container object, as in the original code.
    master = _single_replica_spec(container)
    worker = _single_replica_spec(container)
    pytorchjob = generate_pytorchjob(job_namespace, master, worker)
    return pytorchjob
| 83 | + |
| 84 | + |
class DummyJobClass:
    """Minimal stand-in for a training-job model object.

    The client-side validation under test only needs an object exposing a
    ``kind`` attribute, so nothing else is modeled here.
    """

    def __init__(self, kind) -> None:
        self.kind = kind
| 88 | + |
| 89 | + |
# Parametrized cases for test_create_job. Each tuple is
#   (test_name, kwargs passed to TrainingClient.create_job, expected outcome)
# where the expected outcome is either the exception class the call must raise
# or the literal string "success" for the happy path. The "timeout"/"runtime"
# namespaces trigger the matching failure in the mocked
# create_namespaced_custom_object side effect (see
# create_namespaced_custom_object_response above).
test_data = [
    (
        "invalid extra parameter",
        {"job": create_job(), "namespace": "test", "base_image": "test_image"},
        ValueError,
    ),
    ("invalid job kind", {"job_kind": "invalid_job_kind"}, ValueError),
    (
        "job name missing ",
        {"train_func": lambda: "test train function"},
        ValueError,
    ),
    ("job name missing", {"base_image": "test_image"}, ValueError),
    (
        "uncallable train function",
        {"name": "test job", "train_func": "uncallable train function"},
        ValueError,
    ),
    (
        "invalid TFJob replica",
        {
            "name": "test job",
            "train_func": lambda: "test train function",
            "job_kind": constants.TFJOB_KIND,
        },
        ValueError,
    ),
    (
        "invalid PyTorchJob replica",
        {
            "name": "test job",
            "train_func": lambda: "test train function",
            "job_kind": constants.PYTORCHJOB_KIND,
        },
        ValueError,
    ),
    (
        "invalid pod template spec parameters",
        {
            "name": "test job",
            "train_func": lambda: "test train function",
            "job_kind": constants.MXJOB_KIND,
        },
        KeyError,
    ),
    (
        "paddle job can't be created using function",
        {
            "name": "test job",
            "train_func": lambda: "test train function",
            "job_kind": constants.PADDLEJOB_KIND,
        },
        ValueError,
    ),
    (
        # Object of the wrong type passed as the job argument.
        "invalid job object",
        {"job": DummyJobClass(constants.TFJOB_KIND)},
        ValueError,
    ),
    (
        # Mocked API raises multiprocessing.TimeoutError; client surfaces TimeoutError.
        "create_namespaced_custom_object timeout error",
        {"job": create_job(), "namespace": "timeout"},
        TimeoutError,
    ),
    (
        "create_namespaced_custom_object runtime error",
        {"job": create_job(), "namespace": "runtime"},
        RuntimeError,
    ),
    (
        "valid flow",
        {"job": create_job(), "namespace": "test"},
        "success",
    ),
]
| 165 | + |
| 166 | + |
@pytest.fixture
def training_client():
    """Yield a TrainingClient whose Kubernetes API surface is fully mocked.

    ``CustomObjectsApi.create_namespaced_custom_object`` is backed by
    ``create_namespaced_custom_object_response`` so the namespace argument
    controls success/failure; ``CoreV1Api`` and kube-config loading are inert.
    """
    custom_objects_api = Mock(
        create_namespaced_custom_object=Mock(
            side_effect=create_namespaced_custom_object_response
        )
    )
    with patch("kubernetes.client.CustomObjectsApi", return_value=custom_objects_api):
        with patch("kubernetes.client.CoreV1Api", return_value=Mock()):
            with patch("kubernetes.config.load_kube_config", return_value=Mock()):
                yield TrainingClient(job_kind=constants.PYTORCHJOB_KIND)
| 181 | + |
| 182 | + |
@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data)
def test_create_job(training_client, test_name, kwargs, expected_output):
    """create_job raises the expected exception type, or succeeds.

    ``expected_output`` is either an exception class or the literal string
    ``"success"`` (see ``test_data``).
    """
    print("Executing test:", test_name)
    try:
        training_client.create_job(**kwargs)
    except Exception as e:
        # Exact type match: a subclass would indicate a different failure mode.
        assert type(e) is expected_output
    else:
        # Moved out of the try block: previously this assert's own
        # AssertionError was caught by `except Exception` and re-compared
        # against expected_output, obscuring (or masking) failures.
        assert expected_output == "success"
    print("test execution complete")
0 commit comments