Skip to content

feat: support configuring the manifold URL #968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions app/buck2_client/src/commands/debug/persist_event_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use buck2_client_ctx::common::BuckArgMatches;
use buck2_client_ctx::exit_result::ExitResult;
use buck2_common::chunk_reader::ChunkReader;
use buck2_common::manifold;
use buck2_common::manifold::BucketsConfig;
use buck2_common::manifold::ManifoldChunkedUploader;
use buck2_common::manifold::ManifoldClient;
use buck2_core::fs::paths::abs_path::AbsPathBuf;
Expand Down Expand Up @@ -72,9 +73,12 @@ impl PersistEventLogsCommand {
buck2_core::facebook_only();
let sink = create_scribe_sink(&ctx)?;
let trace_id = self.trace_id.clone();
let buckets_config = ctx.buckets_config()?;

ctx.with_runtime(|mut ctx| async move {
let mut stdin = io::BufReader::new(ctx.stdin());
let (local_result, remote_result) = self.write_and_upload(&mut stdin).await;
let (local_result, remote_result) =
self.write_and_upload(buckets_config, &mut stdin).await;

let (local_error_messages, local_error_category, local_success) =
status_from_result(local_result);
Expand All @@ -97,6 +101,7 @@ impl PersistEventLogsCommand {

async fn write_and_upload(
self,
buckets_config: Option<BucketsConfig>,
stdin: impl io::AsyncBufRead + Unpin,
) -> (buck2_error::Result<()>, buck2_error::Result<()>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -113,7 +118,13 @@ impl PersistEventLogsCommand {
}
};
let write = write_task(&file, tx, stdin);
let upload = upload_task(&file, rx, self.manifold_name, self.no_upload);
let upload = upload_task(
&file,
rx,
buckets_config,
self.manifold_name,
self.no_upload,
);

// Wait for both tasks to finish. If the upload fails we want to keep writing to disk
let (write_result, upload_result) = tokio::join!(write, upload);
Expand Down Expand Up @@ -165,14 +176,20 @@ async fn create_log_file(local_path: String) -> Result<tokio::fs::File, buck2_er
async fn upload_task(
file_mutex: &Mutex<File>,
mut rx: tokio::sync::mpsc::UnboundedReceiver<u64>,
buckets_config: Option<BucketsConfig>,
manifold_name: String,
no_upload: bool,
) -> buck2_error::Result<()> {
if no_upload {
return Ok(());
}

let manifold_client = ManifoldClient::new().await?;
let manifold_client = ManifoldClient::new_with_config(buckets_config).await?;
// No need to do more work if we don't have an endpoint configured (default
// in OSS)
if !manifold_client.will_upload() {
return Ok(());
}
let manifold_path = format!("flat/{}", manifold_name);
let mut uploader = Uploader::new(file_mutex, &manifold_path, &manifold_client)?;

Expand Down
2 changes: 1 addition & 1 deletion app/buck2_client/src/commands/debug/upload_re_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl UploadReLogsCommand {

// TODO: This should receive the path from the caller.
ctx.with_runtime(|ctx| async move {
let manifold = ManifoldClient::new().await?;
let manifold = ManifoldClient::new_with_config(ctx.buckets_config()?).await?;
let re_logs_dir = ctx.paths()?.re_logs_dir();
upload_re_logs(
&manifold,
Expand Down
4 changes: 2 additions & 2 deletions app/buck2_client/src/commands/rage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl RageCommand {
let client_ctx = ctx.empty_client_context("rage")?;

// Don't fail the rage if you can't figure out whether to do vpnless.
let manifold = ManifoldClient::new().await?;
let manifold = ManifoldClient::new_with_config(ctx.buckets_config()?).await?;

let rage_id = TraceId::new();
let mut manifold_id = format!("{}", rage_id);
Expand Down Expand Up @@ -544,7 +544,7 @@ async fn upload_re_logs_impl(
upload_re_logs::upload_re_logs(manifold, bucket, re_logs_dir, &re_session_id, &filename)
.await?;

Ok(manifold_leads(&bucket, filename))
Ok(manifold_leads(manifold, &bucket, filename))
}

async fn dispatch_result_event(
Expand Down
6 changes: 5 additions & 1 deletion app/buck2_client/src/commands/rage/dice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ pub async fn upload_dice_dump(
)
.await?;

Ok(manifold_leads(&manifold_bucket, manifold_filename))
Ok(manifold_leads(
manifold,
&manifold_bucket,
manifold_filename,
))
}

struct DiceDump {
Expand Down
26 changes: 19 additions & 7 deletions app/buck2_client/src/commands/rage/manifold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,23 @@ enum ManifoldError {
OpenFileError(String),
}

pub(crate) fn manifold_leads(bucket: &Bucket, filename: String) -> String {
let full_path = format!("{}/{}", bucket.name, filename);
let command = format!("manifold get {}", full_path);
let url = format!("https://interncache-all.fbcdn.net/manifold/{}", full_path);
format!("{}\n{}", command, url)
pub(crate) fn manifold_leads(
manifold: &ManifoldClient,
bucket: &Bucket,
filename: String,
) -> String {
let url = manifold.file_view_url(bucket, &filename);
let command = manifold.file_dump_command(bucket, &filename);
let mut out = String::new();
if let Some(url) = url {
out.push_str(&url);
}
if let Some(command) = command {
out.push('\n');
out.push_str(&command);
}

out
}

pub(crate) async fn file_to_manifold(
Expand All @@ -45,7 +57,7 @@ pub(crate) async fn file_to_manifold(
.read_and_upload(bucket, &filename, Default::default(), &mut file)
.await?;

Ok(manifold_leads(&bucket, filename))
Ok(manifold_leads(manifold, &bucket, filename))
}

pub(crate) async fn buf_to_manifold(
Expand All @@ -60,5 +72,5 @@ pub(crate) async fn buf_to_manifold(
.read_and_upload(bucket, &filename, Default::default(), &mut cursor)
.await?;

Ok(manifold_leads(&bucket, filename))
Ok(manifold_leads(manifold, &bucket, filename))
}
9 changes: 9 additions & 0 deletions app/buck2_client_ctx/src/client_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use buck2_core::fs::working_dir::AbsWorkingDir;
use buck2_error::BuckErrorContext;
use buck2_event_observer::verbosity::Verbosity;
use buck2_wrapper_common::invocation_id::TraceId;
use buck2_common::manifold::BucketsConfig;
use dupe::Dupe;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -317,6 +318,14 @@ impl<'a> ClientCommandContext<'a> {
.log_download_method
.clone())
}

pub fn buckets_config(&self) -> buck2_error::Result<Option<BucketsConfig>> {
Ok(self
.immediate_config
.daemon_startup_config()?
.buckets_config
.clone())
}
}

/// Provides a common interface for buck subcommands that use event subscribers for logging.
Expand Down
5 changes: 5 additions & 0 deletions app/buck2_common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use serde::Serialize;

use crate::legacy_configs::configs::LegacyBuckConfig;
use crate::legacy_configs::key::BuckconfigKeyRef;
use crate::manifold::BucketsConfig;

/// Helper enum to categorize the kind of timeout we get from the startup config.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -383,6 +384,7 @@ pub struct DaemonStartupConfig {
pub http: HttpConfig,
pub resource_control: ResourceControlConfig,
pub log_download_method: LogDownloadMethod,
pub buckets_config: Option<BucketsConfig>,
pub health_check_config: HealthCheckConfig,
}

Expand Down Expand Up @@ -451,6 +453,7 @@ impl DaemonStartupConfig {
.map(ToOwned::to_owned),
http: HttpConfig::from_config(config)?,
resource_control: ResourceControlConfig::from_config(config)?,
buckets_config: BucketsConfig::from_config(config)?,
log_download_method,
health_check_config: HealthCheckConfig::from_config(config)?,
})
Expand Down Expand Up @@ -478,6 +481,8 @@ impl DaemonStartupConfig {
} else {
LogDownloadMethod::None
},
// TODO(jadel): is this a regression in test vs before?
buckets_config: None,
health_check_config: HealthCheckConfig::default(),
}
}
Expand Down
Loading
Loading