Skip to content

[CORE-12248] - Define some test utilities for DR/CL #26528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from

Conversation

michael-redpanda
Copy link
Contributor

This PR is largely a non-functional change. It sets up some initial frameworks for unit testing the cluster linking components. It creates a fake "cluster" with some topics and configs, that can be used while doing unit testing of the metadata sync tasks (auto-topic creation, topic sync, etc). This PR also defines the initial abstract remote cluster connection class and factory to be used by unit tests and code.

The only 'functional' change was to modify the kafka::server::make_topic_configs method to improve it's usability for including it in unit tests without needing to stand up a fixture.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.1.x
  • v24.3.x
  • v24.2.x

Release Notes

  • None

Added abstract class that will be used to connect to remote clusters and
a factory that will be used to create them on demand.

Signed-off-by: Michael Boquard <[email protected]>
(cherry picked from commit 9020f2dd4d6d1c039d8ef5fb6bca98d0b324f490)
(cherry picked from commit 72df6570ff0d34cc0a17ce071d64daf251f0a189)
Added a fake cluster representation to be used in unit tests

Signed-off-by: Michael Boquard <[email protected]>
(cherry picked from commit 9a61af5fea27830dc8d2bf4363fe4c85aee617f2)
Added an abstract class and metadata_cache_adapter that decouples
cluster::metadata_cache from the make_topic_configs function.  This
permits us to use that function in unit tests without having to create
an entire test fixture.

Signed-off-by: Michael Boquard <[email protected]>
(cherry picked from commit 2ddd05a55e6afd84c5ae4b40f863a9f8b324d630)
This class sets a fake remote client connection that can also be
overridden with other implementations to assist with unit testing
cluster linking.

Signed-off-by: Michael Boquard <[email protected]>
(cherry picked from commit c4f27abb96a892ac323d6d4658bea9f8347b8f26)
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR sets up the initial testing framework for cluster linking components, introducing a metadata cache abstraction in the Kafka config response utilities and test utilities for simulating remote cluster connections.

  • Introduces metadata_cache_info and metadata_cache_adapter in the Kafka config handler for better unit test integration.
  • Adds test utilities and source cluster implementations to simulate remote cluster behaviors in unit tests.
  • Updates remote cluster connection interfaces and BUILD dependencies to support the new testing framework.

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
src/v/kafka/server/handlers/configs/config_response_utils.h Introduces metadata cache interface and adapter; adds inline overload for make_topic_configs to simplify test usage.
src/v/kafka/server/handlers/configs/config_response_utils.cc Implements metadata_cache_adapter methods and updates make_topic_configs to use the new abstraction.
src/v/cluster_link/tests/test_utils.h/cc Provides test connection and connection factory implementations for remote cluster connections.
src/v/cluster_link/tests/source_cluster.h/cc Adds source cluster and source topic test fixtures.
src/v/cluster_link/manager.h and BUILD Updates remote cluster connection interface and adds new dependencies in BUILD files.
Comments suppressed due to low confidence (1)

src/v/kafka/server/handlers/configs/config_response_utils.h:150

  • [nitpick] Add a brief comment clarifying the role of the metadata_cache_adapter in the conversion overload for make_topic_configs to help future maintainers understand the adapter logic.
