|
| 1 | +// Copyright 2025 Pants project contributors (see CONTRIBUTORS.md). |
| 2 | +// Licensed under the Apache License, Version 2.0 (see LICENSE). |
| 3 | + |
| 4 | +use std::{collections::BTreeMap, path::PathBuf, time::Duration}; |
| 5 | + |
| 6 | +use clap::Parser; |
| 7 | +use remote_provider::{RemoteProvider, RemoteStoreOptions}; |
| 8 | +use task_executor::Executor; |
| 9 | + |
| 10 | +use crate::Store; |
| 11 | + |
| 12 | +#[derive(Parser)] |
| 13 | +pub struct StoreCliOpt { |
| 14 | + ///Path to lmdb directory used for local file storage. |
| 15 | + #[arg(long)] |
| 16 | + local_store_path: Option<PathBuf>, |
| 17 | + |
| 18 | + #[arg(long)] |
| 19 | + pub remote_instance_name: Option<String>, |
| 20 | + |
| 21 | + /// The host:port of the gRPC CAS server to connect to. |
| 22 | + #[arg(long)] |
| 23 | + pub cas_server: Option<String>, |
| 24 | + |
| 25 | + /// Path to file containing root certificate authority certificates for the CAS server. |
| 26 | + /// If not set, TLS will not be used when connecting to the CAS server. |
| 27 | + #[arg(long)] |
| 28 | + pub cas_root_ca_cert_file: Option<PathBuf>, |
| 29 | + |
| 30 | + /// Path to file containing client certificates for the CAS server. |
| 31 | + /// If not set, client authentication will not be used when connecting to the CAS server. |
| 32 | + #[arg(long)] |
| 33 | + pub cas_client_certs_file: Option<PathBuf>, |
| 34 | + |
| 35 | + /// Path to file containing client key for the CAS server. |
| 36 | + /// If not set, client authentication will not be used when connecting to the CAS server. |
| 37 | + #[arg(long)] |
| 38 | + pub cas_client_key_file: Option<PathBuf>, |
| 39 | + |
| 40 | + /// Path to file containing oauth bearer token for communication with the CAS server. |
| 41 | + /// If not set, no authorization will be provided to remote servers. |
| 42 | + #[arg(long)] |
| 43 | + pub cas_oauth_bearer_token_path: Option<PathBuf>, |
| 44 | + |
| 45 | + /// Number of bytes to include per-chunk when uploading bytes. |
| 46 | + /// grpc imposes a hard message-size limit of around 4MB. |
| 47 | + #[arg(long, default_value = "3145728")] |
| 48 | + pub upload_chunk_bytes: usize, |
| 49 | + |
| 50 | + /// Number of retries per request to the store service. |
| 51 | + #[arg(long, default_value = "3")] |
| 52 | + pub store_rpc_retries: usize, |
| 53 | + |
| 54 | + /// Number of concurrent requests to the store service. |
| 55 | + #[arg(long, default_value = "128")] |
| 56 | + pub store_rpc_concurrency: usize, |
| 57 | + |
| 58 | + /// Total size of blobs allowed to be sent in a single API call. |
| 59 | + #[arg(long, default_value = "4194304")] |
| 60 | + pub store_batch_api_size_limit: usize, |
| 61 | + |
| 62 | + /// Extra header to pass on remote execution request. |
| 63 | + #[arg(long)] |
| 64 | + pub header: Vec<String>, |
| 65 | +} |
| 66 | + |
| 67 | +impl StoreCliOpt { |
| 68 | + pub fn get_headers( |
| 69 | + &self, |
| 70 | + oauth_bearer_token_path: &Option<PathBuf>, |
| 71 | + ) -> Result<BTreeMap<String, String>, String> { |
| 72 | + let mut headers: BTreeMap<String, String> = collection_from_keyvalues(self.header.iter()); |
| 73 | + if let Some(ref oauth_path) = oauth_bearer_token_path { |
| 74 | + let token = std::fs::read_to_string(oauth_path) |
| 75 | + .map_err(|e| format!("Error reading oauth bearer token file: {}", e))?; |
| 76 | + headers.insert( |
| 77 | + "authorization".to_owned(), |
| 78 | + format!("Bearer {}", token.trim()), |
| 79 | + ); |
| 80 | + } |
| 81 | + Ok(headers) |
| 82 | + } |
| 83 | + |
| 84 | + pub async fn create_store(&self, executor: Executor) -> Result<Store, String> { |
| 85 | + let local_store_path = self |
| 86 | + .local_store_path |
| 87 | + .clone() |
| 88 | + .unwrap_or_else(Store::default_path); |
| 89 | + |
| 90 | + let local_only_store = Store::local_only(executor.clone(), local_store_path)?; |
| 91 | + |
| 92 | + if let Some(cas_server) = &self.cas_server { |
| 93 | + let tls_config = grpc_util::tls::Config::new_from_files( |
| 94 | + self.cas_root_ca_cert_file.as_deref(), |
| 95 | + self.cas_client_certs_file.as_deref(), |
| 96 | + self.cas_client_key_file.as_deref(), |
| 97 | + )?; |
| 98 | + let headers = self.get_headers(&self.cas_oauth_bearer_token_path)?; |
| 99 | + local_only_store |
| 100 | + .into_with_remote(RemoteStoreOptions { |
| 101 | + provider: RemoteProvider::Reapi, |
| 102 | + store_address: cas_server.to_owned(), |
| 103 | + instance_name: self.remote_instance_name.clone(), |
| 104 | + tls_config, |
| 105 | + headers, |
| 106 | + chunk_size_bytes: self.upload_chunk_bytes, |
| 107 | + timeout: Duration::from_secs(30), |
| 108 | + retries: self.store_rpc_retries, |
| 109 | + concurrency_limit: self.store_rpc_concurrency, |
| 110 | + batch_api_size_limit: self.store_batch_api_size_limit, |
| 111 | + }) |
| 112 | + .await |
| 113 | + } else { |
| 114 | + Ok(local_only_store) |
| 115 | + } |
| 116 | + } |
| 117 | +} |
| 118 | + |
| 119 | +pub fn collection_from_keyvalues<Str, It, Col>(keyvalues: It) -> Col |
| 120 | +where |
| 121 | + Str: AsRef<str>, |
| 122 | + It: Iterator<Item = Str>, |
| 123 | + Col: FromIterator<(String, String)>, |
| 124 | +{ |
| 125 | + keyvalues |
| 126 | + .map(|kv| { |
| 127 | + let mut parts = kv.as_ref().splitn(2, '='); |
| 128 | + ( |
| 129 | + parts.next().unwrap().to_string(), |
| 130 | + parts.next().unwrap_or_default().to_string(), |
| 131 | + ) |
| 132 | + }) |
| 133 | + .collect() |
| 134 | +} |
0 commit comments