Skip to content

Commit 47df8e7

Browse files
committed
enable xprof
1 parent 74fac15 commit 47df8e7

File tree

6 files changed

+158
-7
lines changed

6 files changed

+158
-7
lines changed

dags/map_reproducibility/internal_runs/sample_a3ultra_maxtext_single_run.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def main():
5959

6060
# Setup configuration
6161
relative_config_yaml_path = (
62-
"recipes/a3ultra/a3ultra_llama3.1-8b_8gpus_bf16_maxtext.yaml"
62+
"recipes/a3ultra/a3ultra_llama3.1-8b_8gpus_fp8_maxtext.yaml"
6363
)
6464
timeout = DAG_CONFIGS_ULTRA[relative_config_yaml_path]["timeout_minutes"]
6565

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
import sys
import os
import unittest

# --- Setup sys.path and repo check (Keep as is) ---
base_recipe_repo_root = os.path.abspath(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "..",
        "..",
        "..",
        "..",
        "internal-gpu-recipes",
    )
)

if not os.path.exists(base_recipe_repo_root):
  print(
      f"Skipping test_sample_workload_utils.py - required directory not found: {base_recipe_repo_root}"
  )
  # BUGFIX: previously this only printed "Skipping" and then ran the tests
  # anyway. Raising SkipTest at module level makes unittest discovery mark
  # the whole module as skipped instead of failing on missing resources.
  raise unittest.SkipTest(
      f"required directory not found: {base_recipe_repo_root}"
  )

script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(script_dir, "..", "..", ".."))

print(f"Test Script directory: {script_dir}")
print(f"Test Project root: {project_root}")

# Make the project root importable so the `dags.*` imports below resolve
# when this file is run directly (not via the project's test runner).
if project_root not in sys.path:
  sys.path.insert(0, project_root)
# --- End Setup sys.path ---

# Assuming the functions are in this module or imported
from dags.map_reproducibility.utils.sample_workload_utils import (
    sample_workload_gcs_to_cns_cmds,
    execute_workload_commands,
)
from dags.map_reproducibility.utils.common_utils import (
    find_xprof_gcs_path,
)


class TestSampleWorkloadUtils(unittest.TestCase):
  """Integration-style tests that exercise real GCS/subprocess paths.

  NOTE(review): these tests hit live GCS buckets and run real subprocesses;
  they are smoke tests (they print results rather than asserting on them).
  """

  def test_execute_workload_commands_real_success(self):
    """
    Test execute_workload_commands with a real subprocess that succeeds.
    """
    # Use simple commands guaranteed to succeed in most environments
    gcs_path = "gs://yujunzou-dev-supercomputer-testing/maxtext/yujunzou-coreml-llama-3-1-8b-1745453263-maxtext-xpbx-1745453272-xppn/tensorboard/plugins/profile/2025_04_24_00_13_31/yujunzou-coreml-llama-3-1-8b-1745453263-maxtext-xpbx-0.xplane.pb"
    commands = sample_workload_gcs_to_cns_cmds(gcs_path)

    # --- Act ---
    # Execute the commands using the real subprocess mechanism
    success, results = execute_workload_commands(commands, "/tmp")
    print(f"Real execution success flag: {success}")
    print(f"Real execution results: {results}")

  def test_find_xprof_gcs_path_real_success(self):
    """
    Test find_xprof_gcs_path with a real subprocess that succeeds.
    """
    gcs_run_bucket_folder = "gs://yujunzou-dev-supercomputer-testing/maxtext/yujunzou-coreml-llama-3-1-8b-1745363352-maxtext-okrp-1745363360-h593/"
    xprof_path = find_xprof_gcs_path(gcs_run_bucket_folder)
    print(f"xprof_path is {xprof_path}")


if __name__ == "__main__":
  # Run only the tests in the TestSampleWorkloadUtils class
  suite = unittest.TestSuite()
  suite.addTest(
      TestSampleWorkloadUtils("test_execute_workload_commands_real_success")
  )
  runner = unittest.TextTestRunner()
  runner.run(suite)

