Skip to content

Commit d4f4042

Browse files
committed
batch executes cmd
1 parent bb7ca56 commit d4f4042

File tree

3 files changed

+37
-147
lines changed

3 files changed

+37
-147
lines changed

dags/map_reproducibility/internal_runs/sample_a3ultra_maxtext_single_run.py

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ def main():
6161
relative_config_yaml_path = (
6262
"recipes/a3ultra/a3ultra_llama3.1-8b_8gpus_bf16_maxtext.yaml"
6363
)
64-
config_name = relative_config_yaml_path.replace(".yaml", "")
6564
timeout = DAG_CONFIGS_ULTRA[relative_config_yaml_path]["timeout_minutes"]
6665

6766
run_internal_sample_aotc_workload(

dags/map_reproducibility/utils/common_utils.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,16 @@ def get_internal_pre_workload_job_name(
124124
job_name = f"coreml-{helm_model_id}-{now}-{framework}-{random_id}"
125125
if is_sample_run:
126126
job_name = f"{getpass.getuser()}-{job_name}"
127-
print(f"NAME: {job_name}")
127+
print(f"{'*' * 20}NAME: {job_name}")
128128
return job_name
129129

130130

131+
def get_patheon_job_link(region, cluster_name, job_name):
132+
pantheon_link = f"https://pantheon.corp.google.com/kubernetes/job/{region}/{cluster_name}/default/{job_name}"
133+
print(f"{'*' * 20}LINK: {pantheon_link}")
134+
return pantheon_link
135+
136+
131137
def install_helm_cmds():
132138
install_helm_cmd = (
133139
"curl -fsSL -o get_helm.sh "

dags/map_reproducibility/utils/sample_workload_utils.py

+30-145
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
get_job_gcs_bucket_folder,
3737
parse_internal_config_filename,
3838
parse_internal_config_content,
39+
get_patheon_job_link,
3940
)
4041

4142
from dags.map_reproducibility.utils.benchmarkdb_utils import write_run
@@ -94,7 +95,7 @@ def get_values_file_path(
9495

9596

9697
def execute_workload_commands(commands: list, cwd: str) -> Tuple[bool, list]:
97-
"""Execute each command individually while preserving bash context.
98+
"""Execute shell commands and capture their outputs.
9899
99100
Args:
100101
commands: List of shell commands to execute
@@ -103,161 +104,42 @@ def execute_workload_commands(commands: list, cwd: str) -> Tuple[bool, list]:
103104
Returns:
104105
Tuple of (success, list of command results)
105106
"""
106-
logger.info(
107-
f"Executing commands sequentially: {commands} in directory: {cwd}"
108-
)
107+
logger.info(f"Executing commands: {commands} in directory: {cwd}")
109108

110-
command_results = []
109+
# Join commands with semicolons for sequential execution
110+
combined_command = ";".join(commands)
111111

112-
# Start a bash process that we'll keep alive
112+
# Run the combined command
113113
process = subprocess.Popen(
114-
["bash"],
115-
stdin=subprocess.PIPE,
114+
["bash", "-c", combined_command],
116115
stdout=subprocess.PIPE,
117116
stderr=subprocess.PIPE,
118117
text=True,
119118
cwd=cwd,
120119
)
121120

122-
try:
123-
# Execute setup commands to enable command tracing
124-
process.stdin.write("set -e\n") # Exit on first error
125-
process.stdin.write(
126-
"cd " + cwd + "\n"
127-
) # Ensure we're in the right directory
128-
process.stdin.flush()
129-
130-
for i, cmd in enumerate(commands):
131-
logger.info(f"Executing command {i+1}: {cmd}")
132-
133-
# Create unique markers for this command
134-
cmd_id = f"CMD_{i}"
135-
136-
# Script to capture both stdout and stderr separately
137-
capture_script = f"""
138-
# Start marker
139-
echo '{cmd_id}_START'
140-
141-
# Create temporary files for stdout and stderr
142-
STDOUT_FILE=$(mktemp)
143-
STDERR_FILE=$(mktemp)
144-
145-
# Execute the command, capturing stdout and stderr
146-
{{ {cmd} > $STDOUT_FILE 2> $STDERR_FILE; }}
147-
CMD_EXIT_CODE=$?
148-
149-
# Output stdout with marker
150-
echo '{cmd_id}_STDOUT_BEGIN'
151-
cat $STDOUT_FILE
152-
echo '{cmd_id}_STDOUT_END'
153-
154-
# Output stderr with marker
155-
echo '{cmd_id}_STDERR_BEGIN'
156-
cat $STDERR_FILE
157-
echo '{cmd_id}_STDERR_END'
158-
159-
# Output exit code with marker
160-
echo '{cmd_id}_EXIT_'$CMD_EXIT_CODE
161-
162-
# Clean up temp files
163-
rm -f $STDOUT_FILE $STDERR_FILE
164-
"""
165-
166-
# Write the capture script
167-
process.stdin.write(capture_script)
168-
process.stdin.flush()
169-
170-
# Process output with state machine
171-
current_state = "WAITING_FOR_START"
172-
stdout_lines = []
173-
stderr_lines = []
174-
exit_code = None
175-
176-
while True:
177-
line = process.stdout.readline().rstrip("\n")
178-
179-
if not line:
180-
continue
181-
182-
if line == f"{cmd_id}_START":
183-
current_state = "STARTED"
184-
elif line == f"{cmd_id}_STDOUT_BEGIN":
185-
current_state = "READING_STDOUT"
186-
elif line == f"{cmd_id}_STDOUT_END":
187-
current_state = "STDOUT_COMPLETE"
188-
elif line == f"{cmd_id}_STDERR_BEGIN":
189-
current_state = "READING_STDERR"
190-
elif line == f"{cmd_id}_STDERR_END":
191-
current_state = "STDERR_COMPLETE"
192-
elif line.startswith(f"{cmd_id}_EXIT_"):
193-
exit_code = int(line.split("_")[-1])
194-
break
195-
elif current_state == "READING_STDOUT":
196-
stdout_lines.append(line)
197-
elif current_state == "READING_STDERR":
198-
stderr_lines.append(line)
199-
200-
# Combine stdout and stderr
201-
stdout_content = "\n".join(stdout_lines)
202-
stderr_content = "\n".join(stderr_lines)
203-
combined_output = stdout_content
204-
if stderr_content:
205-
if combined_output:
206-
combined_output += "\n\nSTDERR:\n" + stderr_content
207-
else:
208-
combined_output = "STDERR:\n" + stderr_content
209-
210-
# Store the command result
211-
cmd_result = {
212-
"command": cmd,
213-
"stdout": stdout_content,
214-
"stderr": stderr_content,
215-
"output": combined_output, # Combined for backward compatibility
216-
"exit_code": exit_code,
217-
}
218-
command_results.append(cmd_result)
219-
220-
# Log the command result - no longer printing exit code
221-
if stdout_content:
222-
logger.info(f"Stdout for command {i+1}:\n{stdout_content}")
223-
if stderr_content:
224-
logger.warning(f"Stderr for command {i+1}:\n{stderr_content}")
225-
226-
# If a command failed and we're using set -e, stop execution
227-
if exit_code != 0:
228-
logger.error(f"Command {i+1} failed")
229-
break
230-
231-
# Close the process properly
232-
process.stdin.write("exit\n")
233-
process.stdin.flush()
234-
process.wait()
235-
236-
# Check if all commands succeeded
237-
all_succeeded = all(result["exit_code"] == 0 for result in command_results)
238-
return all_succeeded, command_results
239-
240-
except Exception as e:
241-
# Get detailed exception information including stack trace
242-
import traceback
243-
244-
stack_trace = traceback.format_exc()
245-
error_message = (
246-
f"Error executing commands: {e}\n\nStack trace:\n{stack_trace}"
247-
)
248-
logger.error(error_message)
121+
# Capture output
122+
stdout, stderr = process.communicate()
123+
exit_code = process.returncode
249124

250-
# Kill the process if it's still running
251-
if process.poll() is None:
252-
process.terminate()
125+
# Create result for the combined execution
126+
command_results = [{
127+
"command": combined_command,
128+
"stdout": stdout,
129+
"stderr": stderr,
130+
"output": stdout + ("\n\nSTDERR:\n" + stderr if stderr else ""),
131+
"exit_code": exit_code,
132+
}]
253133

254-
return False, [{
255-
"command": "unknown",
256-
"stdout": "",
257-
"stderr": error_message,
258-
"output": error_message,
259-
"exit_code": -1,
260-
}]
134+
# Log results
135+
if stdout:
136+
logger.info(f"Stdout for combined commands:\n{stdout}")
137+
if stderr:
138+
logger.warning(f"Stderr for combined commands:\n{stderr}")
139+
if exit_code != 0:
140+
logger.error("Command execution failed")
141+
142+
return exit_code == 0, command_results
261143

262144

263145
def sample_job_configure_project_and_cluster(cluster: str, cluster_region: str):
@@ -360,6 +242,9 @@ def run_internal_sample_aotc_workload(
360242
job_name = get_internal_pre_workload_job_name(
361243
config.MODEL_ID, config.FRAMEWORK, is_sample_run=True
362244
)
245+
pantheon_link = get_patheon_job_link(
246+
region=cluster_region, cluster_name=cluster, job_name=job_name
247+
)
363248

364249
# Adjust timeout for the container
365250
container_timeout = int(timeout) - 4

0 commit comments

Comments
 (0)