Skip to content

Commit 8f77f27

Browse files
committed
Add a workflow manager to support running hpc apps in k8s cluster
It uses: * `mpi-operator` from `kubeflow` to manage the worker pods * `kustomize` to port locally generated assets into the pods Example workflow for running HPL: * Ramble workspace config: (the `container_image` points to an image containing the HPL dependencies, courtesy of @akiki-liang0) ```yaml ramble: variants: package_manager: None workflow_manager: gke-mpi applications: hpl: workloads: calculator: experiments: gketest: env_vars: set: OMP_NUM_THREADS: '{n_threads}' OMPI_ALLOW_RUN_AS_ROOT: 1 OMPI_ALLOW_RUN_AS_ROOT_CONFIRM: 1 LD_LIBRARY_PATH: /usr/local/lib/openmpi:/openBLAS:/root/hpl:/root/hpl/bin:/root/hpl/bin/linux:/usr/local/lib OMP_PLACES: 'core' OMP_PROC_BIND: 'true' variables: mpi_command: mpirun -np {n_ranks} --npernode {processes_per_node} -v --allow-run-as-root --bind-to core -x PATH -x LD_LIBRARY_PATH --mca btl tcp,self processes_per_node: 192 n_nodes: 2 memory_per_node: 762 array_size: 6400000000 N-NBMINs: 1 NBMINs: 4 NPFACTs: 1 PFACTs: 1 N-RFACTs: 1 RFACTs: 1 extra_config_files: | '{experiment_run_dir}/HPL.dat' container_image: '<container_image_uri>' gke_run_dir: '/root/hpl/bin/linux' ``` * Run `ramble workspace setup` to set up the `kustomize` templates * Optionally, run `ramble on --executor '{batch_print_deployment}'` to inspect the generated deployment template * Run `ramble on` to submit the job to an existing GKE cluster * During the run, use `ramble on --executor '{batch_query}'` to look at the job info and get the launcher log * Run `ramble workspace analyze` upon completion; this fetches the launcher log (HPL isn't optimized here, so ignore the perf result): ``` $ ramble workspace analyze -p From Workspace: gke-hpl (hash: 1f96b0b9cf8fe9b0c67e29740404175ae559bae9ec75757150e2a715eec2e0ae) Experiment hpl.calculator.gketest figures of merit: Status = SUCCESS Tags = ['benchmark', 'benchmark-app', 'linpack'] N-NB-P-Q = 405504-384-16-24 context figures of merit: Time = 2832.29 s GFlops = 1.5695e+04 GFLOP/s ``` * At the end, run `ramble on 
--executor '{batch_cancel}'` to delete the launcher pod. The launcher pod is intentionally configured to stay running after the job completes, so that its logs can be fetched. That means that, for now, it needs to be cleaned up manually afterwards.
1 parent 790f840 commit 8f77f27

File tree

10 files changed

+365
-0
lines changed

10 files changed

+365
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright 2022-2025 The Ramble Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5+
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6+
# option. This file may not be copied, modified, or distributed
7+
# except according to those terms.
8+
9+
import os
10+
11+
import pytest
12+
13+
import ramble.workspace
14+
from ramble.main import RambleCommand
15+
16+
# Callable wrapper used by the tests to invoke `ramble workspace ...`.
workspace = RambleCommand("workspace")

# Request these fixtures for every test defined in this module.
pytestmark = pytest.mark.usefixtures("mutable_config", "mutable_mock_workspace_path")
22+
23+
24+
def test_gke_mpi_workflow(request):
    """Dry-run a workspace using the gke-mpi workflow manager and verify
    that every expected file is rendered with the expected content."""
    workspace_name = request.node.name
    test_config = """
ramble:
  env_vars:
    set:
      OMP_NUM_THREADS: '{n_threads}'
  variants:
    workflow_manager: gke-mpi
  variables:
    mpi_command: mpirun -n {n_ranks}
    processes_per_node: 1
    n_nodes: 2
    container_image: docker.pkg.dev/myproject/myimage
    extra_metadata: |
      a: 1
      b: 2
    extra_container_config_files: |
      {experiment_run_dir}/app_config.txt
  applications:
    hostname:
      workloads:
        parallel:
          experiments:
            generated: {}
"""
    with ramble.workspace.create(workspace_name) as ws:
        ws.write()
        config_path = os.path.join(ws.config_dir, ramble.workspace.config_file_name)
        with open(config_path, "w+") as f:
            f.write(test_config)
        ws._re_read()
        workspace("setup", "--dry-run", global_args=["-D", ws.root])

        run_path = os.path.join(ws.experiment_dir, "hostname", "parallel", "generated")

        # Rendered file name -> snippets that must appear in that file.
        # Insertion order is the order in which the checks run.
        expected_snippets = {
            "batch_submit": [f"kubectl apply --kustomize {run_path}"],
            "batch_query": ["kubectl describe mpijobs hostname-parallel-generated"],
            "batch_cancel": ["kubectl delete mpijobs hostname-parallel-generated"],
            "gke_mpi.yaml": [
                "kind: MPIJob",
                "name: hostname-parallel-generated",
                "replicas: 2",
                "image: docker.pkg.dev/myproject/myimage",
            ],
            "kustomization.yaml": [
                "files:",
                os.path.join(run_path, "app_config.txt"),
            ],
            "launcher_execute_script": ["hostname"],
            "worker_execute_script": ["sshd"],
            "batch_print_deployment": ["kubectl kustomize"],
        }

        # First make sure every file exists as a regular file ...
        files = [
            entry
            for entry in os.listdir(run_path)
            if os.path.isfile(os.path.join(run_path, entry))
        ]
        for file_name in expected_snippets:
            assert file_name in files

        # ... then check what was rendered into each of them.
        for file_name, snippets in expected_snippets.items():
            with open(os.path.join(run_path, file_name)) as f:
                content = f.read()
            for snippet in snippets:
                assert snippet in content
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
kubectl delete mpijobs {job_name}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
kubectl kustomize {experiment_run_dir}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#!/bin/bash
# Show the status of the MPIJob, then (when a launcher pod exists) print its
# log and save a copy as launcher.log in the experiment run directory.
echo "========================"
echo "Print out mpi job status"
echo "========================"
echo ""
kubectl describe mpijobs {job_name}

# More than one launcher pod may match; list pods oldest-first and keep only
# the newest one so that kubectl logs receives exactly one pod name.
lname=$(kubectl get pods --sort-by=.metadata.creationTimestamp | grep -F '{job_name}-launcher' | awk '{print $1}' | tail -n 1)
if [ -n "$lname" ]; then
  echo " "
  echo "=========================="
  echo "Print out the launcher log"
  echo "=========================="
  echo " "
  kubectl logs "$lname" | tee "{experiment_run_dir}/launcher.log"
fi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
kubectl apply --kustomize {experiment_run_dir}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# MPIJob manifest template for the kubeflow MPI operator.
# Placeholders in curly braces are filled in when the template is rendered.
apiVersion: kubeflow.org/v2beta1
kind: MPIJob
metadata:
  name: {job_name}
{extra_metadata_section}
spec:
  slotsPerWorker: {cores_per_node}
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          hostPID: true
          hostIPC: true
          dnsPolicy: ClusterFirstWithHostNet
          volumes:
          # Config map produced by the configMapGenerator in kustomization.yaml
          - name: config
            configMap:
              name: gke-mpi-config
          containers:
          - image: {container_image}
            name: mpi-launcher
            volumeMounts:
            - name: config
              mountPath: /config
            command: ["bash", "{launcher_script_path}"]
            securityContext:
              privileged: true
    Worker:
      replicas: {n_nodes}
      template:
        spec:
          containers:
          - image: {container_image}
            name: mpi-worker
            securityContext:
              privileged: true
            volumeMounts:
            - name: config
              mountPath: /config
            command: ["bash", "{worker_script_path}"]
          volumes:
          - name: config
            configMap:
              name: gke-mpi-config
48+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Kustomize entry point: references the rendered MPIJob manifest, followed by
# the generated config map section.
resources:
- {gke_mpi_yaml}
{config_map_gen_section}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#!/bin/bash
2+
mkdir -p {container_work_dir} && cd {container_work_dir}
3+
# important to resolve symlink
4+
cp --remove-destination -r -L /config/* .
5+
6+
{unformatted_command_without_logs}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#!/bin/bash
2+
mkdir -p {container_work_dir} && cd {container_work_dir}
3+
# important to resolve symlink
4+
cp --remove-destination -r -L /config/* .
5+
/usr/sbin/sshd -De -f /etc/ssh/sshd_config
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
# Copyright 2022-2025 The Ramble Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5+
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6+
# option. This file may not be copied, modified, or distributed
7+
# except according to those terms.
8+
9+
import os
10+
import shutil
11+
import textwrap
12+
13+
from ramble.wmkit import *
14+
15+
from spack.util.executable import Executable, ProcessError
16+
17+
18+
class GkeMpi(WorkflowManagerBase):
    """GKE workflow manager that uses the MPI operator.

    Registers the templates for an ``MPIJob`` manifest, a kustomize
    configuration, the launcher/worker execute scripts, and the ``kubectl``
    helper scripts (submit, query, cancel, print deployment).
    """

    name = "gke-mpi"

    maintainers("linsword13")

    tags("workflow", "gke", "mpi")

    workflow_manager_variable(
        name="job_name",
        default="{application_name}-{workload_name}-{experiment_name}",
        description="GKE job name",
    )

    workflow_manager_variable(
        name="extra_metadata",
        default="",
        description="Extra line-separated key:val pairs for the metadata section",
    )

    workflow_manager_variable(
        name="cores_per_node",
        default="{processes_per_node}",
        description="Cores per node",
    )

    workflow_manager_variable(
        name="container_image",
        default="",
        description="url to the container image",
    )

    workflow_manager_variable(
        name="extra_container_config_files",
        default="",
        description="extra line-separated list of config files to be mapped to containers",
    )

    workflow_manager_variable(
        name="container_work_dir",
        default="/config",
        description="working directory inside the container",
    )

    workflow_manager_variable(
        name="launcher_execute_script_template",
        default="launcher_execute_script.tpl",
        description="execute script template for the launcher",
    )

    register_template(
        name="launcher_execute_script",
        src_path="{launcher_execute_script_template}",
    )

    workflow_manager_variable(
        name="worker_execute_script_template",
        default="worker_execute_script.tpl",
        description="execute script template for the workers",
    )

    register_template(
        name="worker_execute_script",
        src_path="{worker_execute_script_template}",
    )

    register_template(
        name="gke_mpi_yaml",
        src_path="gke_mpi.yaml.tpl",
        dest_path="gke_mpi.yaml",
        extra_vars_func="gke_mpi_yaml_vars",
    )

    def _gke_mpi_yaml_vars(self):
        """Compute the extra variables used to render ``gke_mpi.yaml``.

        Returns:
            dict: ``extra_metadata_section`` (the ``extra_metadata`` text
            indented by two spaces, or an empty string),
            ``launcher_script_path`` and ``worker_script_path`` (the script
            base names placed under ``/config``).
        """
        expander = self.app_inst.expander
        extra_metadata_str = expander.expand_var_name("extra_metadata")
        launcher_script = expander.expand_var_name("launcher_execute_script")
        worker_script = expander.expand_var_name("worker_execute_script")
        if extra_metadata_str:
            # Two spaces line the extra entries up with the keys that sit
            # directly under `metadata:` in the manifest template.
            extra_metadata_section = textwrap.indent(
                extra_metadata_str, " " * 2
            )
        else:
            extra_metadata_section = ""

        return {
            "extra_metadata_section": extra_metadata_section,
            "launcher_script_path": os.path.join(
                "/config", os.path.basename(launcher_script)
            ),
            "worker_script_path": os.path.join(
                "/config", os.path.basename(worker_script)
            ),
        }

    register_template(
        name="kustomization.yaml",
        src_path="kustomization.yaml.tpl",
        dest_path="kustomization.yaml",
        extra_vars_func="kustomization_yaml_vars",
    )

    def _kustomization_yaml_vars(self):
        """Build the ``configMapGenerator`` section of ``kustomization.yaml``.

        The launcher and worker execute scripts are always listed. Every
        non-blank line of ``extra_container_config_files`` (with any leading
        ``- `` bullet removed) is appended after them.

        Returns:
            dict: ``config_map_gen_section`` mapped to the rendered YAML text.
        """
        files = ["{launcher_execute_script}", "{worker_execute_script}"]
        expander = self.app_inst.expander
        extra_files_str = expander.expand_var_name(
            "extra_container_config_files"
        )
        if extra_files_str:
            # A YAML block scalar ends with a newline, so the value usually
            # carries a trailing empty line; skip blank entries instead of
            # passing them on to the generated file list.
            entries = (
                line.strip().lstrip("- ")
                for line in extra_files_str.splitlines()
            )
            files.extend(entry for entry in entries if entry)
        file_lines = "\n".join(expander.expand_var(f) for f in files)
        lines = [
            "configMapGenerator:",
            "- name: gke-mpi-config",
            # `files` belongs to the same list item as `name`, so it has to
            # line up with it (two spaces).
            "  files:",
            textwrap.indent(file_lines, "  - "),
            "generatorOptions:",
            # For some reason kustomization does not apply the generated name properly.
            # So disable the suffix as a workaround.
            "  disableNameSuffixHash: true",
        ]
        config_map_gen_section = "\n".join(lines)
        return {
            "config_map_gen_section": config_map_gen_section,
        }

    register_template(
        name="batch_submit",
        src_path="batch_submit.tpl",
        dest_path="batch_submit",
    )

    register_template(
        name="batch_query",
        src_path="batch_query.tpl",
        dest_path="batch_query",
    )

    register_template(
        name="batch_cancel",
        src_path="batch_cancel.tpl",
        dest_path="batch_cancel",
    )

    # A convenience for printing the deployment config
    register_template(
        name="batch_print_deployment",
        src_path="batch_print_deployment.tpl",
        dest_path="batch_print_deployment",
    )

    def _prepare_analysis(self, workspace):
        """Fetch the launcher log so that analysis can read it.

        Runs the rendered ``batch_query`` script with its output discarded,
        then copies ``launcher.log`` from the experiment run directory to the
        experiment's ``log_file`` when it exists. Does nothing on a dry run.

        Args:
            workspace: the workspace being analyzed; only ``dry_run`` is read.
        """
        if workspace.dry_run:
            return
        expander = self.app_inst.expander
        query_script = expander.expand_var_name("batch_query")
        query_cmd = Executable(query_script)
        try:
            query_cmd(output=os.devnull)
        except ProcessError as e:
            # Best effort: a failed query only produces a warning, and a
            # launcher.log already present is still copied below.
            logger.warn(f"batch_query returns error {e}")
        run_dir = expander.expand_var_name("experiment_run_dir")
        launcher_log = os.path.join(run_dir, "launcher.log")
        if os.path.exists(launcher_log):
            log_file = expander.expand_var_name("log_file")
            shutil.copy2(launcher_log, log_file)

0 commit comments

Comments
 (0)