Skip to content

Commit be74d82

Browse files
committed
feat(frontend): implement OAuth authentication (#13151)
1 parent 5e6e2ca commit be74d82

File tree

13 files changed

+179
-20
lines changed

13 files changed

+179
-20
lines changed

Cargo.lock

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/user.proto

+2
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ message AuthInfo {
1414
PLAINTEXT = 1;
1515
SHA256 = 2;
1616
MD5 = 3;
17+
OAUTH = 4;
1718
}
1819
EncryptionType encryption_type = 1;
1920
bytes encrypted_value = 2;
21+
map<string, string> metadata = 3;
2022
}
2123

2224
// User defines a user in the system.

src/frontend/src/handler/alter_user.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use pgwire::pg_response::{PgResponse, StatementType};
16-
use risingwave_common::error::ErrorCode::{InternalError, PermissionDenied};
16+
use risingwave_common::error::ErrorCode::{self, InternalError, PermissionDenied};
1717
use risingwave_common::error::Result;
1818
use risingwave_pb::user::update_user_request::UpdateField;
1919
use risingwave_pb::user::UserInfo;
@@ -23,7 +23,9 @@ use super::RwPgResponse;
2323
use crate::binder::Binder;
2424
use crate::catalog::CatalogError;
2525
use crate::handler::HandlerArgs;
26-
use crate::user::user_authentication::encrypted_password;
26+
use crate::user::user_authentication::{
27+
build_oauth_info, encrypted_password, OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY,
28+
};
2729
use crate::user::user_catalog::UserCatalog;
2830

2931
fn alter_prost_user_info(
@@ -111,6 +113,16 @@ fn alter_prost_user_info(
111113
}
112114
update_fields.push(UpdateField::AuthInfo);
113115
}
116+
UserOption::OAuth(options) => {
117+
let auth_info = build_oauth_info(options).ok_or_else(|| {
118+
ErrorCode::InvalidParameterValue(format!(
119+
"{} and {} must be provided",
120+
OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
121+
))
122+
})?;
123+
user_info.auth_info = Some(auth_info);
124+
update_fields.push(UpdateField::AuthInfo)
125+
}
114126
}
115127
}
116128
Ok((user_info, update_fields))
@@ -181,6 +193,8 @@ pub async fn handle_alter_user(
181193

182194
#[cfg(test)]
183195
mod tests {
196+
use std::collections::HashMap;
197+
184198
use risingwave_pb::user::auth_info::EncryptionType;
185199
use risingwave_pb::user::AuthInfo;
186200

@@ -219,7 +233,8 @@ mod tests {
219233
user_info.auth_info,
220234
Some(AuthInfo {
221235
encryption_type: EncryptionType::Md5 as i32,
222-
encrypted_value: b"9f2fa6a30871a92249bdd2f1eeee4ef6".to_vec()
236+
encrypted_value: b"9f2fa6a30871a92249bdd2f1eeee4ef6".to_vec(),
237+
metadata: HashMap::new(),
223238
})
224239
);
225240
}

src/frontend/src/handler/create_user.rs

+17-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use pgwire::pg_response::{PgResponse, StatementType};
16-
use risingwave_common::error::ErrorCode::PermissionDenied;
16+
use risingwave_common::error::ErrorCode::{self, PermissionDenied};
1717
use risingwave_common::error::Result;
1818
use risingwave_pb::user::grant_privilege::{Action, ActionWithGrantOption, Object};
1919
use risingwave_pb::user::{GrantPrivilege, UserInfo};
@@ -23,7 +23,9 @@ use super::RwPgResponse;
2323
use crate::binder::Binder;
2424
use crate::catalog::{CatalogError, DatabaseId};
2525
use crate::handler::HandlerArgs;
26-
use crate::user::user_authentication::encrypted_password;
26+
use crate::user::user_authentication::{
27+
build_oauth_info, encrypted_password, OAUTH_ISSUER_KEY, OAUTH_JWKS_URL_KEY,
28+
};
2729
use crate::user::user_catalog::UserCatalog;
2830

2931
fn make_prost_user_info(
@@ -91,6 +93,15 @@ fn make_prost_user_info(
9193
user_info.auth_info = encrypted_password(&user_info.name, &password.0);
9294
}
9395
}
96+
UserOption::OAuth(options) => {
97+
let auth_info = build_oauth_info(options).ok_or_else(|| {
98+
ErrorCode::InvalidParameterValue(format!(
99+
"{} and {} must be provided",
100+
OAUTH_JWKS_URL_KEY, OAUTH_ISSUER_KEY
101+
))
102+
})?;
103+
user_info.auth_info = Some(auth_info);
104+
}
94105
}
95106
}
96107

