-
Notifications
You must be signed in to change notification settings - Fork 644
[CORE-12248] - Define some test utilities for DR/CL #26528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
[CORE-12248] - Define some test utilities for DR/CL #26528
Conversation
Added abstract class that will be used to connect to remote clusters and a factory that will be used to create them on demand. Signed-off-by: Michael Boquard <[email protected]> (cherry picked from commit 9020f2dd4d6d1c039d8ef5fb6bca98d0b324f490)
(cherry picked from commit 72df6570ff0d34cc0a17ce071d64daf251f0a189)
Added a fake cluster representation to be used in unit tests Signed-off-by: Michael Boquard <[email protected]> (cherry picked from commit 9a61af5fea27830dc8d2bf4363fe4c85aee617f2)
Added an abstract class and metadata_cache_adapter that decouples cluster::metadata_cache from the make_topic_configs function. This permits us to use that function in unit tests without having to create an entire test fixture. Signed-off-by: Michael Boquard <[email protected]> (cherry picked from commit 2ddd05a55e6afd84c5ae4b40f863a9f8b324d630)
This class sets a fake remote client connection that can also be overridden with other implementations to assist with unit testing cluster linking. Signed-off-by: Michael Boquard <[email protected]> (cherry picked from commit c4f27abb96a892ac323d6d4658bea9f8347b8f26)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR sets up the initial testing framework for cluster linking components, introducing a metadata cache abstraction in the Kafka config response utilities and test utilities for simulating remote cluster connections.
- Introduces metadata_cache_info and metadata_cache_adapter in the Kafka config handler for better unit test integration.
- Adds test utilities and source cluster implementations to simulate remote cluster behaviors in unit tests.
- Updates remote cluster connection interfaces and BUILD dependencies to support the new testing framework.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
src/v/kafka/server/handlers/configs/config_response_utils.h | Introduces metadata cache interface and adapter; adds inline overload for make_topic_configs to simplify test usage. |
src/v/kafka/server/handlers/configs/config_response_utils.cc | Implements metadata_cache_adapter methods and updates make_topic_configs to use the new abstraction. |
src/v/cluster_link/tests/test_utils.h/cc | Provides test connection and connection factory implementations for remote cluster connections. |
src/v/cluster_link/tests/source_cluster.h/cc | Adds source cluster and source topic test fixtures. |
src/v/cluster_link/manager.h and BUILD | Updates remote cluster connection interface and adds new dependencies in BUILD files. |
Comments suppressed due to low confidence (1)
src/v/kafka/server/handlers/configs/config_response_utils.h:150
- [nitpick] Add a brief comment clarifying the role of the metadata_cache_adapter in the conversion overload for make_topic_configs to help future maintainers understand the adapter logic.
inline config_response_container_t make_topic_configs(
virtual model::compression get_default_compression() const = 0; | ||
virtual model::cleanup_policy_bitflags | ||
get_default_cleanup_policy_bitflags() const | ||
= 0; | ||
virtual size_t get_default_compacted_topic_segment_size() const = 0; | ||
virtual size_t get_default_segment_size() const = 0; | ||
virtual std::optional<std::chrono::milliseconds> | ||
get_default_retention_duration() const = 0; | ||
virtual std::optional<size_t> get_default_retention_bytes() const = 0; | ||
virtual model::timestamp_type get_default_timestamp_type() const = 0; | ||
virtual uint32_t get_default_batch_max_bytes() const = 0; | ||
virtual model::shadow_indexing_mode get_default_shadow_indexing_mode() const | ||
= 0; | ||
virtual std::optional<size_t> | ||
get_default_retention_local_target_bytes() const = 0; | ||
virtual std::chrono::milliseconds | ||
get_default_retention_local_target_ms() const | ||
= 0; | ||
virtual std::optional<std::chrono::milliseconds> | ||
get_default_segment_ms() const = 0; | ||
virtual std::optional<std::chrono::milliseconds> | ||
get_default_delete_retention_ms() const = 0; | ||
virtual bool get_default_record_key_schema_id_validation() const = 0; | ||
virtual pandaproxy::schema_registry::subject_name_strategy | ||
get_default_record_key_subject_name_strategy() const | ||
= 0; | ||
virtual bool get_default_record_value_schema_id_validation() const = 0; | ||
virtual pandaproxy::schema_registry::subject_name_strategy | ||
get_default_record_value_subject_name_strategy() const | ||
= 0; | ||
virtual std::optional<size_t> | ||
get_default_initial_retention_local_target_bytes() const = 0; | ||
virtual std::optional<std::chrono::milliseconds> | ||
get_default_initial_retention_local_target_ms() const = 0; | ||
virtual std::chrono::milliseconds get_default_iceberg_target_lag_ms() const | ||
= 0; | ||
virtual std::optional<double> | ||
get_default_min_cleanable_dirty_ratio() const = 0; | ||
virtual std::chrono::milliseconds get_default_min_compaction_lag_ms() const | ||
= 0; | ||
virtual std::chrono::milliseconds get_default_max_compaction_lag_ms() const | ||
= 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider grouping related default configuration methods together and adding inline comments to the metadata_cache_info interface to enhance its readability and maintainability.
virtual model::compression get_default_compression() const = 0; | |
virtual model::cleanup_policy_bitflags | |
get_default_cleanup_policy_bitflags() const | |
= 0; | |
virtual size_t get_default_compacted_topic_segment_size() const = 0; | |
virtual size_t get_default_segment_size() const = 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_retention_duration() const = 0; | |
virtual std::optional<size_t> get_default_retention_bytes() const = 0; | |
virtual model::timestamp_type get_default_timestamp_type() const = 0; | |
virtual uint32_t get_default_batch_max_bytes() const = 0; | |
virtual model::shadow_indexing_mode get_default_shadow_indexing_mode() const | |
= 0; | |
virtual std::optional<size_t> | |
get_default_retention_local_target_bytes() const = 0; | |
virtual std::chrono::milliseconds | |
get_default_retention_local_target_ms() const | |
= 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_segment_ms() const = 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_delete_retention_ms() const = 0; | |
virtual bool get_default_record_key_schema_id_validation() const = 0; | |
virtual pandaproxy::schema_registry::subject_name_strategy | |
get_default_record_key_subject_name_strategy() const | |
= 0; | |
virtual bool get_default_record_value_schema_id_validation() const = 0; | |
virtual pandaproxy::schema_registry::subject_name_strategy | |
get_default_record_value_subject_name_strategy() const | |
= 0; | |
virtual std::optional<size_t> | |
get_default_initial_retention_local_target_bytes() const = 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_initial_retention_local_target_ms() const = 0; | |
virtual std::chrono::milliseconds get_default_iceberg_target_lag_ms() const | |
= 0; | |
virtual std::optional<double> | |
get_default_min_cleanable_dirty_ratio() const = 0; | |
virtual std::chrono::milliseconds get_default_min_compaction_lag_ms() const | |
= 0; | |
virtual std::chrono::milliseconds get_default_max_compaction_lag_ms() const | |
= 0; | |
// Compression and cleanup policy defaults | |
virtual model::compression get_default_compression() const = 0; | |
virtual model::cleanup_policy_bitflags | |
get_default_cleanup_policy_bitflags() const = 0; | |
// Segment size defaults | |
virtual size_t get_default_compacted_topic_segment_size() const = 0; | |
virtual size_t get_default_segment_size() const = 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_segment_ms() const = 0; | |
// Retention policy defaults | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_retention_duration() const = 0; | |
virtual std::optional<size_t> get_default_retention_bytes() const = 0; | |
virtual std::optional<size_t> | |
get_default_retention_local_target_bytes() const = 0; | |
virtual std::chrono::milliseconds | |
get_default_retention_local_target_ms() const = 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_delete_retention_ms() const = 0; | |
virtual std::optional<size_t> | |
get_default_initial_retention_local_target_bytes() const = 0; | |
virtual std::optional<std::chrono::milliseconds> | |
get_default_initial_retention_local_target_ms() const = 0; | |
// Timestamp and batch defaults | |
virtual model::timestamp_type get_default_timestamp_type() const = 0; | |
virtual uint32_t get_default_batch_max_bytes() const = 0; | |
// Shadow indexing defaults | |
virtual model::shadow_indexing_mode get_default_shadow_indexing_mode() const | |
= 0; | |
// Schema validation and subject name strategy defaults | |
virtual bool get_default_record_key_schema_id_validation() const = 0; | |
virtual pandaproxy::schema_registry::subject_name_strategy | |
get_default_record_key_subject_name_strategy() const = 0; | |
virtual bool get_default_record_value_schema_id_validation() const = 0; | |
virtual pandaproxy::schema_registry::subject_name_strategy | |
get_default_record_value_subject_name_strategy() const = 0; | |
// Compaction and iceberg defaults | |
virtual std::optional<double> | |
get_default_min_cleanable_dirty_ratio() const = 0; | |
virtual std::chrono::milliseconds get_default_min_compaction_lag_ms() const | |
= 0; | |
virtual std::chrono::milliseconds get_default_max_compaction_lag_ms() const | |
= 0; | |
virtual std::chrono::milliseconds get_default_iceberg_target_lag_ms() const | |
= 0; |
Copilot uses AI. Check for mistakes.
This PR is largely a non-functional change. It sets up some initial frameworks for unit testing the cluster linking components. It creates a fake "cluster" with some topics and configs, that can be used while doing unit testing of the metadata sync tasks (auto-topic creation, topic sync, etc). This PR also defines the initial abstract remote cluster connection class and factory to be used by unit tests and code.
The only 'functional' change was to modify the
kafka::server::make_topic_configs
method to improve it's usability for including it in unit tests without needing to stand up a fixture.Backports Required
Release Notes