
Commit d9d1aaa

pmkc authored and copybara-github committed

Add RunRemoteCommandsInParallel and use it to run curl in parallel to avoid exhausting SSH connections.

PiperOrigin-RevId: 384957145
1 parent faa8914 · commit d9d1aaa

11 files changed: +490 −25 lines
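
Note: only a subset of the 11 changed files appears below. The helper named in the commit title is defined in one of the files not shown; the new benchmark invokes it as vm.RemoteCommandsInParallel(commands). As a rough illustration of the idea only (names and details here are assumptions, not the committed implementation), multiplexing commands over a single remote shell looks roughly like this:

# Sketch: run N shell commands concurrently over one connection by
# backgrounding each command and waiting, instead of opening N SSH
# sessions (which can trip sshd's MaxSessions/MaxStartups limits).
def BuildParallelCommand(commands):
  """Join commands so a single remote shell runs them all concurrently."""
  background = ' '.join(f'({cmd}) &' for cmd in commands)
  return f'{background} wait'


if __name__ == '__main__':
  # Local demonstration; over SSH the combined string would be passed to a
  # single RemoteCommand-style call. Total runtime is ~1s, not ~2s.
  import subprocess
  combined = BuildParallelCommand(['sleep 1; echo one', 'sleep 1; echo two'])
  print(subprocess.run(['sh', '-c', combined],
                       capture_output=True, text=True).stdout)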

CHANGES.next.md (+1)

@@ -49,6 +49,7 @@
 - Add pd extreme support to PKB.
 - Add '--delete_samples' to measure VM deletion during benchmark teardown
   phase
+- Add cURL benchmark for object storage.

 ### Enhancements:

perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py (+2 −2)

@@ -191,8 +191,8 @@ def Prepare(benchmark_spec, vm=None):
     location = benchmark_spec.tpu_groups['train'].GetZone()
     storage_service.PrepareService(util.GetRegionFromZone(location))
     storage_service.MakeBucket(bucket)
-    storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W',
-                                bucket)
+    storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
+                              bucket)

   # For MLPerf v0.6, the benchmark code for different hardware is different.
   if (benchmark_spec.tpu_groups['train'].GetAcceleratorType() == 'v3-32' or

perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py (+2 −1)

@@ -689,7 +689,8 @@ def _PrepareBucket(benchmark_spec):
   storage_service = benchmark_spec.storage_service
   storage_service.PrepareService(util.GetRegionFromZone(location))
   storage_service.MakeBucket(bucket, raise_on_failure=False)
-  storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket)
+  storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
+                            bucket)


 def _ClearTmpDirectory(benchmark_spec, vm):

perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py (+2 −1)

@@ -141,7 +141,8 @@ def Prepare(benchmark_spec):
     location = benchmark_spec.tpu_groups['train'].GetZone()
     storage_service.PrepareService(util.GetRegionFromZone(location))
     storage_service.MakeBucket(bucket)
-    storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket)
+    storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
+                              bucket)
   else:
     benchmark_spec.model_dir = '/tmp'
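
Note: the three call sites above all make the same substitution: the string-based ChmodBucket(account, 'W', bucket) becomes AclBucket(account, gcs.WRITER, bucket). The gcs module and its WRITER constant presumably come from the GCS provider code updated elsewhere in this commit, outside this excerpt.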

perfkitbenchmarker/linux_benchmarks/object_storage_curl.py (+276, new file)

# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Use cURL to upload and download data to object storage in parallel.

Consistent with object_storage_service multistream scenario.

Due to the difficulty of signing requests to S3 by hand
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html),
the benchmark uses insecure short-lived buckets and should be used with
caution.

TODO(pclay): Consider signing requests and not using public buckets.
"""

import logging
from typing import List, Tuple

from absl import flags
import numpy as np
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import object_storage_service
from perfkitbenchmarker import providers
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark

BENCHMARK_NAME = 'object_storage_curl'

BENCHMARK_CONFIG = """
object_storage_curl:
  description: Use cURL to upload and download data to object storage in parallel.
  vm_groups:
    default:
      vm_spec: *default_single_core
  flags:
    # Required
    object_storage_multistream_objects_per_stream: 1
    object_storage_streams_per_vm: 10
"""

# Blocksize for dd to pipe data into uploads.
DD_BLOCKSIZE = 4000

# Magic strings
_UPLOAD = 'upload'
_DOWNLOAD = 'download'
_START_TIME = 'START_TIME'
_CURL_RESULTS = 'CURL_RESULTS'

flags.DEFINE_string('object_storage_curl_object_size', '1MB',
                    'Size of objects to upload / download. Similar to '
                    '--object_storage_object_sizes, but only takes a single '
                    'size.')
flags.DEFINE_bool('object_storage_curl_i_am_ok_with_public_read_write_buckets',
                  False, 'Acknowledge that this benchmark will create buckets '
                  'which are publicly readable and writable. Required to run '
                  'this benchmark.')

FLAGS = flags.FLAGS


def GetConfig(user_config):
  return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)


def CheckPrerequisites(_):
  """Validate some unsupported flags."""
  if (flag_util.StringToBytes(FLAGS.object_storage_curl_object_size) <
      DD_BLOCKSIZE):
    raise errors.Config.InvalidValue(
        '--object_storage_curl_object_size must be at least 4KB')
  # TODO(pclay): Consider supporting multiple objects per stream.
  if FLAGS.object_storage_multistream_objects_per_stream != 1:
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports 1 object per stream')
  if FLAGS.object_storage_object_naming_scheme != 'sequential_by_stream':
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports sequential_by_stream naming.')
  if not FLAGS.object_storage_curl_i_am_ok_with_public_read_write_buckets:
    raise errors.Config.InvalidValue(
        'This benchmark uses a public read/write object storage bucket.\n'
        'You must explicitly pass '
        '--object_storage_curl_i_am_ok_with_public_read_write_buckets to '
        'acknowledge that it will be created.\n'
        'If PKB is interrupted, you should ensure it is cleaned up.')


# PyType does not currently support returning Abstract classes
# TODO(user): stop suppressing
# pytype: disable=not-instantiable
def _GetService() -> object_storage_service.ObjectStorageService:
  """Get a ready to use instance of ObjectStorageService."""
  # TODO(pclay): consider using FLAGS.storage to allow cross cloud testing?
  cloud = FLAGS.cloud
  providers.LoadProvider(cloud)
  service = object_storage_service.GetObjectStorageClass(cloud)()
  # This method is idempotent with default args and safe to call in each phase.
  service.PrepareService(FLAGS.object_storage_region)
  return service


def _GetBucketName() -> str:
  return FLAGS.object_storage_bucket_name or 'pkb%s' % FLAGS.run_uri


def Prepare(benchmark_spec):
  """Create and ACL a bucket and install curl."""
  # Always clean up server-side state if an exception is raised.
  benchmark_spec.always_call_cleanup = True

  service = _GetService()
  bucket_name = _GetBucketName()

  service.MakeBucket(bucket_name)
  service.MakeBucketPubliclyReadable(bucket_name, also_make_writable=True)

  vms = benchmark_spec.vms
  vm_util.RunThreaded(lambda vm: vm.InstallPackages('curl'), vms)


def Run(benchmark_spec) -> List[sample.Sample]:
  """Run storage benchmark and publish results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    The same samples as object_storage_multistream.
  """
  service = _GetService()
  bucket = _GetBucketName()

  object_bytes = flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
  blocks = object_bytes // DD_BLOCKSIZE
  streams_per_vm = FLAGS.object_storage_streams_per_vm

  generate_data_cmd = (
      'openssl aes-256-ctr -iter 1 -pass file:/dev/urandom -in /dev/zero'
      f' | dd bs={DD_BLOCKSIZE} count={blocks} iflag=fullblock')

  def StartTimeCmd(index):
    return f"date '+{_START_TIME} {index} %s.%N'"

  def CurlCmd(operation, index):
    return (
        f"curl -fsw '{_CURL_RESULTS} {index} %{{time_total}} "
        # Pad size with zero to force curl to populate it.
        f"%{{response_code}} 0%{{size_{operation}}}\n' -o /dev/null")

  def Upload(vm):
    commands = []
    for object_index in range(streams_per_vm):
      object_name = f'{vm.name}_{object_index}'
      url = service.GetUploadUrl(bucket=bucket, object_name=object_name)
      commands.append(
          f'{StartTimeCmd(object_index)}; {generate_data_cmd} | '
          f'{CurlCmd(_UPLOAD, object_index)} -X {service.UPLOAD_HTTP_METHOD} '
          f"--data-binary @- '{url}'")
    stdout, _ = vm.RemoteCommandsInParallel(commands)
    return stdout

  def Download(vm):
    commands = []
    for object_index in range(streams_per_vm):
      object_name = f'{vm.name}_{object_index}'
      url = service.GetDownloadUrl(bucket=bucket, object_name=object_name)
      commands.append(f'{StartTimeCmd(object_index)}; '
                      f"{CurlCmd(_DOWNLOAD, object_index)} '{url}'")
    stdout, _ = vm.RemoteCommandsInParallel(commands)
    return stdout

  vms = benchmark_spec.vms
  samples = []
  for operation, func in [(_UPLOAD, Upload), (_DOWNLOAD, Download)]:
    output = vm_util.RunThreaded(func, vms)
    start_times, latencies = _LoadWorkerOutput(output)
    object_storage_service_benchmark.ProcessMultiStreamResults(
        start_times,
        latencies,
        all_sizes=[object_bytes],
        sizes=[np.array([object_bytes])] * streams_per_vm * len(vms),
        operation=operation,
        results=samples,
        # We do not retry curl. We simply do not report failing latencies.
        # This under-reports both latency and throughput. Since this benchmark
        # is intended to measure throughput this is reasonable.
        allow_failing_streams=True)
  return samples


def Cleanup(_):
  service = _GetService()
  bucket_name = _GetBucketName()
  if not FLAGS.object_storage_dont_delete_bucket:
    service.DeleteBucket(bucket_name)
    service.CleanupService()


def _LoadWorkerOutput(
    output: List[str]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
  """Parse the output of Upload and Download functions.

  The output of Upload and Download is
  # START_TIME <index> <Unix start time in seconds>
  START_TIME 0 12345.6789
  # CURL_RESULTS <index> <latency in s> <HTTP code> <bytes transmitted>
  CURL_RESULTS 0 1.2345 200 01000

  Lines of output are not ordered and may be interleaved.

  Args:
    output: the output of each upload or download command

  Returns:
    the start times and latencies of the curl commands

  Raises:
    ValueError if output is unexpected.
    Exception if the curl request failed with a 4XX code.
  """
  start_times = []
  latencies = []

  for worker_out in output:
    worker_start_times = [None] * FLAGS.object_storage_streams_per_vm
    worker_latencies = [None] * FLAGS.object_storage_streams_per_vm
    for line in worker_out.strip().split('\n'):
      try:
        line_type, index, value, *curl_data = line.split()
        if line_type == _START_TIME:
          assert not curl_data
          worker_start_times[int(index)] = float(value)
        elif line_type == _CURL_RESULTS:
          response_code, bytes_transmitted = curl_data
          bytes_transmitted = int(bytes_transmitted)
          if response_code.startswith('4'):
            raise Exception(
                f'cURL command failed with HTTP Code {response_code}')
          elif response_code == '200':
            bytes_expected = flag_util.StringToBytes(
                FLAGS.object_storage_curl_object_size)
            if bytes_transmitted != bytes_expected:
              raise ValueError(
                  f'cURL transmitted {bytes_transmitted}'
                  f' instead of {bytes_expected}.')
            # curl 7.74 used μs instead of seconds. Not used in major OS types.
            # https://github.com/curl/curl/issues/6321
            assert '.' in value, 'Invalid curl output.'
            worker_latencies[int(index)] = float(value)
          else:
            logging.warning('cURL command failed with HTTP code %s. '
                            'Not reporting latency.', response_code)
        else:
          raise ValueError(f'Unexpected line start: {line_type}.')
      # Always show raw line when there is a parsing error.
      except (ValueError, AssertionError) as e:
        raise ValueError(f'Unexpected output:\n{line}') from e

    for start_time, latency in zip(worker_start_times, worker_latencies):
      if latency:
        start_times.append(np.array([start_time], dtype=np.float64))
        latencies.append(np.array([latency], dtype=np.float64))

  return start_times, latencies
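
For reference, a plausible invocation of the new benchmark, assembled from the flags defined above and PKB's usual command line (values are illustrative, not taken from this commit):

./pkb.py --benchmarks=object_storage_curl --cloud=GCP \
    --object_storage_region=us-central1 \
    --object_storage_curl_object_size=16MB \
    --object_storage_streams_per_vm=10 \
    --object_storage_curl_i_am_ok_with_public_read_write_buckets

CheckPrerequisites rejects the run unless the acknowledgment flag is passed, the object size is at least DD_BLOCKSIZE, and one object per stream with sequential_by_stream naming is in effect.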

perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py (+22 −9)

@@ -403,8 +403,14 @@ def MultiThreadDeleteDelay(num_vms, threads_per_vm):
   return (num_vms * threads_per_vm) / (MULTISTREAM_DELETE_OPS_PER_SEC)


-def _ProcessMultiStreamResults(start_times, latencies, sizes, operation,
-                               all_sizes, results, metadata=None):
+def ProcessMultiStreamResults(start_times,
+                              latencies,
+                              sizes,
+                              operation,
+                              all_sizes,
+                              results,
+                              metadata=None,
+                              allow_failing_streams=False):
   """Read and process results from the api_multistream worker process.

   Results will be reported per-object size and combined for all
@@ -421,17 +427,24 @@ def _ProcessMultiStreamResults(start_times, latencies, sizes, operation,
       distribution used, in bytes.
     results: a list to append Sample objects to.
     metadata: dict. Base sample metadata
+    allow_failing_streams: Whether to tolerate missing results from failed
+      streams.
   """

-  num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
-
-  assert len(start_times) == num_streams
-  assert len(latencies) == num_streams
-  assert len(sizes) == num_streams
+  total_num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
+  if allow_failing_streams:
+    num_streams = len(start_times)
+    assert len(latencies) == num_streams
+    assert len(sizes) == num_streams
+  else:
+    assert len(start_times) == total_num_streams
+    assert len(latencies) == total_num_streams
+    assert len(sizes) == total_num_streams
+    num_streams = total_num_streams

   if metadata is None:
     metadata = {}
-  metadata['num_streams'] = num_streams
+  metadata['num_streams'] = total_num_streams
+  metadata['num_failing_streams'] = total_num_streams - num_streams
   metadata['objects_per_stream'] = (
       FLAGS.object_storage_multistream_objects_per_stream)
   metadata['object_naming'] = FLAGS.object_storage_object_naming_scheme
@@ -1053,7 +1066,7 @@ def _MultiStreamOneWay(results, metadata, vms, command_builder,
   if FLAGS.object_storage_worker_output:
     with open(FLAGS.object_storage_worker_output, 'w') as out_file:
       out_file.write(json.dumps(output))
-  _ProcessMultiStreamResults(
+  ProcessMultiStreamResults(
       start_times,
       latencies,
       sizes,
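
A worked example of the new accounting: with FLAGS.num_vms = 2 and 10 streams per VM, total_num_streams is 20. If three curl requests fail and the caller passes allow_failing_streams=True, only 17 results arrive, so num_streams becomes 17 and every emitted sample carries metadata num_streams=20, num_failing_streams=3; without the flag, the length asserts would fail instead.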