dags/map_reproducibility/utils/benchmarkdb_utils.py

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def write_run(
5050
comment: str = "",
5151
is_test: bool = False,
5252
logs_profile="",
53+
gcs_metrics_bucket="",
5354
workload_others="",
5455
experiment_id="",
5556
):
@@ -255,6 +256,7 @@ def validate_software_id(software_id: str, is_test: bool = False) -> bool:
255256
hardware_num_superblocks=num_of_superblock,
256257
logs_comments=comment,
257258
logs_profile=logs_profile,
259+
gcs_metrics_bucket=gcs_metrics_bucket,
258260
workload_others=workload_others,
259261
experiment_id=experiment_id,
260262
)

dags/map_reproducibility/utils/common_utils.py

+47-2
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,50 @@ def get_internal_pre_workload_job_name(
128128
return job_name
129129

130130

def find_xprof_gcs_path(gcs_path):
  """
  Find the .xplane.pb file in the latest date blob from the specified GCS path.

  Args:
      gcs_path (str): Full GCS path in the format gs://bucket-name/folder/path/

  Returns:
      str: Full gs:// path to the .xplane.pb file in the latest date blob,
          or None if no .xplane.pb file exists under the prefix.
  """
  # Imported lazily so modules that never profile don't need the GCS client.
  from google.cloud import storage

  path_without_prefix = gcs_path.replace("gs://", "")

  # Split into bucket name and (optional) object prefix.
  parts = path_without_prefix.split("/", 1)
  bucket_name = parts[0]
  print(f"Bucket name: {bucket_name}")

  prefix = parts[1] if len(parts) > 1 else ""
  prefix = prefix.rstrip("/")

  storage_client = storage.Client()
  bucket = storage_client.get_bucket(bucket_name)

  # List all blobs in the bucket with the given prefix
  print(f"Prefix: {prefix}")
  blobs = list(bucket.list_blobs(prefix=prefix))

  # BUGFIX: the docstring promises the file from the *latest* date blob,
  # but the original returned the first match in listing order. Profile
  # directories are date-stamped (e.g. 2025_04_24_00_13_31), so the
  # lexicographically greatest blob name is the most recent one.
  xplane_matches = [b.name for b in blobs if b.name.endswith(".xplane.pb")]

  if not xplane_matches:
    print(f"No .xplane.pb file found in {gcs_path}")
    return None

  xplane_pb_file = max(xplane_matches)

  full_xplane_pb_file = f"gs://{bucket_name}/{xplane_pb_file}"
  print(f"Found .xplane.pb file: {full_xplane_pb_file}")
  return full_xplane_pb_file
131175
def get_patheon_job_link(region, cluster_name, job_name):
132176
pantheon_link = f"https://pantheon.corp.google.com/kubernetes/job/{region}/{cluster_name}/default/{job_name}"
133177
print(f"{'*' * 20}LINK: {pantheon_link}")
@@ -291,7 +335,7 @@ def internal_wait_for_jobs_cmds(timeout="100m"):
291335
return wait_for_job
292336

293337

294-
def get_job_gcs_bucket_folder(job_name):
338+
def get_job_gcs_bucket_folder(job_name, bucket_name=BUCKET_NAME):
295339
"""
296340
Get the GCS bucket folder for a specific job.
297341
@@ -302,8 +346,9 @@ def get_job_gcs_bucket_folder(job_name):
302346
Returns:
303347
str: The full path to the bucket folder containing the job
304348
"""
305-
gcs_location = f"gs://{BUCKET_NAME}/maxtext/"
349+
gcs_location = f"gs://{bucket_name}/maxtext/"
306350
bucket_folder_cmd = f"gcloud storage ls {gcs_location} | grep {job_name}"
351+
print(f"bucket_folder_cmd: {bucket_folder_cmd}")
307352

308353
try:
309354
bucket_folder = (

dags/map_reproducibility/utils/internal_aotc_workload.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def run_internal_aotc_workload(
194194
topology="",
195195
comment=comment,
196196
is_test=is_db_test_run,
197-
logs_profile=gcs_bucket,
197+
gcs_metrics_bucket=gcs_bucket,
198198
workload_others=str(config),
199199
experiment_id=job_name,
200200
)

dags/map_reproducibility/utils/sample_workload_utils.py

+33-3
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
parse_internal_config_filename,
3838
parse_internal_config_content,
3939
get_patheon_job_link,
40+
find_xprof_gcs_path,
4041
)
4142

4243
from dags.map_reproducibility.utils.benchmarkdb_utils import write_run
@@ -151,6 +152,23 @@ def sample_job_configure_project_and_cluster(cluster: str, cluster_region: str):
151152
return set_project_command
152153

153154

def sample_workload_gcs_to_cns_cmds(log_file_in_gcs, output_file=None):
  """Build the shell command fragments that copy a GCS profile file to CNS.

  Args:
      log_file_in_gcs: GCS object path, with or without the gs:// scheme.
      output_file: Destination file name; defaults to the GCS object's
          basename when not supplied.

  Returns:
      tuple[str, ...]: command fragments consumed by
      execute_workload_commands.
  """
  # Strip the scheme; the fileutil_bs tool addresses objects via /bigstore/.
  gcs_object = log_file_in_gcs.replace("gs://", "")
  if not output_file:
    output_file = os.path.basename(gcs_object)
    print(f"output_file name is: {output_file}")

  # NOTE(review): the "$(unknown)" placeholder below looks like it was meant
  # to be "${filename}" — confirm against the runner's expectations.
  return (
      f"LOG_FILE_IN_GCS={gcs_object} ",
      f"filename={output_file} ",
      "CNS_PATH=/cns/pi-d/home/${USER}/tensorboard/multislice ",
      "/google/data/ro/projects/cloud/bigstore/mpm/fileutil_bs/stable/bin/fileutil_bs cp /bigstore/${LOG_FILE_IN_GCS} ${CNS_PATH}/$(unknown) ",
      "echo file to put into xprof: ${CNS_PATH}/$(unknown)",
  )
154172
def write_run_results(
155173
config: Any,
156174
result: WorkloadResult,
@@ -289,8 +307,19 @@ def run_internal_sample_aotc_workload(
289307
print(f"mfu: {mfu}")
290308
print(f"step_time: {step_time}")
291309
comment = "sample benchmarking run"
292-
gcs_bucket = get_job_gcs_bucket_folder(job_name)
310+
gcs_bucket = get_job_gcs_bucket_folder(
311+
job_name, bucket_name=sample_run_bucket_name
312+
)
293313
print(f"GCS bucket is {gcs_bucket}")
314+
logs_profile = None
315+
316+
if config.profiler:
317+
logs_profile = find_xprof_gcs_path(gcs_bucket)
318+
print(f"logs_profile is {logs_profile}")
319+
profiler_cmds = sample_workload_gcs_to_cns_cmds(logs_profile)
320+
profile_success, profiler_error_message = execute_workload_commands(
321+
profiler_cmds, tmpdir
322+
)
294323

295324
write_run(
296325
model_id=config.HELM_NAME_MODEL_ID,
@@ -309,11 +338,12 @@ def run_internal_sample_aotc_workload(
309338
mfu=mfu,
310339
tokens_per_second=1,
311340
writer_path=bq_writer_repo_root,
312-
run_type="internal_perf_regression",
341+
run_type="sample_helm_workload",
313342
topology="",
314343
comment=comment,
315344
is_test=True,
316-
logs_profile=gcs_bucket,
345+
logs_profile=logs_profile,
346+
gcs_metrics_bucket=gcs_bucket,
317347
workload_others=str(config),
318348
experiment_id=job_name,
319349
)

0 commit comments

Comments
 (0)