
Commit 76d99b9

akashthawaitcc, pawankashyapollion, and taherkl authored
WideRowHandling: spanner to source db IT (#2252)
* Wide row rr handling (#148)
* Resolve merge conflict
* resolve merge conflict
* Wide row rr fixes (#155)
* Added allowed packet size
* Added and removed unwanted boundary check
* IT Resource Exhausted fixes
* Added allowed packet size
* Added and removed unwanted boundary check
* Added fixes for Max in Size IT
* Added fixes for max cols
* Added 10mb
* Added fixes
* Added fixes
* Spotless fixes
* Added fixes
* Added fixes
* removed unwanted
* updated the comments
* PR review comment fixes
* rename function
* update comments
* Added copyright fixes
* Removed and cleaned up code (#194)
* fix checkstyle
* fix schema creation for N columns
* fix changestream issue
* widerow: add missing changes

---------

Co-authored-by: pawankashyapollion <[email protected]>
Co-authored-by: Taher Lakdawala <[email protected]>
Co-authored-by: taherkl <[email protected]>
1 parent ec561a0 commit 76d99b9

13 files changed

+1677 -0 lines changed
@@ -0,0 +1,202 @@
/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
import static org.junit.Assert.assertEquals;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.beam.it.cassandra.CassandraResourceManager;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Integration test for {@link SpannerToSourceDb} Flex template with max number of columns. */
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(SpannerToSourceDb.class)
@RunWith(JUnit4.class)
public class SpannerToCassandraSourceDbMaxColumnsIT extends SpannerToSourceDbITBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(SpannerToCassandraSourceDbMaxColumnsIT.class);

  private static final int NUM_COLS = 1024;
  private static final int NUM_NON_KEY_COLS = 1023;
  private static final String PRIMARY_KEY = "id";
  private static final String SECONDARY_KEY_PREFIX = "col_";
  private static final String CASSANDRA_CONFIG_FILE_RESOURCE =
      "SpannerToSourceDbWideRowIT/cassandra-config-template.conf";

  private static final String TEST_TABLE = "testtable";
  private static final String COLUMN_SIZE = "100";
  private static final HashSet<SpannerToCassandraSourceDbMaxColumnsIT> testInstances =
      new HashSet<>();
  private static PipelineLauncher.LaunchInfo jobInfo;
  public static SpannerResourceManager spannerResourceManager;
  private static SpannerResourceManager spannerMetadataResourceManager;
  public static CassandraResourceManager cassandraResourceManager;
  private static GcsResourceManager gcsResourceManager;
  private static PubsubResourceManager pubsubResourceManager;
  private SubscriptionName subscriptionName;
  private final List<Throwable> assertionErrors = new ArrayList<>();
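
  // The test fixture is shared across test instances: the first instance to enter the
  // synchronized block in setUp() creates the Spanner database and table with NUM_NON_KEY_COLS
  // non-key columns plus the "id" key, a matching Cassandra table, the GCS/Pub/Sub DLQ
  // resources, and launches the Dataflow job once; later instances reuse the static jobInfo.
  // All resources are released once per class in cleanUp().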
  @Before
  public void setUp() throws Exception {
    skipBaseCleanup = true;
    synchronized (SpannerToCassandraSourceDbMaxColumnsIT.class) {
      testInstances.add(this);
      if (jobInfo == null) {
        spannerResourceManager =
            createSpannerDBAndTableWithNColumns(TEST_TABLE, NUM_NON_KEY_COLS, COLUMN_SIZE);
        spannerMetadataResourceManager = createSpannerMetadataDatabase();

        cassandraResourceManager = generateKeyspaceAndBuildCassandraResource();
        gcsResourceManager =
            GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
                .build();
        createAndUploadCassandraConfigToGcs(
            gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE);
        createCassandraTableWithNColumns(cassandraResourceManager, TEST_TABLE, NUM_NON_KEY_COLS);
        pubsubResourceManager = setUpPubSubResourceManager();
        subscriptionName =
            createPubsubResources(
                getClass().getSimpleName(),
                pubsubResourceManager,
                getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, ""),
                gcsResourceManager);
        jobInfo =
            launchDataflowJob(
                gcsResourceManager,
                spannerResourceManager,
                spannerMetadataResourceManager,
                subscriptionName.toString(),
                null,
                null,
                null,
                null,
                null,
                CASSANDRA_SOURCE_TYPE);
      }
    }
  }

  @AfterClass
  public static void cleanUp() throws IOException {
    for (SpannerToCassandraSourceDbMaxColumnsIT instance : testInstances) {
      instance.tearDownBase();
    }
    ResourceManagerUtils.cleanResources(
        spannerResourceManager,
        cassandraResourceManager,
        spannerMetadataResourceManager,
        gcsResourceManager,
        pubsubResourceManager);
  }

  /**
   * Retrieves the total row count of a specified table in Cassandra.
   *
   * <p>This method executes a `SELECT COUNT(*)` query on the given table and returns the number of
   * rows present in it.
   *
   * @param tableName the name of the table whose row count is to be retrieved.
   * @return the total number of rows in the specified table.
   * @throws RuntimeException if the query does not return a result.
   */
  private long getRowCount(String tableName) {
    String query = String.format("SELECT COUNT(*) FROM %s", tableName);
    ResultSet resultSet = cassandraResourceManager.executeStatement(query);
    Row row = resultSet.one();
    if (row != null) {
      return row.getLong(0);
    } else {
      throw new RuntimeException("Query did not return a result for table: " + tableName);
    }
  }

  /** Writes a row with 1,024 columns in Spanner and verifies replication to Cassandra. */
  @Test
  public void testSpannerToCassandraWithMaxColumns() throws InterruptedException, IOException {
    assertThatPipeline(jobInfo).isRunning();
    writeRowWithMaxColumnsInSpanner();
    assertRowWithMaxColumnsInCassandra();
  }
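
  // Builds a single InsertOrUpdate mutation carrying the "id" key plus col_1..col_1023
  // (NUM_COLS = 1,024 columns in total) and writes it to Spanner in one call.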
  private void writeRowWithMaxColumnsInSpanner() {
    List<Mutation> mutations = new ArrayList<>();
    Mutation.WriteBuilder mutationBuilder =
        Mutation.newInsertOrUpdateBuilder(TEST_TABLE).set(PRIMARY_KEY).to("SampleTest");

    for (int i = 1; i < NUM_COLS; i++) {
      mutationBuilder.set(SECONDARY_KEY_PREFIX + i).to("TestValue_" + i);
    }

    mutations.add(mutationBuilder.build());
    spannerResourceManager.write(mutations);
    LOG.info("Inserted row with 1,024 columns into Spanner using Mutations");
  }
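
  // Waits up to 15 minutes for the pipeline to land exactly one row in Cassandra, then
  // verifies the primary key and every col_i value round-tripped unchanged.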
  private void assertRowWithMaxColumnsInCassandra() {

    PipelineOperator.Result result =
        pipelineOperator()
            .waitForCondition(
                createConfig(jobInfo, Duration.ofMinutes(15)), () -> getRowCount(TEST_TABLE) == 1);
    assertThatResult(result).meetsConditions();

    Iterable<Row> rows;
    try {
      rows = cassandraResourceManager.readTable(TEST_TABLE);
    } catch (Exception e) {
      throw new RuntimeException("Failed to read from Cassandra table: " + TEST_TABLE, e);
    }

    assertThat(rows).hasSize(1);
    for (Row row : rows) {
      LOG.info("Cassandra row to assert for max columns: {}", row.getFormattedContents());
      String primaryKeyColumn = row.getString(PRIMARY_KEY);
      assertEquals("SampleTest", primaryKeyColumn);
      for (int i = 1; i < NUM_COLS; i++) {
        assertEquals("TestValue_" + i, row.getString(SECONDARY_KEY_PREFIX + i));
      }
    }
    LOG.info("Successfully validated 1,024 columns in Cassandra");
  }
}
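
The schema helpers this test relies on (createSpannerDBAndTableWithNColumns, createCassandraTableWithNColumns, createAndUploadCassandraConfigToGcs) are defined in SpannerToSourceDbITBase and are not shown in this file. As a rough, non-authoritative sketch of the Cassandra side of such a wide-row schema, the snippet below assembles an N-column table using only the executeStatement call that getRowCount above already uses; the method name createWideCassandraTable and the all-text column types are illustrative assumptions, not the actual base-class helper.

  // Illustrative sketch only -- not the base-class helper used by this IT.
  // Assumes text columns named col_1..col_N, matching PRIMARY_KEY and SECONDARY_KEY_PREFIX above.
  static void createWideCassandraTable(
      CassandraResourceManager cassandra, String table, int numNonKeyCols) {
    StringBuilder ddl =
        new StringBuilder("CREATE TABLE IF NOT EXISTS ")
            .append(table)
            .append(" (id text PRIMARY KEY");
    for (int i = 1; i <= numNonKeyCols; i++) {
      ddl.append(", col_").append(i).append(" text");
    }
    ddl.append(")");
    cassandra.executeStatement(ddl.toString());
  }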
