# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| 14 | +"""Use cURL to upload and download data to object storage in parallel. |
| 15 | +
|
| 16 | +Consistent with object_storage_service multistream scenario. |
| 17 | +
|
| 18 | +Due to the difficulty of signing to requests to S3 by hand |
| 19 | +(https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html). |
| 20 | +The benchmark uses insecure short lived buckets and should be used with caution. |
| 21 | +
|
| 22 | +TODO(pclay): Consider signing requests and not using public buckets. |
| 23 | +""" |

import logging
from typing import List, Tuple

from absl import flags
import numpy as np
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import object_storage_service
from perfkitbenchmarker import providers
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark

BENCHMARK_NAME = 'object_storage_curl'

BENCHMARK_CONFIG = """
object_storage_curl:
  description: Use cURL to upload and download data to object storage in parallel.
  vm_groups:
    default:
      vm_spec: *default_single_core
  flags:
    # Required
    object_storage_multistream_objects_per_stream: 1
    object_storage_streams_per_vm: 10
"""

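# Example invocation (flag values are illustrative; the acknowledgement flag is
# required because this benchmark creates a publicly readable and writable
# bucket):
#   ./pkb.py --benchmarks=object_storage_curl --cloud=GCP \
#     --object_storage_curl_object_size=16MB \
#     --object_storage_curl_i_am_ok_with_public_read_write_buckets
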
# Blocksize for dd to pipe data into uploads.
DD_BLOCKSIZE = 4000

# Magic strings
UPLOAD = 'upload'
DOWNLOAD = 'download'
START_TIME = 'START_TIME'
CURL_RESULTS = 'CURL_RESULTS'

flags.DEFINE_string('object_storage_curl_object_size', '1MB',
                    'Size of objects to upload / download. Similar to '
                    '--object_storage_object_sizes, but only takes a single '
                    'size.')
flags.DEFINE_bool('object_storage_curl_i_am_ok_with_public_read_write_buckets',
                  False, 'Acknowledge that this benchmark will create buckets '
                  'which are publicly readable and writable. Required to run '
                  'this benchmark.')

FLAGS = flags.FLAGS


def GetConfig(user_config):
  return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)


def CheckPrerequisites(_):
  """Validate some unsupported flags."""
  if (flag_util.StringToBytes(FLAGS.object_storage_curl_object_size) <
      DD_BLOCKSIZE):
    raise errors.Config.InvalidValue(
        '--object_storage_curl_object_size must be at least as large as the '
        f'dd blocksize ({DD_BLOCKSIZE} bytes).')
  # TODO(pclay): Consider supporting multiple objects per stream.
  if FLAGS.object_storage_multistream_objects_per_stream != 1:
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports 1 object per stream')
  if FLAGS.object_storage_object_naming_scheme != 'sequential_by_stream':
    raise errors.Config.InvalidValue(
        'object_storage_curl only supports sequential_by_stream naming.')
  if not FLAGS.object_storage_curl_i_am_ok_with_public_read_write_buckets:
    raise errors.Config.InvalidValue(
        'This benchmark uses a public read/write object storage bucket.\n'
        'You must explicitly pass '
        '--object_storage_curl_i_am_ok_with_public_read_write_buckets to '
        'acknowledge that it will be created.\n'
        'If PKB is interrupted, you should ensure it is cleaned up.')


# PyType does not currently support returning abstract classes.
# TODO(user): stop suppressing
# pytype: disable=not-instantiable
def _GetService() -> object_storage_service.ObjectStorageService:
  """Get a ready-to-use instance of ObjectStorageService."""
  # TODO(pclay): consider using FLAGS.storage to allow cross-cloud testing?
  cloud = FLAGS.cloud
  providers.LoadProvider(cloud)
  service = object_storage_service.GetObjectStorageClass(cloud)()
  # This method is idempotent with default args and safe to call in each phase.
  service.PrepareService(FLAGS.object_storage_region)
  return service


def _GetBucketName() -> str:
  return FLAGS.object_storage_bucket_name or 'pkb%s' % FLAGS.run_uri


def Prepare(benchmark_spec):
  """Create and ACL a bucket and install curl."""
  # Ensure server-side state is always cleaned up, even if an exception occurs.
  benchmark_spec.always_call_cleanup = True

  service = _GetService()
  bucket_name = _GetBucketName()

  service.MakeBucket(bucket_name)
  service.MakeBucketPubliclyReadable(bucket_name, also_make_writable=True)

  vms = benchmark_spec.vms
  vm_util.RunThreaded(lambda vm: vm.InstallPackages('curl'), vms)


def Run(benchmark_spec) -> List[sample.Sample]:
  """Run storage benchmark and publish results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    The same samples as object_storage_multistream.
  """
  service = _GetService()
  bucket = _GetBucketName()

  object_bytes = flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
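  # Integer division: the effective object size is rounded down to a multiple
  # of DD_BLOCKSIZE.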
  blocks = object_bytes // DD_BLOCKSIZE
  streams_per_vm = FLAGS.object_storage_streams_per_vm

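  # Encrypting /dev/zero with a throwaway key read from /dev/urandom produces
  # incompressible pseudorandom data, typically much faster than reading
  # /dev/urandom directly; dd (with iflag=fullblock) caps the stream at the
  # requested object size.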
  generate_data_cmd = (
      'openssl aes-256-ctr -iter 1 -pass file:/dev/urandom -in /dev/zero'
      f' | dd bs={DD_BLOCKSIZE} count={blocks} iflag=fullblock')
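
  # GNU date's %s.%N format prints the Unix epoch time in seconds with
  # nanosecond precision.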
  def StartTimeCmd(index):
    return f"date '+{START_TIME} {index} %s.%N'"

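  # curl flags: -f fails on HTTP errors, -s silences progress output, and -w
  # prints the results line (using curl's write-out variables) on completion.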
  def CurlCmd(operation, index):
    return (
        f"curl -fsw '{CURL_RESULTS} {index} %{{time_total}} "
        # Pad size with zero to force curl to populate it.
        f"%{{response_code}} 0%{{size_{operation}}}\n' -o /dev/null")

  def Upload(vm):
    commands = []
    for object_index in range(streams_per_vm):
      object_name = f'{vm.name}_{object_index}'
      url = service.GetUploadUrl(bucket=bucket, object_name=object_name)
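      # --data-binary @- streams the generated data from stdin, so objects are
      # never written to the VM's disk.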
      commands.append(
          f'{StartTimeCmd(object_index)}; {generate_data_cmd} | '
          f'{CurlCmd(UPLOAD, object_index)} -X {service.UPLOAD_HTTP_METHOD} '
          f"--data-binary @- '{url}'")
    stdout, _ = vm.RemoteCommandsInParallel(commands)
    return stdout

  def Download(vm):
    commands = []
    for object_index in range(streams_per_vm):
      object_name = f'{vm.name}_{object_index}'
      url = service.GetDownloadUrl(bucket=bucket, object_name=object_name)
      commands.append(f'{StartTimeCmd(object_index)}; '
                      f"{CurlCmd(DOWNLOAD, object_index)} '{url}'")
    stdout, _ = vm.RemoteCommandsInParallel(commands)
    return stdout

  vms = benchmark_spec.vms
  samples = []
  for operation, func in ((UPLOAD, Upload), (DOWNLOAD, Download)):
    output = vm_util.RunThreaded(func, vms)
    start_times, latencies = _LoadWorkerOutput(output)
    object_storage_service_benchmark.ProcessMultiStreamResults(
        start_times,
        latencies,
        all_sizes=[object_bytes],
        sizes=[np.array([object_bytes])] * streams_per_vm * len(vms),
        operation=operation,
        results=samples,
        # We do not retry curl. We simply do not report failing latencies.
        # This under-reports both latency and throughput. Since this benchmark
        # is intended to measure throughput, this is reasonable.
        allow_failing_streams=True)
  return samples


def Cleanup(_):
  service = _GetService()
  bucket_name = _GetBucketName()
  if not FLAGS.object_storage_dont_delete_bucket:
    service.DeleteBucket(bucket_name)
    service.CleanupService()


def _LoadWorkerOutput(
    output: List[str]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
  """Parse the output of Upload and Download functions.

  The output of Upload and Download is
  # START_TIME <index> <Unix start time in seconds>
  START_TIME 0 12345.6789
  # CURL_RESULTS <index> <latency in s> <HTTP code> <bytes transmitted>
  CURL_RESULTS 0 1.2345 200 01000

  Lines of output are not ordered and may be interleaved.

  Args:
    output: the output of each upload or download command

  Returns:
    the start times and latencies of the curl commands

  Raises:
    ValueError if output is unexpected.
  """
  start_times = []
  latencies = []

  for worker_out in output:
    worker_start_times = [None] * FLAGS.object_storage_streams_per_vm
    worker_latencies = [None] * FLAGS.object_storage_streams_per_vm
    for line in worker_out.strip().split('\n'):
      try:
        line_type, index, value, *curl_data = line.split()
        if line_type == START_TIME:
          assert not curl_data
          worker_start_times[int(index)] = float(value)
        elif line_type == CURL_RESULTS:
          response_code, bytes_transmitted = curl_data
          bytes_transmitted = int(bytes_transmitted)
          if response_code.startswith('4'):
            raise Exception(
                f'cURL command failed with HTTP Code {response_code}')
          elif response_code == '200':
            bytes_expected = flag_util.StringToBytes(
                FLAGS.object_storage_curl_object_size)
            if bytes_transmitted != bytes_expected:
              raise ValueError(
                  f'cURL transmitted {bytes_transmitted}'
                  f' instead of {bytes_expected}.')
            # curl 7.74 reported time_total in microseconds instead of seconds
            # (https://github.com/curl/curl/issues/6321). That version is not
            # shipped by major OS types, but validate the format anyway.
            assert '.' in value, 'Invalid curl output.'
            worker_latencies[int(index)] = float(value)
          else:
            logging.warning('cURL command failed with HTTP code %s. '
                            'Not reporting latency.', response_code)
        else:
          raise ValueError(f'Unexpected line start: {line_type}.')
      # Always show the raw line when there is a parsing error.
      except (ValueError, AssertionError) as e:
        raise ValueError(f'Unexpected output:\n{line}') from e

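    # Only keep streams that recorded a latency; failed streams are dropped
    # entirely (consistent with allow_failing_streams=True in Run).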
    for start_time, latency in zip(worker_start_times, worker_latencies):
      if latency:
        start_times.append(np.array([start_time], dtype=np.float64))
        latencies.append(np.array([latency], dtype=np.float64))

  return start_times, latencies