@@ -130,6 +141,8 @@ pub async fn handle_create_user(
130141

131142
#[cfg(test)]
132143
mod tests {
144+
use std::collections::HashMap;
145+
133146
use risingwave_common::catalog::DEFAULT_DATABASE_NAME;
134147
use risingwave_pb::user::auth_info::EncryptionType;
135148
use risingwave_pb::user::AuthInfo;
@@ -157,7 +170,8 @@ mod tests {
157170
user_info.auth_info,
158171
Some(AuthInfo {
159172
encryption_type: EncryptionType::Md5 as i32,
160-
encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec()
173+
encrypted_value: b"827ccb0eea8a706c4c34a16891f84e7b".to_vec(),
174+
metadata: HashMap::new(),
161175
})
162176
);
163177
frontend

src/frontend/src/session.rs

+2
Original file line numberDiff line numberDiff line change
@@ -963,6 +963,8 @@ impl SessionManager for SessionManagerImpl {
963963
),
964964
salt,
965965
}
966+
} else if auth_info.encryption_type == EncryptionType::Oauth as i32 {
967+
UserAuthenticator::OAuth(auth_info.metadata.clone())
966968
} else {
967969
return Err(Box::new(Error::new(
968970
ErrorKind::Unsupported,

src/frontend/src/user/user_authentication.rs

+33
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
16+
1517
use risingwave_pb::user::auth_info::EncryptionType;
1618
use risingwave_pb::user::AuthInfo;
19+
use risingwave_sqlparser::ast::SqlOption;
1720
use sha2::{Digest, Sha256};
1821

22+
use crate::WithOptions;
23+
1924
// SHA-256 is not supported in PostgreSQL protocol. We need to implement SCRAM-SHA-256 instead
2025
// if necessary.
2126
const SHA256_ENCRYPTED_PREFIX: &str = "SHA-256:";
@@ -24,6 +29,27 @@ const MD5_ENCRYPTED_PREFIX: &str = "md5";
2429
const VALID_SHA256_ENCRYPTED_LEN: usize = SHA256_ENCRYPTED_PREFIX.len() + 64;
2530
const VALID_MD5_ENCRYPTED_LEN: usize = MD5_ENCRYPTED_PREFIX.len() + 32;
2631

32+
pub const OAUTH_JWKS_URL_KEY: &str = "jwks_url";
33+
pub const OAUTH_ISSUER_KEY: &str = "issuer";
34+
35+
/// Build `AuthInfo` for `OAuth`.
36+
#[inline(always)]
37+
pub fn build_oauth_info(options: &Vec<SqlOption>) -> Option<AuthInfo> {
38+
let metadata: HashMap<String, String> = WithOptions::try_from(options.as_slice())
39+
.ok()?
40+
.into_inner()
41+
.into_iter()
42+
.collect();
43+
if !metadata.contains_key(OAUTH_JWKS_URL_KEY) || !metadata.contains_key(OAUTH_ISSUER_KEY) {
44+
return None;
45+
}
46+
Some(AuthInfo {
47+
encryption_type: EncryptionType::Oauth as i32,
48+
encrypted_value: Vec::new(),
49+
metadata,
50+
})
51+
}
52+
2753
/// Try to extract the encryption password from given password. The password is always stored
2854
/// encrypted in the system catalogs. The ENCRYPTED keyword has no effect, but is accepted for
2955
/// backwards compatibility. The method of encryption is by default SHA-256-encrypted. If the
@@ -53,11 +79,13 @@ pub fn encrypted_password(name: &str, password: &str) -> Option<AuthInfo> {
5379
Some(AuthInfo {
5480
encryption_type: EncryptionType::Sha256 as i32,
5581
encrypted_value: password.trim_start_matches(SHA256_ENCRYPTED_PREFIX).into(),
82+
metadata: HashMap::new(),
5683
})
5784
} else if valid_md5_password(password) {
5885
Some(AuthInfo {
5986
encryption_type: EncryptionType::Md5 as i32,
6087
encrypted_value: password.trim_start_matches(MD5_ENCRYPTED_PREFIX).into(),
88+
metadata: HashMap::new(),
6189
})
6290
} else {
6391
Some(encrypt_default(name, password))
@@ -70,6 +98,7 @@ fn encrypt_default(name: &str, password: &str) -> AuthInfo {
7098
AuthInfo {
7199
encryption_type: EncryptionType::Md5 as i32,
72100
encrypted_value: md5_hash(name, password),
101+
metadata: HashMap::new(),
73102
}
74103
}
75104

@@ -81,6 +110,7 @@ pub fn encrypted_raw_password(info: &AuthInfo) -> String {
81110
EncryptionType::Plaintext => "",
82111
EncryptionType::Sha256 => SHA256_ENCRYPTED_PREFIX,
83112
EncryptionType::Md5 => MD5_ENCRYPTED_PREFIX,
113+
EncryptionType::Oauth => "",
84114
};
85115
format!("{}{}", prefix, encrypted_pwd)
86116
}
@@ -156,15 +186,18 @@ mod tests {
156186
Some(AuthInfo {
157187
encryption_type: EncryptionType::Md5 as i32,
158188
encrypted_value: md5_hash(user_name, password),
189+
metadata: HashMap::new(),
159190
}),
160191
None,
161192
Some(AuthInfo {
162193
encryption_type: EncryptionType::Md5 as i32,
163194
encrypted_value: md5_hash(user_name, password),
195+
metadata: HashMap::new(),
164196
}),
165197
Some(AuthInfo {
166198
encryption_type: EncryptionType::Sha256 as i32,
167199
encrypted_value: sha256_hash(user_name, password),
200+
metadata: HashMap::new(),
168201
}),
169202
];
170203
let output_passwords = input_passwords

src/sqlparser/src/ast/statement.rs

+10-2
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,7 @@ pub enum UserOption {
715715
NoLogin,
716716
EncryptedPassword(AstString),
717717
Password(Option<AstString>),
718+
OAuth(Vec<SqlOption>),
718719
}
719720

720721
impl fmt::Display for UserOption {
@@ -731,6 +732,9 @@ impl fmt::Display for UserOption {
731732
UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
732733
UserOption::Password(None) => write!(f, "PASSWORD NULL"),
733734
UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
735+
UserOption::OAuth(options) => {
736+
write!(f, "({})", display_comma_separated(options.as_slice()))
737+
}
734738
}
735739
}
736740
}
@@ -818,10 +822,14 @@ impl ParseTo for UserOptions {
818822
UserOption::EncryptedPassword(AstString::parse_to(parser)?),
819823
)
820824
}
825+
Keyword::OAUTH => {
826+
let options = parser.parse_options()?;
827+
(&mut builder.password, UserOption::OAuth(options))
828+
}
821829
_ => {
822830
parser.expected(
823831
"SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN \
824-
| NOLOGIN | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL",
832+
| NOLOGIN | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
825833
token,
826834
)?;
827835
unreachable!()
@@ -831,7 +839,7 @@ impl ParseTo for UserOptions {
831839
} else {
832840
parser.expected(
833841
"SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN | NOLOGIN \
834-
| CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL",
842+
| CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
835843
token,
836844
)?
837845
}

src/sqlparser/src/keywords.rs

+1
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ define_keywords!(
344344
NULLIF,
345345
NULLS,
346346
NUMERIC,
347+
OAUTH,
347348
OBJECT,
348349
OCCURRENCES_REGEX,
349350
OCTET_LENGTH,

src/sqlparser/src/parser.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2379,7 +2379,7 @@ impl Parser {
23792379
// | CREATEDB | NOCREATEDB
23802380
// | CREATEUSER | NOCREATEUSER
23812381
// | LOGIN | NOLOGIN
2382-
// | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL
2382+
// | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL | OAUTH
23832383
fn parse_create_user(&mut self) -> Result<Statement, ParserError> {
23842384
Ok(Statement::CreateUser(CreateUserStatement::parse_to(self)?))
23852385
}

src/storage/src/hummock/sstable_store.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1020,9 +1020,9 @@ impl SstableWriter for StreamingUploadWriter {
10201020
}
10211021

10221022
async fn finish(mut self, meta: SstableMeta) -> HummockResult<UploadJoinHandle> {
1023-
let meta_data = Bytes::from(meta.encode_to_bytes());
1023+
let metadata = Bytes::from(meta.encode_to_bytes());
10241024

1025-
self.object_uploader.write_bytes(meta_data).await?;
1025+
self.object_uploader.write_bytes(metadata).await?;
10261026
let join_handle = tokio::spawn(async move {
10271027
let uploader_memory_usage = self.object_uploader.get_memory_usage();
10281028
let _tracker = self.tracker.map(|mut t| {

src/utils/pgwire/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@ byteorder = "1.5"
2121
bytes = "1"
2222
futures = { version = "0.3", default-features = false, features = ["alloc"] }
2323
itertools = "0.12"
24+
jsonwebtoken = "9"
2425
openssl = "0.10.60"
2526
panic-message = "0.3"
2627
parking_lot = "0.12"
28+
reqwest = { version = "0.11" }
2729
risingwave_common = { workspace = true }
2830
risingwave_sqlparser = { workspace = true }
31+
serde = { version = "1", features = ["derive"] }
32+
serde_json = "1"
2933
thiserror = "1"
3034
thiserror-ext = { workspace = true }
3135
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "macros"] }

src/utils/pgwire/src/pg_protocol.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ where
387387
match msg {
388388
FeMessage::Ssl => self.process_ssl_msg().await?,
389389
FeMessage::Startup(msg) => self.process_startup_msg(msg)?,
390-
FeMessage::Password(msg) => self.process_password_msg(msg)?,
390+
FeMessage::Password(msg) => self.process_password_msg(msg).await?,
391391
FeMessage::Query(query_msg) => self.process_query_msg(query_msg.get_sql()).await?,
392392
FeMessage::CancelQuery(m) => self.process_cancel_msg(m)?,
393393
FeMessage::Terminate => self.process_terminate(),
@@ -508,7 +508,7 @@ where
508508
})?;
509509
self.ready_for_query()?;
510510
}
511-
UserAuthenticator::ClearText(_) => {
511+
UserAuthenticator::ClearText(_) | UserAuthenticator::OAuth(_) => {
512512
self.stream
513513
.write_no_flush(&BeMessage::AuthenticationCleartextPassword)?;
514514
}
@@ -523,11 +523,9 @@ where
523523
Ok(())
524524
}
525525

526-
fn process_password_msg(&mut self, msg: FePasswordMessage) -> PsqlResult<()> {
526+
async fn process_password_msg(&mut self, msg: FePasswordMessage) -> PsqlResult<()> {
527527
let authenticator = self.session.as_ref().unwrap().user_authenticator();
528-
if !authenticator.authenticate(&msg.password) {
529-
return Err(PsqlError::PasswordError);
530-
}
528+
authenticator.authenticate(&msg.password).await?;
531529
self.stream.write_no_flush(&BeMessage::AuthenticationOk)?;
532530
self.stream
533531
.write_parameter_status_msg_no_flush(&ParameterStatus::default())?;

0 commit comments

Comments
 (0)