
chore: update benchmark logic to respect allow_large_results=False #1545

Merged 3 commits on Mar 26, 2025
bigframes/session/_io/bigquery/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -247,7 +247,7 @@ def start_query_with_client(
             api_timeout=timeout,
         )
         if metrics is not None:
-            metrics.count_job_stats()
+            metrics.count_job_stats(query=sql)
         return results_iterator, None
 
     query_job = bq_client.query(
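With `allow_large_results=False`, results can come back as a row iterator without a `QueryJob` ever being materialized, so the only metric left to record is the SQL text itself; the call site above therefore passes `query=sql`. A minimal sketch of the two paths (an illustrative wrapper, not the library source; `run_query` and its parameters are hypothetical):

```python
def run_query(bq_client, sql: str, metrics=None, allow_large_results: bool = True):
    """Hypothetical wrapper showing how metrics collection degrades."""
    if not allow_large_results:
        # No QueryJob object exists on this path, so only the query
        # text is available; count_job_stats falls back to len(query).
        rows = bq_client.query_and_wait(sql)
        if metrics is not None:
            metrics.count_job_stats(query=sql)
        return rows
    # Job-based path: full statistics (bytes, slot millis, duration).
    query_job = bq_client.query(sql)
    rows = query_job.result()
    if metrics is not None:
        metrics.count_job_stats(query_job=query_job)
    return rows
```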
bigframes/session/metrics.py (60 changes: 37 additions & 23 deletions)

@@ -32,29 +32,35 @@ class ExecutionMetrics:
     execution_secs: float = 0
     query_char_count: int = 0
 
-    def count_job_stats(self, query_job: Optional[bq_job.QueryJob] = None):
+    def count_job_stats(
+        self, query_job: Optional[bq_job.QueryJob] = None, query: str = ""
+    ):
+        if query_job is None:
+            query_char_count = len(query)
+            self.execution_count += 1
+            self.query_char_count += query_char_count
+            if LOGGING_NAME_ENV_VAR in os.environ:
+                write_stats_to_disk(query_char_count)
+            return
 
         stats = get_performance_stats(query_job)
         if stats is not None:
-            bytes_processed, slot_millis, execution_secs, query_char_count = stats
+            query_char_count, bytes_processed, slot_millis, execution_secs = stats
             self.execution_count += 1
+            self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
             self.execution_secs += execution_secs
-            self.query_char_count += query_char_count
             if LOGGING_NAME_ENV_VAR in os.environ:
                 # when running notebooks via pytest nbmake
                 write_stats_to_disk(
-                    bytes_processed, slot_millis, execution_secs, query_char_count
+                    query_char_count, bytes_processed, slot_millis, execution_secs
                 )
 
 
 def get_performance_stats(
     query_job: bigquery.QueryJob,
-) -> Optional[Tuple[int, int, float, int]]:
+) -> Optional[Tuple[int, int, int, float]]:
     """Parse the query job for performance stats.
 
     Return None if the stats do not reflect real work done in bigquery.

@@ -77,11 +83,14 @@ def get_performance_stats(
     execution_secs = (query_job.ended - query_job.created).total_seconds()
     query_char_count = len(query_job.query)
 
-    return bytes_processed, slot_millis, execution_secs, query_char_count
+    return query_char_count, bytes_processed, slot_millis, execution_secs
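The reordered tuple (and the matching `Tuple[int, int, int, float]` annotation) puts the always-available character count first and the job-only metrics after it. A plain-values demo of unpacking under the new order (numbers invented for illustration):

```python
from typing import Tuple

# Invented example values, in the new order:
# (query_char_count, bytes_processed, slot_millis, execution_secs)
stats: Tuple[int, int, int, float] = (1842, 10_485_760, 250, 1.3)

query_char_count, bytes_processed, slot_millis, execution_secs = stats
print(
    f"{query_char_count} chars, {bytes_processed} bytes, "
    f"{slot_millis} slot-ms, {execution_secs}s"
)
```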


 def write_stats_to_disk(
-    bytes_processed: int, slot_millis: int, exec_seconds: float, query_char_count: int
+    query_char_count: int,
+    bytes_processed: Optional[int] = None,
+    slot_millis: Optional[int] = None,
+    exec_seconds: Optional[float] = None,
 ):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.

@@ -95,22 +104,27 @@ def write_stats_to_disk(
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
 
-    # store bytes processed
-    bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
-    with open(bytes_file, "a") as f:
-        f.write(str(bytes_processed) + "\n")
-
-    # store slot milliseconds
-    slot_file = os.path.join(current_directory, test_name + ".slotmillis")
-    with open(slot_file, "a") as f:
-        f.write(str(slot_millis) + "\n")
-
-    # store execution time seconds
-    exec_time_file = os.path.join(
-        current_directory, test_name + ".bq_exec_time_seconds"
-    )
-    with open(exec_time_file, "a") as f:
-        f.write(str(exec_seconds) + "\n")
+    if (
+        (bytes_processed is not None)
+        and (slot_millis is not None)
+        and (exec_seconds is not None)
+    ):
+        # store bytes processed
+        bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
+        with open(bytes_file, "a") as f:
+            f.write(str(bytes_processed) + "\n")
+
+        # store slot milliseconds
+        slot_file = os.path.join(current_directory, test_name + ".slotmillis")
+        with open(slot_file, "a") as f:
+            f.write(str(slot_millis) + "\n")
+
+        # store execution time seconds
+        exec_time_file = os.path.join(
+            current_directory, test_name + ".bq_exec_time_seconds"
+        )
+        with open(exec_time_file, "a") as f:
+            f.write(str(exec_seconds) + "\n")
 
     # store length of query
     query_char_count_file = os.path.join(
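Given the `None` guard above, callers can now log in two modes. A usage sketch (values invented; the concrete `LOGGING_NAME_ENV_VAR` variable name and the query-length file suffix are not shown in this diff):

```python
import os

from bigframes.session.metrics import LOGGING_NAME_ENV_VAR, write_stats_to_disk

# write_stats_to_disk only logs when the env var is set, e.g. by the
# benchmark harness; the value here is a made-up test name.
os.environ[LOGGING_NAME_ENV_VAR] = "my_benchmark_test"

# Full metrics: a QueryJob was available, so all four report files
# (.bytesprocessed, .slotmillis, .bq_exec_time_seconds, plus the
# query-length file) each receive one line.
write_stats_to_disk(
    query_char_count=1842,
    bytes_processed=10_485_760,
    slot_millis=250,
    exec_seconds=1.3,
)

# Degraded metrics: allow_large_results=False returned rows without a
# job, so only the query-length file is appended; the three job-level
# files are skipped by the None guard.
write_stats_to_disk(query_char_count=1842)
```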
scripts/run_and_publish_benchmark.py (107 changes: 67 additions & 40 deletions)

@@ -95,55 +95,62 @@ def collect_benchmark_result(
     if not (
         len(bytes_files)
         == len(millis_files)
-        == len(local_seconds_files)
         == len(bq_seconds_files)
-        == len(query_char_count_files)
+        <= len(query_char_count_files)
+        == len(local_seconds_files)
     ):
         raise ValueError(
             "Mismatch in the number of report files for bytes, millis, seconds and query char count."
         )
 
-    for idx in range(len(bytes_files)):
-        bytes_file = bytes_files[idx]
-        millis_file = millis_files[idx]
-        bq_seconds_file = bq_seconds_files[idx]
-        query_char_count_file = query_char_count_files[idx]
-
-        filename = bytes_file.relative_to(path).with_suffix("")
-
-        if filename != millis_file.relative_to(path).with_suffix(
-            ""
-        ) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
-            raise ValueError(
-                "File name mismatch among bytes, millis, and seconds reports."
-            )
+    has_full_metrics = len(bq_seconds_files) == len(local_seconds_files)
+
+    for idx in range(len(local_seconds_files)):
+        query_char_count_file = query_char_count_files[idx]
+        local_seconds_file = local_seconds_files[idx]
+        filename = query_char_count_file.relative_to(path).with_suffix("")
+        if filename != local_seconds_file.relative_to(path).with_suffix(""):
+            raise ValueError(
+                "File name mismatch between query_char_count and seconds reports."
+            )

-        with open(bytes_file, "r") as file:
+        with open(query_char_count_file, "r") as file:
             lines = file.read().splitlines()
+            query_char_count = sum(int(line) for line in lines) / iterations
             query_count = len(lines) / iterations
-            total_bytes = sum(int(line) for line in lines) / iterations
-
-        with open(millis_file, "r") as file:
-            lines = file.read().splitlines()
-            total_slot_millis = sum(int(line) for line in lines) / iterations
 
         with open(local_seconds_file, "r") as file:
             lines = file.read().splitlines()
             local_seconds = sum(float(line) for line in lines) / iterations
 
-        with open(bq_seconds_file, "r") as file:
-            lines = file.read().splitlines()
-            bq_seconds = sum(float(line) for line in lines) / iterations
+        if not has_full_metrics:
+            total_bytes = None
+            total_slot_millis = None
+            bq_seconds = None
+        else:
+            bytes_file = bytes_files[idx]
+            millis_file = millis_files[idx]
+            bq_seconds_file = bq_seconds_files[idx]
+            if (
+                filename != bytes_file.relative_to(path).with_suffix("")
+                or filename != millis_file.relative_to(path).with_suffix("")
+                or filename != bq_seconds_file.relative_to(path).with_suffix("")
+            ):
+                raise ValueError(
+                    "File name mismatch among query_char_count, bytes, millis, and seconds reports."
+                )
+
+            with open(bytes_file, "r") as file:
+                lines = file.read().splitlines()
+                total_bytes = sum(int(line) for line in lines) / iterations
+
+            with open(millis_file, "r") as file:
+                lines = file.read().splitlines()
+                total_slot_millis = sum(int(line) for line in lines) / iterations
+
+            with open(bq_seconds_file, "r") as file:
+                lines = file.read().splitlines()
+                bq_seconds = sum(float(line) for line in lines) / iterations
         results_dict[str(filename)] = [
             query_count,

@@ -194,11 +201,19 @@
         )
         print(
             f"{index} - query count: {row['Query_Count']},"
-            f" query char count: {row['Query_Char_Count']},",
-            f" bytes processed sum: {row['Bytes_Processed']},"
-            f" slot millis sum: {row['Slot_Millis']},"
-            f" local execution time: {formatted_local_exec_time} seconds,"
-            f" bigquery execution time: {round(row['BigQuery_Execution_Time_Sec'], 1)} seconds",
+            + f" query char count: {row['Query_Char_Count']},"
+            + (
+                f" bytes processed sum: {row['Bytes_Processed']},"
+                if has_full_metrics
+                else ""
+            )
+            + (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
+            + f" local execution time: {formatted_local_exec_time} seconds"
+            + (
+                f", bigquery execution time: {round(row['BigQuery_Execution_Time_Sec'], 1)} seconds"
+                if has_full_metrics
+                else ""
+            )
         )

     geometric_mean_queries = geometric_mean_excluding_zeros(

@@ -221,12 +236,24 @@
     )
 
     print(
f"---Geometric mean of queries: {geometric_mean_queries}, "
f"Geometric mean of queries char counts: {geometric_mean_query_char_count}, "
f"Geometric mean of bytes processed: {geometric_mean_bytes}, "
f"Geometric mean of slot millis: {geometric_mean_slot_millis}, "
f"Geometric mean of local execution time: {geometric_mean_local_seconds} seconds, "
f"Geometric mean of BigQuery execution time: {geometric_mean_bq_seconds} seconds---"
f"---Geometric mean of queries: {geometric_mean_queries},"
+ f" Geometric mean of queries char counts: {geometric_mean_query_char_count},"
+ (
f" Geometric mean of bytes processed: {geometric_mean_bytes},"
if has_full_metrics
else ""
)
+ (
f" Geometric mean of slot millis: {geometric_mean_slot_millis},"
if has_full_metrics
else ""
)
+ f" Geometric mean of local execution time: {geometric_mean_local_seconds} seconds"
+ (
f", Geometric mean of BigQuery execution time: {geometric_mean_bq_seconds} seconds---"
if has_full_metrics
else ""
)
)

     error_message = (
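Taken together, the collection script now tolerates runs where job-level reports are missing. A self-contained sketch of the relaxed invariant (the file suffixes for the local-seconds and query-length reports are assumptions, since this diff truncates before showing them):

```python
from pathlib import Path

def check_report_counts(path: Path) -> bool:
    """Return True when every run produced full job-level metrics."""
    bytes_files = sorted(path.rglob("*.bytesprocessed"))
    millis_files = sorted(path.rglob("*.slotmillis"))
    bq_seconds_files = sorted(path.rglob("*.bq_exec_time_seconds"))
    # The two suffixes below are assumed; the diff does not show them.
    local_seconds_files = sorted(path.rglob("*.local_exec_time_seconds"))
    query_char_count_files = sorted(path.rglob("*.query_char_count"))

    # Job-level reports may be fewer (or absent) when
    # allow_large_results=False skipped QueryJob creation, but they
    # must still agree with each other.
    if not (
        len(bytes_files)
        == len(millis_files)
        == len(bq_seconds_files)
        <= len(query_char_count_files)
        == len(local_seconds_files)
    ):
        raise ValueError("Mismatch in the number of report files.")
    return len(bq_seconds_files) == len(local_seconds_files)
```

When this returns `False`, the reporting code above prints only query counts, char counts, and local execution times, leaving the bytes, slot-millis, and BigQuery-time columns as `None`.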