Skip to content

Commit c0a46c1

Browse files
authored
BufferedStreamConsumerTest: remove non-determinism in size of generated test records (#9274)
* generate records fixed 40 bytes of size * fix buffer flush Signed-off-by: Sergey Chvalyuk <[email protected]>
1 parent 103e003 commit c0a46c1

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ protected void acceptTracked(final AirbyteMessage message) throws Exception {
147147
// are serialized again when writing to
148148
// the destination
149149
long messageSizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(recordMessage.getData()));
150-
if (bufferSizeInBytes + messageSizeInBytes >= maxQueueSizeInBytes) {
150+
if (bufferSizeInBytes + messageSizeInBytes > maxQueueSizeInBytes) {
151151
LOGGER.info("Flushing buffer...");
152152
flushQueueToDestination();
153153
bufferSizeInBytes = 0;

airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ private static List<AirbyteMessage> generateRecords(final long targetSizeInBytes
315315
List<AirbyteMessage> output = Lists.newArrayList();
316316
long bytesCounter = 0;
317317
for (int i = 0;; i++) {
318-
JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAscii(7), "name", "human " + String.format("%5d", i)));
318+
JsonNode payload = Jsons.jsonNode(ImmutableMap.of("id", RandomStringUtils.randomAlphabetic(7), "name", "human " + String.format("%8d", i)));
319319
long sizeInBytes = ByteUtils.getSizeInBytesForUTF8CharSet(Jsons.serialize(payload));
320320
bytesCounter += sizeInBytes;
321321
AirbyteMessage airbyteMessage = new AirbyteMessage()

0 commit comments

Comments
 (0)