Skip to content

Commit 8a36ca3

Browse files
authored
feat(meta): Add creating_status field for stream jobs (#12330)
1 parent bf5b14e commit 8a36ca3

File tree

7 files changed

+26
-7
lines changed

7 files changed

+26
-7
lines changed

proto/catalog.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ enum SchemaRegistryNameStrategy {
3232
TOPIC_RECORD_NAME_STRATEGY = 2;
3333
}
3434

35+
enum StreamJobStatus {
36+
// Prefixed by `STREAM_JOB_STATUS` due to protobuf namespacing rules.
37+
STREAM_JOB_STATUS_UNSPECIFIED = 0;
38+
CREATING = 1;
39+
CREATED = 2;
40+
}
41+
3542
message StreamSourceInfo {
3643
// deprecated
3744
plan_common.RowFormatType row_format = 1;
@@ -116,6 +123,7 @@ message Sink {
116123
optional uint64 created_at_epoch = 16;
117124
string db_name = 17;
118125
string sink_from_name = 18;
126+
StreamJobStatus stream_job_status = 19;
119127
}
120128

121129
message Connection {
@@ -157,6 +165,7 @@ message Index {
157165

158166
optional uint64 initialized_at_epoch = 10;
159167
optional uint64 created_at_epoch = 11;
168+
StreamJobStatus stream_job_status = 12;
160169
}
161170

162171
message Function {
@@ -250,6 +259,9 @@ message Table {
250259
// In older versions we can just initialize without it.
251260
bool cleaned_by_watermark = 30;
252261

262+
// Used to filter created / creating tables in meta.
263+
StreamJobStatus stream_job_status = 31;
264+
253265
// Per-table catalog version, used by schema change. `None` for internal tables and tests.
254266
// Not to be confused with the global catalog version for notification service.
255267
TableVersion version = 100;

src/connector/src/sink/catalog/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use risingwave_common::catalog::{
2222
};
2323
use risingwave_common::util::epoch::Epoch;
2424
use risingwave_common::util::sort_util::ColumnOrder;
25-
use risingwave_pb::catalog::{PbSink, PbSinkType};
25+
use risingwave_pb::catalog::{PbSink, PbSinkType, PbStreamJobStatus};
2626

2727
#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
2828
pub struct SinkId {
@@ -191,6 +191,7 @@ impl SinkCatalog {
191191
created_at_epoch: self.created_at_epoch.map(|e| e.0),
192192
db_name: self.db_name.clone(),
193193
sink_from_name: self.sink_from_name.clone(),
194+
stream_job_status: PbStreamJobStatus::Creating.into(),
194195
}
195196
}
196197

src/frontend/src/catalog/index_catalog.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use itertools::Itertools;
2121
use risingwave_common::catalog::IndexId;
2222
use risingwave_common::util::epoch::Epoch;
2323
use risingwave_common::util::sort_util::ColumnOrder;
24-
use risingwave_pb::catalog::PbIndex;
24+
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus};
2525

2626
use super::ColumnId;
2727
use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
@@ -184,6 +184,7 @@ impl IndexCatalog {
184184
original_columns: self.original_columns.iter().map(Into::into).collect_vec(),
185185
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
186186
created_at_epoch: self.created_at_epoch.map(|e| e.0),
187+
stream_job_status: PbStreamJobStatus::Creating.into(),
187188
}
188189
}
189190

src/frontend/src/catalog/table_catalog.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use risingwave_common::error::{ErrorCode, RwError};
2424
use risingwave_common::util::epoch::Epoch;
2525
use risingwave_common::util::sort_util::ColumnOrder;
2626
use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion};
27-
use risingwave_pb::catalog::PbTable;
27+
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
2828
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
2929
use risingwave_pb::plan_common::DefaultColumnDesc;
3030

@@ -401,6 +401,7 @@ impl TableCatalog {
401401
initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0),
402402
created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0),
403403
cleaned_by_watermark: self.cleaned_by_watermark,
404+
stream_job_status: PbStreamJobStatus::Creating.into(),
404405
}
405406
}
406407

@@ -542,7 +543,7 @@ mod tests {
542543
use risingwave_common::test_prelude::*;
543544
use risingwave_common::types::*;
544545
use risingwave_common::util::sort_util::OrderType;
545-
use risingwave_pb::catalog::PbTable;
546+
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
546547
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
547548

548549
use super::*;
@@ -605,6 +606,7 @@ mod tests {
605606
cardinality: None,
606607
created_at_epoch: None,
607608
cleaned_by_watermark: false,
609+
stream_job_status: PbStreamJobStatus::Creating.into(),
608610
}
609611
.into();
610612

src/frontend/src/handler/create_index.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
2121
use risingwave_common::catalog::{IndexId, TableDesc, TableId};
2222
use risingwave_common::error::{ErrorCode, Result};
2323
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
24-
use risingwave_pb::catalog::{PbIndex, PbTable};
24+
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus, PbTable};
2525
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
2626
use risingwave_pb::user::grant_privilege::{Action, Object};
2727
use risingwave_sqlparser::ast;
@@ -242,6 +242,7 @@ pub(crate) fn gen_create_index_plan(
242242
original_columns,
243243
initialized_at_epoch: None,
244244
created_at_epoch: None,
245+
stream_job_status: PbStreamJobStatus::Creating.into(),
245246
};
246247

247248
let plan: PlanRef = materialize.into();

src/storage/src/filter_key_extractor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ mod tests {
448448
use risingwave_common::util::sort_util::OrderType;
449449
use risingwave_hummock_sdk::key::TABLE_PREFIX_LEN;
450450
use risingwave_pb::catalog::table::TableType;
451-
use risingwave_pb::catalog::PbTable;
451+
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
452452
use risingwave_pb::common::{PbColumnOrder, PbDirection, PbNullsAre, PbOrderType};
453453
use risingwave_pb::plan_common::PbColumnCatalog;
454454

@@ -549,6 +549,7 @@ mod tests {
549549
cardinality: None,
550550
created_at_epoch: None,
551551
cleaned_by_watermark: false,
552+
stream_job_status: PbStreamJobStatus::Created.into(),
552553
}
553554
}
554555

src/tests/compaction_test/src/delete_range_runner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use risingwave_meta::hummock::test_utils::setup_compute_env_with_config;
3636
use risingwave_meta::hummock::MockHummockMetaClient;
3737
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
3838
use risingwave_object_store::object::parse_remote_object_store;
39-
use risingwave_pb::catalog::PbTable;
39+
use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
4040
use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo};
4141
use risingwave_pb::meta::SystemParams;
4242
use risingwave_rpc_client::HummockMetaClient;
@@ -150,6 +150,7 @@ async fn compaction_test(
150150
cardinality: None,
151151
created_at_epoch: None,
152152
cleaned_by_watermark: false,
153+
stream_job_status: PbStreamJobStatus::Created.into(),
153154
};
154155
let mut delete_range_table = delete_key_table.clone();
155156
delete_range_table.id = 2;

0 commit comments

Comments
 (0)