Commit a6b31e5

pmkc authored and copybara-github committed
Implement GCS and S3 auth to improve object_storage_curl benchmark security.
PiperOrigin-RevId: 385044218
1 parent: faa8914

12 files changed: +546 -25 lines

CHANGES.next.md (+1)

@@ -49,6 +49,7 @@
 - Add pd extreme support to PKB.
 - Add '--delete_samples' to measure VM deletion during benchmark teardown
   phase
+- Add cURL benchmark for object storage.

 ### Enhancements:

perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py (+2 -2)

@@ -191,8 +191,8 @@ def Prepare(benchmark_spec, vm=None):
     location = benchmark_spec.tpu_groups['train'].GetZone()
     storage_service.PrepareService(util.GetRegionFromZone(location))
     storage_service.MakeBucket(bucket)
-    storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W',
-                                bucket)
+    storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
+                              bucket)

   # For MLPerf v0.6, the benchmark code for different hardware is different.
   if (benchmark_spec.tpu_groups['train'].GetAcceleratorType() == 'v3-32' or

perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py (+2 -1)

@@ -689,7 +689,8 @@ def _PrepareBucket(benchmark_spec):
   storage_service = benchmark_spec.storage_service
   storage_service.PrepareService(util.GetRegionFromZone(location))
   storage_service.MakeBucket(bucket, raise_on_failure=False)
-  storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket)
+  storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
+                            bucket)


 def _ClearTmpDirectory(benchmark_spec, vm):

perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py (+2 -1)

@@ -141,7 +141,8 @@ def Prepare(benchmark_spec):
     location = benchmark_spec.tpu_groups['train'].GetZone()
     storage_service.PrepareService(util.GetRegionFromZone(location))
     storage_service.MakeBucket(bucket)
-    storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket)
+    storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
+                              bucket)
   else:
     benchmark_spec.model_dir = '/tmp'
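The three diffs above are the same mechanical change: the GCS-specific ChmodBucket is renamed to AclBucket, and the bare 'W' permission string becomes the named constant gcs.WRITER. The gcs.py change itself is among the 12 files in this commit but does not appear in this excerpt; the sketch below is a hypothetical reconstruction of what the renamed method plausibly wraps, based only on the (entity, role, bucket) call sites shown, with the constant's value assumed.

# Hypothetical sketch; the real implementation lives in
# perfkitbenchmarker/providers/gcp/gcs.py, which is not shown here.
from perfkitbenchmarker import vm_util

WRITER = 'W'  # assumed value of the gcs.WRITER constant


def AclBucket(entity: str, role: str, bucket: str) -> None:
  """Grant `entity` the `role` permission on `bucket` (service method sketch)."""
  # `gsutil acl ch -u <entity>:<R|W|O> gs://<bucket>` edits a bucket's ACL.
  vm_util.IssueCommand(
      ['gsutil', 'acl', 'ch', '-u', f'{entity}:{role}', f'gs://{bucket}'])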

perfkitbenchmarker/linux_benchmarks/object_storage_curl.py (new file, +273)

@@ -0,0 +1,273 @@
# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Use cURL to upload and download data to object storage in parallel.

Consistent with the object_storage_service multistream scenario.

Due to the difficulty of signing requests to S3 by hand
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html),
the benchmark uses insecure short-lived buckets and should be used with
caution.

TODO(pclay): Consider signing requests and not using public buckets.
"""

import logging
from typing import List, Tuple

from absl import flags
import numpy as np
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import object_storage_service
from perfkitbenchmarker import providers
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark

BENCHMARK_NAME = 'object_storage_curl'

BENCHMARK_CONFIG = """
object_storage_curl:
  description: Use cURL to upload and download data to object storage in parallel.
  vm_groups:
    default:
      vm_spec: *default_single_core
  flags:
    # Required
    object_storage_multistream_objects_per_stream: 1
    object_storage_streams_per_vm: 10
"""

# Blocksize for dd to pipe data into uploads.
DD_BLOCKSIZE = 4000

# Magic strings
_UPLOAD = 'upload'
_DOWNLOAD = 'download'
_START_TIME = 'START_TIME'
_CURL_RESULTS = 'CURL_RESULTS'

flags.DEFINE_string('object_storage_curl_object_size', '1MB',
                    'Size of objects to upload / download. Similar to '
                    '--object_storage_object_sizes, but only takes a single '
                    'size.')

FLAGS = flags.FLAGS


def GetConfig(user_config):
  return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)


def CheckPrerequisites(_):
  """Validate some unsupported flags."""
  if (flag_util.StringToBytes(FLAGS.object_storage_curl_object_size) <
      DD_BLOCKSIZE):
    raise errors.Config.InvalidValue(
        '--object_storage_curl_object_size must be larger than 4KB')
  # TODO(pclay): Consider supporting multiple objects per stream.
  if FLAGS.object_storage_multistream_objects_per_stream != 1:
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports 1 object per stream')
  if FLAGS.object_storage_object_naming_scheme != 'sequential_by_stream':
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports sequential_by_stream naming.')


# PyType does not currently support returning Abstract classes
# TODO(user): stop suppressing
# pytype: disable=not-instantiable
def _GetService() -> object_storage_service.ObjectStorageService:
  """Get a ready to use instance of ObjectStorageService."""
  # TODO(pclay): consider using FLAGS.storage to allow cross cloud testing?
  cloud = FLAGS.cloud
  providers.LoadProvider(cloud)
  service = object_storage_service.GetObjectStorageClass(cloud)()
  # This method is idempotent with default args and safe to call in each phase.
  service.PrepareService(FLAGS.object_storage_region)
  return service


def _GetBucketName() -> str:
  return FLAGS.object_storage_bucket_name or 'pkb%s' % FLAGS.run_uri


def Prepare(benchmark_spec):
  """Create and ACL a bucket and install curl."""
  # We would like to always clean up server-side state when an exception
  # happens.
  benchmark_spec.always_call_cleanup = True

  service = _GetService()
  bucket_name = _GetBucketName()

  service.MakeBucket(bucket_name)

  vms = benchmark_spec.vms
  vm_util.RunThreaded(lambda vm: vm.InstallPackages('curl'), vms)


def Run(benchmark_spec) -> List[sample.Sample]:
  """Run storage benchmark and publish results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    The same samples as object_storage_multistream.
  """
  service = _GetService()
  bucket = _GetBucketName()

  object_bytes = flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
  blocks = object_bytes // DD_BLOCKSIZE
  streams_per_vm = FLAGS.object_storage_streams_per_vm

  generate_data_cmd = (
      'openssl aes-256-ctr -iter 1 -pass file:/dev/urandom -in /dev/zero'
      f' | dd bs={DD_BLOCKSIZE} count={blocks} iflag=fullblock')

  def StartTimeCmd(index):
    return f"date '+{_START_TIME} {index} %s.%N'"

  def CurlCmd(operation, index):
    return (
        f"curl -fsw '{_CURL_RESULTS} {index} %{{time_total}} "
        # Pad size with zero to force curl to populate it.
        f"%{{response_code}} 0%{{size_{operation}}}\n' -o /dev/null")

  def Upload(vm):
    commands = []
    for object_index in range(streams_per_vm):
      object_name = f'{vm.name}_{object_index}'
      url = service.GetUploadUrl(bucket, object_name)
      http_method = service.UPLOAD_HTTP_METHOD
      headers = service.GetHttpAuthorizationHeaders(
          http_method, bucket, object_name)
      headers_string = ' '.join(f"-H '{header}'" for header in headers)
      commands.append(
          f'{StartTimeCmd(object_index)}; {generate_data_cmd} | '
          f'{CurlCmd(_UPLOAD, object_index)} -X {http_method} {headers_string} '
          f"--data-binary @- '{url}'")
    stdout, stderr = vm.RemoteCommandsInParallel(commands)
    print(stdout, stderr)
    return stdout

  def Download(vm):
    commands = []
    for object_index in range(streams_per_vm):
      object_name = f'{vm.name}_{object_index}'
      url = service.GetUploadUrl(bucket, object_name)
      headers = service.GetHttpAuthorizationHeaders('GET', bucket, object_name)
      headers_string = ' '.join(f"-H '{header}'" for header in headers)
      commands.append(
          f'{StartTimeCmd(object_index)}; '
          f"{CurlCmd(_DOWNLOAD, object_index)} {headers_string} '{url}'")
    stdout, stderr = vm.RemoteCommandsInParallel(commands)
    print(stdout, stderr)
    return stdout

  vms = benchmark_spec.vms
  samples = []
  for operation, func in [(_UPLOAD, Upload), (_DOWNLOAD, Download)]:
    output = vm_util.RunThreaded(func, vms)
    start_times, latencies = _LoadWorkerOutput(output)
    object_storage_service_benchmark.ProcessMultiStreamResults(
        start_times,
        latencies,
        all_sizes=[object_bytes],
        sizes=[np.array([object_bytes])] * streams_per_vm * len(vms),
        operation=operation,
        results=samples,
        # We do not retry curl. We simply do not report failing latencies.
        # This under-reports both latency and throughput. Since this benchmark
        # is intended to measure throughput this is reasonable.
        allow_failing_streams=True)
  return samples


def Cleanup(_):
  service = _GetService()
  bucket_name = _GetBucketName()
  if not FLAGS.object_storage_dont_delete_bucket:
    service.DeleteBucket(bucket_name)
    service.CleanupService()


def _LoadWorkerOutput(
    output: List[str]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
  """Parse the output of the Upload and Download functions.

  The output of Upload and Download is:
    # START_TIME <index> <Unix start time in seconds>
    START_TIME 0 12345.6789
    # CURL_RESULTS <index> <latency in s> <HTTP code> <bytes transmitted>
    CURL_RESULTS 0 1.2345 200 01000

  Lines of output are not ordered and may be interleaved.

  Args:
    output: the output of each upload or download command

  Returns:
    the start times and latencies of the curl commands

  Raises:
    ValueError: if output is unexpected.
    Exception: if the curl request failed with a 4XX code.
  """
  start_times = []
  latencies = []

  for worker_out in output:
    worker_start_times = [None] * FLAGS.object_storage_streams_per_vm
    worker_latencies = [None] * FLAGS.object_storage_streams_per_vm
    for line in worker_out.strip().split('\n'):
      try:
        line_type, index, value, *curl_data = line.split()
        if line_type == _START_TIME:
          assert not curl_data
          worker_start_times[int(index)] = float(value)
        elif line_type == _CURL_RESULTS:
          response_code, bytes_transmitted = curl_data
          bytes_transmitted = int(bytes_transmitted)
          if response_code.startswith('4'):
            raise Exception(
                f'cURL command failed with HTTP Code {response_code}')
          elif response_code == '200':
            bytes_expected = flag_util.StringToBytes(
                FLAGS.object_storage_curl_object_size)
            if bytes_transmitted != bytes_expected:
              raise ValueError(
                  f'cURL transmitted {bytes_transmitted}'
                  f' instead of {bytes_expected}.')
            # curl 7.74 used μs instead of seconds. Not used in major OS types.
            # https://github.com/curl/curl/issues/6321
            assert '.' in value, 'Invalid curl output.'
            worker_latencies[int(index)] = float(value)
          else:
            logging.warning('cURL command failed with HTTP code %s. '
                            'Not reporting latency.', response_code)
        else:
          raise ValueError(f'Unexpected line start: {line_type}.')
      # Always show raw line when there is a parsing error.
      except (ValueError, AssertionError) as e:
        raise ValueError(f'Unexpected output:\n{line}') from e

    for start_time, latency in zip(worker_start_times, worker_latencies):
      if latency:
        start_times.append(np.array([start_time], dtype=np.float64))
        latencies.append(np.array([latency], dtype=np.float64))

  return start_times, latencies
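The benchmark's Run phase builds one shell pipeline per stream: a date stamp, random bytes from openssl piped through dd, and a curl invocation whose -w format string emits the latency, HTTP code, and byte count that _LoadWorkerOutput later parses. The authentication itself comes from two service methods this commit adds elsewhere, GetUploadUrl and GetHttpAuthorizationHeaders, whose implementations are not part of this excerpt. Below is a hypothetical sketch of the GCS side only, assuming the XML API (which would explain why Download reuses GetUploadUrl, since the same object URL serves PUT uploads and GET downloads) and a gcloud-issued OAuth2 bearer token; the S3 side would need SigV4 signatures instead, which is the hand-signing difficulty the module docstring cites.

# Hypothetical sketch; the real code is in the provider-specific service
# classes (e.g. perfkitbenchmarker/providers/gcp/gcs.py), which this
# excerpt does not show.
from typing import List

from perfkitbenchmarker import vm_util

UPLOAD_HTTP_METHOD = 'PUT'  # assumed; the GCS XML API uploads objects via PUT


def GetUploadUrl(bucket: str, object_name: str) -> str:
  # XML API object URL; the same URL accepts PUT (upload) and GET (download).
  return f'https://storage.googleapis.com/{bucket}/{object_name}'


def GetHttpAuthorizationHeaders(http_method: str, bucket: str,
                                object_name: str) -> List[str]:
  # A short-lived OAuth2 bearer token is one way GCS authorizes REST calls.
  stdout, _ = vm_util.IssueRetryableCommand(
      ['gcloud', 'auth', 'print-access-token'])
  return [f'Authorization: Bearer {stdout.strip()}']

With the flags defined above, a run would then look like any other PKB invocation, e.g. ./pkb.py --benchmarks=object_storage_curl --cloud=GCP --object_storage_curl_object_size=16MB.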

perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py (+22 -9)

@@ -403,8 +403,14 @@ def MultiThreadDeleteDelay(num_vms, threads_per_vm):
   return (num_vms * threads_per_vm) / (MULTISTREAM_DELETE_OPS_PER_SEC)


-def _ProcessMultiStreamResults(start_times, latencies, sizes, operation,
-                               all_sizes, results, metadata=None):
+def ProcessMultiStreamResults(start_times,
+                              latencies,
+                              sizes,
+                              operation,
+                              all_sizes,
+                              results,
+                              metadata=None,
+                              allow_failing_streams=False):
   """Read and process results from the api_multistream worker process.

   Results will be reported per-object size and combined for all

@@ -421,17 +427,24 @@ def _ProcessMultiStreamResults(start_times, latencies, sizes, operation,
       distribution used, in bytes.
     results: a list to append Sample objects to.
     metadata: dict. Base sample metadata
+    allow_failing_streams: Whether to expect a result for all streams.
   """

-  num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
-
-  assert len(start_times) == num_streams
-  assert len(latencies) == num_streams
-  assert len(sizes) == num_streams
+  total_num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
+  if allow_failing_streams:
+    num_streams = len(start_times)
+    assert len(latencies) == num_streams
+    assert len(sizes) == num_streams
+  else:
+    assert len(start_times) == total_num_streams
+    assert len(latencies) == total_num_streams
+    assert len(sizes) == total_num_streams
+    num_streams = total_num_streams

   if metadata is None:
     metadata = {}
-  metadata['num_streams'] = num_streams
+  metadata['num_streams'] = total_num_streams
+  metadata['num_failing_streams'] = total_num_streams - num_streams
   metadata['objects_per_stream'] = (
       FLAGS.object_storage_multistream_objects_per_stream)
   metadata['object_naming'] = FLAGS.object_storage_object_naming_scheme

@@ -1053,7 +1066,7 @@ def _MultiStreamOneWay(results, metadata, vms, command_builder,
   if FLAGS.object_storage_worker_output:
     with open(FLAGS.object_storage_worker_output, 'w') as out_file:
       out_file.write(json.dumps(output))
-  _ProcessMultiStreamResults(
+  ProcessMultiStreamResults(
       start_times,
       latencies,
       sizes,
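Two things change here: _ProcessMultiStreamResults loses its leading underscore so object_storage_curl can import and reuse it, and allow_failing_streams relaxes the old one-result-per-configured-stream contract. When set, the stream count is derived from the results actually received, and the shortfall is published as num_failing_streams metadata instead of tripping an assertion. A toy call with fabricated values, assuming --object_storage_streams_per_vm=10 and --num_vms=2 (20 configured streams) but only 19 surviving results:

# Toy illustration; names follow the diff above, values are made up.
import numpy as np

samples = []
ProcessMultiStreamResults(
    start_times=[np.array([1625000000.0])] * 19,
    latencies=[np.array([1.2])] * 19,
    sizes=[np.array([1000000])] * 19,
    operation='download',
    all_sizes=[1000000],
    results=samples,
    allow_failing_streams=True)
# Each emitted Sample's metadata now reports num_streams=20 and
# num_failing_streams=1.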
