Skip to content

Commit 158f4cc

Browse files
authored
6722: Purge of batches before data causes appearance of stranded data (#226)
0006722: Purge of batches before data causes appearance of stranded data. Prevent NPE
1 parent 289f886 commit 158f4cc

File tree

4 files changed

+406
-18
lines changed

4 files changed

+406
-18
lines changed

symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,9 @@ protected IDataWriter chooseDataWriter(Batch batch) {
10841084
log.info("Bulk loader failed in class {} with message: {}", e.getClass().getName(), e.getMessage());
10851085
ctx.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, "default");
10861086
ctx.setLastError(null);
1087-
listener.currentBatch.setStatus(Status.OK);
1087+
if (listener.currentBatch != null) {
1088+
listener.currentBatch.setStatus(Status.OK);
1089+
}
10881090
processor.setDataReader(buildDataReader(batchInStaging, resource));
10891091
try {
10901092
listener.getBatchesProcessed().remove(listener.currentBatch);
@@ -1099,7 +1101,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
10991101
}
11001102
} else {
11011103
isError = true;
1102-
if (listener.currentBatch.getSqlCode() == ErrorConstants.PROTOCOL_VIOLATION_CODE) {
1104+
if (listener.currentBatch != null && listener.currentBatch.getSqlCode() == ErrorConstants.PROTOCOL_VIOLATION_CODE) {
11031105
log.info("The batch {} may be corrupt in staging, so removing it.", batchInStaging.getNodeBatchId());
11041106
resource.delete();
11051107
incomingBatch = listener.currentBatch;

symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.List;
3030
import java.util.function.LongConsumer;
3131

32-
import org.apache.commons.lang3.StringUtils;
3332
import org.apache.commons.lang3.time.DateUtils;
3433
import org.apache.commons.lang3.time.FastDateFormat;
3534
import org.jumpmind.db.platform.DatabaseNamesConstants;
@@ -234,7 +233,7 @@ private long[] getMinMax(long[] minMax, long notOkBatchId, long[] rangeMinMax) {
234233
if (rangeMinMax[1] == minMax[1]) {
235234
minMax[1] = -1;
236235
} else {
237-
minMax[0] = notOkBatchId + 1;
236+
minMax[0] = notOkBatchId;
238237
}
239238
return minMax;
240239
}
@@ -281,8 +280,8 @@ private long purgeLingeringBatches(OutgoingContext context) {
281280
} else {
282281
break;
283282
}
284-
log.info("Done purging {} lingering batches and {} rows", totalBatchesPurged, totalRowsPurged);
285283
}
284+
log.info("Done purging {} lingering batches and {} rows", totalBatchesPurged, totalRowsPurged);
286285
return totalRowsPurged;
287286
}
288287

@@ -926,22 +925,25 @@ private OutgoingContext buildOutgoingContext(Calendar retentionCutoff) {
926925
context.setMinEventBatchId(startEventBatchId);
927926
// Leave 1 batch and its data around so MySQL auto increment doesn't reset
928927
long endBatchId = sequenceService.currVal(Constants.SEQUENCE_OUTGOING_BATCH) - 1;
929-
List<Long> batchIds = sqlTemplateDirty.query(getSql("maxBatchIdByChannel"), new LongMapper(),
928+
List<Long> batchIds = sqlTemplateDirty.query(getSql("maxBatchIdForOldBatches"), new LongMapper(),
930929
new Object[] { startBatchId, endBatchId, new Timestamp(context.getRetentionCutoff().getTime().getTime()) },
931930
new int[] { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds(), Types.TIMESTAMP });
932931
if (batchIds != null && batchIds.size() > 0) {
933-
int[] types = new int[batchIds.size()];
934-
for (int i = 0; i < batchIds.size(); i++) {
935-
types[i] = symmetricDialect.getSqlTypeForIds();
936-
if (batchIds.get(i) > context.getMaxBatchId()) {
937-
context.setMaxBatchId(batchIds.get(i));
932+
context.setMaxBatchId(batchIds.get(0));
933+
log.info("Max eligible batch ID: {}", context.getMaxBatchId());
934+
List<Row> rows = sqlTemplateDirty.query(getSql("minMaxDataIdForOldBatches"), new Object[] { startBatchId,
935+
context.getMaxBatchId() }, new int[] { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() });
936+
if (rows != null && rows.size() > 0) {
937+
Row row = rows.get(0);
938+
long minDataId = row.getLong("min_data_id");
939+
long maxDataId = row.getLong("max_data_id");
940+
context.setMaxDataId(maxDataId);
941+
log.info("Max eligible data ID: {}", context.getMaxDataId());
942+
if (minDataId < context.getMinDataId()) {
943+
log.info("Moving starting data ID back from {} to {}", context.getMinDataId(), minDataId);
944+
context.setMinDataId(minDataId);
938945
}
939946
}
940-
String sql = getSql("maxDataIdForBatches").replace("?", StringUtils.repeat("?", ",", batchIds.size()));
941-
List<Long> ids = sqlTemplateDirty.query(sql, new LongMapper(), batchIds.toArray(), types);
942-
if (ids != null && ids.size() > 0) {
943-
context.setMaxDataId(ids.get(0));
944-
}
945947
}
946948
context.setMinDataGapStartId(sqlTemplateDirty.queryForLong(getSql("minDataGapStartId")));
947949
context.setDataGapsExpired(dataService.findDataGapsExpired());

symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
6767

6868
putSql("minDataEventId", "select min(data_id) from $(data_event)");
6969

70-
putSql("maxBatchIdByChannel", "select max(batch_id) from $(outgoing_batch) where batch_id between ? and ? and create_time < ? group by channel_id");
70+
putSql("maxBatchIdForOldBatches", "select max(batch_id) from $(outgoing_batch) where batch_id between ? and ? and create_time < ?");
7171

72-
putSql("maxDataIdForBatches", "select max(data_id) from $(data_event) where batch_id in (?)");
72+
putSql("minMaxDataIdForOldBatches", "select min(data_id) as min_data_id, max(data_id) as max_data_id from $(data_event) where batch_id between ? and ?");
7373

7474
putSql("selectNodesWithStrandedBatches", "select distinct node_id from $(outgoing_batch) " +
7575
"where node_id not in (select node_id from $(node) where sync_enabled = ?) and status != ?");

0 commit comments

Comments (0)