inline config_response_container_t make_topic_configs(

Comment on lines +50 to +91
virtual model::compression get_default_compression() const = 0;
virtual model::cleanup_policy_bitflags
get_default_cleanup_policy_bitflags() const
= 0;
virtual size_t get_default_compacted_topic_segment_size() const = 0;
virtual size_t get_default_segment_size() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_retention_duration() const = 0;
virtual std::optional<size_t> get_default_retention_bytes() const = 0;
virtual model::timestamp_type get_default_timestamp_type() const = 0;
virtual uint32_t get_default_batch_max_bytes() const = 0;
virtual model::shadow_indexing_mode get_default_shadow_indexing_mode() const
= 0;
virtual std::optional<size_t>
get_default_retention_local_target_bytes() const = 0;
virtual std::chrono::milliseconds
get_default_retention_local_target_ms() const
= 0;
virtual std::optional<std::chrono::milliseconds>
get_default_segment_ms() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const = 0;
virtual bool get_default_record_key_schema_id_validation() const = 0;
virtual pandaproxy::schema_registry::subject_name_strategy
get_default_record_key_subject_name_strategy() const
= 0;
virtual bool get_default_record_value_schema_id_validation() const = 0;
virtual pandaproxy::schema_registry::subject_name_strategy
get_default_record_value_subject_name_strategy() const
= 0;
virtual std::optional<size_t>
get_default_initial_retention_local_target_bytes() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_initial_retention_local_target_ms() const = 0;
virtual std::chrono::milliseconds get_default_iceberg_target_lag_ms() const
= 0;
virtual std::optional<double>
get_default_min_cleanable_dirty_ratio() const = 0;
virtual std::chrono::milliseconds get_default_min_compaction_lag_ms() const
= 0;
virtual std::chrono::milliseconds get_default_max_compaction_lag_ms() const
= 0;
Copy link
Preview

Copilot AI Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider grouping related default configuration methods together and adding inline comments to the metadata_cache_info interface to enhance its readability and maintainability.

Suggested change
virtual model::compression get_default_compression() const = 0;
virtual model::cleanup_policy_bitflags
get_default_cleanup_policy_bitflags() const
= 0;
virtual size_t get_default_compacted_topic_segment_size() const = 0;
virtual size_t get_default_segment_size() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_retention_duration() const = 0;
virtual std::optional<size_t> get_default_retention_bytes() const = 0;
virtual model::timestamp_type get_default_timestamp_type() const = 0;
virtual uint32_t get_default_batch_max_bytes() const = 0;
virtual model::shadow_indexing_mode get_default_shadow_indexing_mode() const
= 0;
virtual std::optional<size_t>
get_default_retention_local_target_bytes() const = 0;
virtual std::chrono::milliseconds
get_default_retention_local_target_ms() const
= 0;
virtual std::optional<std::chrono::milliseconds>
get_default_segment_ms() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const = 0;
virtual bool get_default_record_key_schema_id_validation() const = 0;
virtual pandaproxy::schema_registry::subject_name_strategy
get_default_record_key_subject_name_strategy() const
= 0;
virtual bool get_default_record_value_schema_id_validation() const = 0;
virtual pandaproxy::schema_registry::subject_name_strategy
get_default_record_value_subject_name_strategy() const
= 0;
virtual std::optional<size_t>
get_default_initial_retention_local_target_bytes() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_initial_retention_local_target_ms() const = 0;
virtual std::chrono::milliseconds get_default_iceberg_target_lag_ms() const
= 0;
virtual std::optional<double>
get_default_min_cleanable_dirty_ratio() const = 0;
virtual std::chrono::milliseconds get_default_min_compaction_lag_ms() const
= 0;
virtual std::chrono::milliseconds get_default_max_compaction_lag_ms() const
= 0;
// Compression and cleanup policy defaults
virtual model::compression get_default_compression() const = 0;
virtual model::cleanup_policy_bitflags
get_default_cleanup_policy_bitflags() const = 0;
// Segment size defaults
virtual size_t get_default_compacted_topic_segment_size() const = 0;
virtual size_t get_default_segment_size() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_segment_ms() const = 0;
// Retention policy defaults
virtual std::optional<std::chrono::milliseconds>
get_default_retention_duration() const = 0;
virtual std::optional<size_t> get_default_retention_bytes() const = 0;
virtual std::optional<size_t>
get_default_retention_local_target_bytes() const = 0;
virtual std::chrono::milliseconds
get_default_retention_local_target_ms() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_delete_retention_ms() const = 0;
virtual std::optional<size_t>
get_default_initial_retention_local_target_bytes() const = 0;
virtual std::optional<std::chrono::milliseconds>
get_default_initial_retention_local_target_ms() const = 0;
// Timestamp and batch defaults
virtual model::timestamp_type get_default_timestamp_type() const = 0;
virtual uint32_t get_default_batch_max_bytes() const = 0;
// Shadow indexing defaults
virtual model::shadow_indexing_mode get_default_shadow_indexing_mode() const
= 0;
// Schema validation and subject name strategy defaults
virtual bool get_default_record_key_schema_id_validation() const = 0;
virtual pandaproxy::schema_registry::subject_name_strategy
get_default_record_key_subject_name_strategy() const = 0;
virtual bool get_default_record_value_schema_id_validation() const = 0;
virtual pandaproxy::schema_registry::subject_name_strategy
get_default_record_value_subject_name_strategy() const = 0;
// Compaction and iceberg defaults
virtual std::optional<double>
get_default_min_cleanable_dirty_ratio() const = 0;
virtual std::chrono::milliseconds get_default_min_compaction_lag_ms() const
= 0;
virtual std::chrono::milliseconds get_default_max_compaction_lag_ms() const
= 0;
virtual std::chrono::milliseconds get_default_iceberg_target_lag_ms() const
= 0;

Copilot uses AI. Check for mistakes.

@michael-redpanda michael-redpanda changed the title [CORE-12248] - [CORE-12248] - Define some test utilities for DR/CL Jun 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant