Skip to content

Commit 355a523

Browse files
authored
Add support for http proxies (#1426)
Adds support for HTTP(S) proxies, in addition to SOCKS5 proxies that we've been supporting for a little while. Closes svix/monorepo-private#8990.
2 parents 2a7f11c + b360cc5 commit 355a523

File tree

6 files changed

+133
-54
lines changed

6 files changed

+133
-54
lines changed

server/Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/svix-server/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ form_urlencoded = "1.1.0"
7474
lapin = "2.1.1"
7575
sentry = { version = "0.32.2", features = ["tracing"] }
7676
omniqueue = { git = "https://github.com/svix/omniqueue-rs", rev = "5ae22000e2ea214ba707cac81657f098e5785a76", default-features = false, features = ["in_memory", "rabbitmq-with-message-ids", "redis_cluster"] }
77+
# Not a well-known author, and no longer gets updates => pinned.
78+
# Switch to hyper-http-proxy when upgrading hyper to 1.0.
79+
hyper-proxy = { version = "=0.9.1", default-features = false, features = ["openssl-tls"] }
7780

7881
[target.'cfg(not(target_env = "msvc"))'.dependencies]
7982
tikv-jemallocator = { version = "0.5", optional = true }

server/svix-server/src/cfg.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use figment::{
77
providers::{Env, Format, Toml},
88
Figment,
99
};
10-
use http::uri::InvalidUri;
1110
use ipnet::IpNet;
1211
use serde::{Deserialize, Deserializer};
1312
use tracing::Level;
@@ -199,28 +198,36 @@ pub struct ConfigurationInner {
199198

200199
#[derive(Clone, Debug, Deserialize)]
201200
pub struct ProxyConfig {
202-
/// SOCKS5 proxy address.
201+
/// Proxy address.
203202
///
204-
/// More proxy types may be supported in the future.
203+
/// Currently supported proxy types are:
204+
/// - `socks5://`, i.e. a SOCKS5 proxy, with domain name resolution being
205+
/// done before the proxy gets involved
206+
/// - `http://` or `https://` proxy, sending HTTP requests to the proxy;
207+
/// both HTTP and HTTPS targets are supported
205208
#[serde(rename = "proxy_addr")]
206209
pub addr: ProxyAddr,
207210
}
208211

209212
#[derive(Clone, Debug)]
210-
pub struct ProxyAddr {
211-
parsed: http::Uri,
213+
pub enum ProxyAddr {
214+
/// A SOCKS5 proxy.
215+
Socks5(http::Uri),
216+
/// An HTTP / HTTPs proxy.
217+
Http(http::Uri),
212218
}
213219

214220
impl ProxyAddr {
215-
pub fn new(raw: impl AsRef<str>) -> Result<Self, InvalidUri> {
216-
let parsed = raw.as_ref().parse()?;
217-
Ok(Self { parsed })
218-
}
219-
}
220-
221-
impl From<ProxyAddr> for http::Uri {
222-
fn from(value: ProxyAddr) -> Self {
223-
value.parsed
221+
pub fn new(raw: impl Into<String>) -> Result<Self, Box<dyn std::error::Error>> {
222+
let raw = raw.into();
223+
let parsed: http::Uri = raw.parse()?;
224+
match parsed.scheme_str().unwrap_or("") {
225+
"socks5" => Ok(Self::Socks5(parsed)),
226+
"http" | "https" => Ok(Self::Http(parsed)),
227+
_ => Err("Unsupported proxy scheme. \
228+
Supported schemes are `socks5://`, `http://` and `https://`."
229+
.into()),
230+
}
224231
}
225232
}
226233

@@ -510,7 +517,7 @@ mod tests {
510517
figment::Jail::expect_with(|jail| {
511518
jail.set_env("SVIX_QUEUE_TYPE", "memory");
512519
jail.set_env("SVIX_JWT_SECRET", "x");
513-
jail.set_env("SVIX_PROXY_ADDR", "x");
520+
jail.set_env("SVIX_PROXY_ADDR", "socks5://127.0.0.1");
514521

515522
let cfg = load().unwrap();
516523
assert!(cfg.proxy_config.is_some());

server/svix-server/src/core/webhook_http_client.rs

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@ use hyper::{
2121
ext::HeaderCaseMap,
2222
Body, Client, Uri,
2323
};
24-
use hyper_openssl::HttpsConnector;
24+
use hyper_openssl::{HttpsConnector, MaybeHttpsStream};
25+
use hyper_proxy::{Intercept, Proxy, ProxyConnector as HttpProxyConnector, ProxyStream};
2526
use hyper_socks2::SocksConnector;
2627
use ipnet::IpNet;
27-
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
28+
use openssl::ssl::{SslConnector, SslConnectorBuilder, SslMethod, SslVerifyMode};
2829
use serde::Serialize;
2930
use thiserror::Error;
3031
use tokio::{net::TcpStream, sync::Mutex};
3132
use tower::Service;
3233

33-
use crate::cfg::ProxyConfig;
34+
use crate::cfg::{ProxyAddr, ProxyConfig};
3435

3536
pub type CaseSensitiveHeaderMap = HashMap<String, HeaderValue>;
3637

@@ -55,7 +56,7 @@ pub enum Error {
5556

5657
#[derive(Clone)]
5758
pub struct WebhookClient {
58-
client: Client<HttpsConnector<SvixHttpConnector>, Body>,
59+
client: Client<SvixHttpsConnector, Body>,
5960
whitelist_nets: Arc<Vec<IpNet>>,
6061
}
6162

@@ -64,7 +65,7 @@ impl WebhookClient {
6465
whitelist_nets: Option<Arc<Vec<IpNet>>>,
6566
whitelist_names: Option<Arc<Vec<String>>>,
6667
dangerous_disable_tls_verification: bool,
67-
proxy_config: Option<ProxyConfig>,
68+
proxy_config: Option<&ProxyConfig>,
6869
) -> Self {
6970
let whitelist_nets = whitelist_nets.unwrap_or_else(|| Arc::new(Vec::new()));
7071
let whitelist_names = whitelist_names.unwrap_or_else(|| Arc::new(Vec::new()));
@@ -81,8 +82,8 @@ impl WebhookClient {
8182
ssl.set_verify(SslVerifyMode::NONE);
8283
}
8384

84-
let http = SvixHttpConnector::new(http, proxy_config);
85-
let https = HttpsConnector::with_connector(http, ssl).expect("HttpsConnector build failed");
85+
let https = SvixHttpsConnector::new(http, proxy_config, ssl)
86+
.expect("SvixHttpsConnector build failed");
8687

8788
let client: Client<_, hyper::Body> = Client::builder()
8889
.http1_ignore_invalid_headers_in_responses(true)
@@ -428,54 +429,87 @@ impl RequestBuilder {
428429
}
429430
}
430431

431-
/// Plain-HTTP connector that blocks outgoing requests to private IPs with
432-
/// support for optionally proxying via SOCKS5.
433-
#[derive(Clone, Debug)]
434-
enum SvixHttpConnector {
435-
Regular(NonLocalHttpConnector),
436-
Proxied(SocksConnector<NonLocalHttpConnector>),
432+
/// HTTP connector that blocks outgoing requests to private IPs with support
433+
/// for HTTPS and optionally proxying via SOCKS5 or HTTP(S).
434+
#[derive(Clone)]
435+
enum SvixHttpsConnector {
436+
Regular(HttpsConnector<NonLocalHttpConnector>),
437+
Socks5Proxy(HttpsConnector<SocksConnector<NonLocalHttpConnector>>),
438+
HttpProxy(HttpProxyConnector<HttpConnector<NonLocalDnsResolver>>),
437439
}
438440

439-
impl SvixHttpConnector {
440-
fn new(inner: NonLocalHttpConnector, proxy_cfg: Option<ProxyConfig>) -> Self {
441+
impl SvixHttpsConnector {
442+
fn new(
443+
inner: NonLocalHttpConnector,
444+
proxy_cfg: Option<&ProxyConfig>,
445+
ssl: SslConnectorBuilder,
446+
) -> Result<Self, Box<dyn std::error::Error>> {
441447
match proxy_cfg {
442-
Some(proxy_cfg) => Self::Proxied(SocksConnector {
443-
proxy_addr: proxy_cfg.addr.into(),
444-
auth: None,
445-
connector: inner,
446-
}),
447-
None => Self::Regular(inner),
448+
Some(proxy_cfg) => match proxy_cfg.addr.clone() {
449+
// In the SOCKS5 case, TLS is handled inside of the proxy
450+
// TcpStream, by the same code that would do it without a proxy
451+
ProxyAddr::Socks5(proxy_addr) => {
452+
let socks = SocksConnector {
453+
proxy_addr,
454+
auth: None,
455+
connector: inner,
456+
};
457+
let socks_https = HttpsConnector::with_connector(socks, ssl)?;
458+
Ok(Self::Socks5Proxy(socks_https))
459+
}
460+
// In the HTTP proxy case, TLS is handled by the proxy connector
461+
ProxyAddr::Http(proxy_addr) => {
462+
let proxy = Proxy::new(Intercept::All, proxy_addr);
463+
Ok(Self::HttpProxy(HttpProxyConnector::from_proxy(
464+
inner, proxy,
465+
)?))
466+
}
467+
},
468+
None => {
469+
let https = HttpsConnector::with_connector(inner, ssl)?;
470+
Ok(Self::Regular(https))
471+
}
448472
}
449473
}
450474
}
451475

452-
impl Service<Uri> for SvixHttpConnector {
453-
type Response = TcpStream;
454-
type Error = hyper_socks2::Error;
476+
impl Service<Uri> for SvixHttpsConnector {
477+
type Response = ProxyStream<TcpStream>;
478+
type Error = Box<dyn std::error::Error + Send + Sync>;
455479
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
456480

457481
fn poll_ready(
458482
&mut self,
459483
cx: &mut std::task::Context<'_>,
460484
) -> std::task::Poll<Result<(), Self::Error>> {
461485
match self {
462-
Self::Regular(inner) => inner
463-
.poll_ready(cx)
464-
.map_err(|e| hyper_socks2::Error::Connector(e.into())),
465-
Self::Proxied(inner) => inner.poll_ready(cx),
486+
Self::Regular(inner) => inner.poll_ready(cx),
487+
Self::Socks5Proxy(inner) => inner.poll_ready(cx),
488+
Self::HttpProxy(inner) => inner.poll_ready(cx).map_err(Into::into),
466489
}
467490
}
468491

469492
fn call(&mut self, req: Uri) -> Self::Future {
493+
fn convert_stream(maybe_https: MaybeHttpsStream<TcpStream>) -> ProxyStream<TcpStream> {
494+
match maybe_https {
495+
MaybeHttpsStream::Http(stream) => ProxyStream::NoProxy(stream),
496+
MaybeHttpsStream::Https(stream) => ProxyStream::Secured(stream),
497+
}
498+
}
499+
470500
match self {
471501
Self::Regular(inner) => {
472502
let fut = inner.call(req);
473-
Box::pin(async move {
474-
fut.await
475-
.map_err(|e| hyper_socks2::Error::Connector(e.into()))
476-
})
503+
Box::pin(async move { Ok(convert_stream(fut.await?)) })
504+
}
505+
Self::Socks5Proxy(inner) => {
506+
let fut = inner.call(req);
507+
Box::pin(async move { Ok(convert_stream(fut.await?)) })
508+
}
509+
Self::HttpProxy(inner) => {
510+
let fut = inner.call(req);
511+
Box::pin(async move { fut.await.map_err(Into::into) })
477512
}
478-
Self::Proxied(inner) => Box::pin(inner.call(req)),
479513
}
480514
}
481515
}

server/svix-server/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,7 @@ pub async fn queue_handler(
901901
cfg.whitelist_subnets.clone(),
902902
Some(Arc::new(vec!["backend".to_owned()])),
903903
cfg.dangerous_disable_tls_verification,
904-
cfg.proxy_config.clone(),
904+
cfg.proxy_config.as_ref(),
905905
);
906906

907907
tokio::spawn(

server/svix-server/tests/it/e2e_proxy.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,38 @@ async fn test_message_delivery_via_socks5() {
1717
use crate::utils::start_svix_server_with_cfg;
1818

1919
let mut cfg = get_default_test_config();
20-
cfg.proxy_config = Some(proxy_config());
20+
cfg.proxy_config = Some(socks_proxy_config());
2121
let (client, _) = start_svix_server_with_cfg(&cfg).await;
22-
run_socks5_test(&client).await;
22+
run_proxy_test(&client).await;
2323
}
2424

25-
fn proxy_config() -> ProxyConfig {
25+
fn socks_proxy_config() -> ProxyConfig {
2626
ProxyConfig {
2727
addr: ProxyAddr::new("socks5://localhost:1080").unwrap(),
2828
}
2929
}
3030

31-
async fn run_socks5_test(client: &TestClient) {
31+
#[ignore] // requires an http proxy to be running
32+
#[tokio::test]
33+
async fn test_message_delivery_via_http_proxy() {
34+
use crate::utils::start_svix_server_with_cfg;
35+
36+
let mut cfg = get_default_test_config();
37+
cfg.proxy_config = Some(http_proxy_config());
38+
let (client, _) = start_svix_server_with_cfg(&cfg).await;
39+
run_proxy_test(&client).await;
40+
}
41+
42+
fn http_proxy_config() -> ProxyConfig {
43+
ProxyConfig {
44+
addr: ProxyAddr::new("http://localhost:8888").unwrap(),
45+
}
46+
}
47+
48+
async fn run_proxy_test(client: &TestClient) {
3249
let mut receiver = TestReceiver::start(StatusCode::OK);
3350

34-
let app_id = create_test_app(client, "kafkaSinkTest").await.unwrap().id;
51+
let app_id = create_test_app(client, "proxyTest").await.unwrap().id;
3552
create_test_endpoint(client, &app_id, &receiver.endpoint)
3653
.await
3754
.unwrap();

0 commit comments

Comments
 (0)