Skip to content

Commit 8e0a5e5

Browse files
committed
Cached total scan segment
1 parent d1cd60e commit 8e0a5e5

File tree

1 file changed

+29
-18
lines changed

1 file changed

+29
-18
lines changed

amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java

+29-18
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package software.amazon.kinesis.leases.dynamodb;
1616

1717
import java.time.Duration;
18+
import java.time.Instant;
1819
import java.util.AbstractMap;
1920
import java.util.ArrayList;
2021
import java.util.Collection;
@@ -132,6 +133,10 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher {
132133
private static final int MIN_SCAN_SEGMENTS = 1;
133134
private static final int MAX_SCAN_SEGMENTS = 30;
134135

136+
private volatile Integer cachedTotalSegments;
137+
private volatile Instant expirationTimeForTotalSegmentsCache;
138+
private static final Duration CACHE_DURATION_FOR_TOTAL_SEGMENTS = Duration.ofHours(2);
139+
135140
private static DdbTableConfig createDdbTableConfigFromBillingMode(final BillingMode billingMode) {
136141
final DdbTableConfig tableConfig = new DdbTableConfig();
137142
tableConfig.billingMode(billingMode);
@@ -560,18 +565,7 @@ public List<Lease> listLeases() throws DependencyException, InvalidStateExceptio
560565
public Map.Entry<List<Lease>, List<String>> listLeasesParallelyWithDynamicTotalSegments(
561566
final ExecutorService parallelScanExecutorService)
562567
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
563-
564-
int parallelScanTotalSegment = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR;
565-
DescribeTableResponse describeTableResponse = describeLeaseTable();
566-
567-
if (describeTableResponse != null) {
568-
parallelScanTotalSegment =
569-
getParallelScanTotalSegments(describeTableResponse.table().tableSizeBytes());
570-
} else {
571-
log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegment);
572-
}
573-
574-
return listLeasesParallely(parallelScanExecutorService, parallelScanTotalSegment);
568+
return listLeasesParallely(parallelScanExecutorService, getParallelScanTotalSegments());
575569
}
576570

577571
@Override
@@ -623,13 +617,30 @@ public Map.Entry<List<Lease>, List<String>> listLeasesParallely(
623617
*
624618
* @return The number of segments to use for parallel scan, minimum 1
625619
*/
626-
private int getParallelScanTotalSegments(double tableSizeBytes) {
620+
private int getParallelScanTotalSegments() throws DependencyException {
621+
if (isTotalSegmentsCacheValid()) {
622+
log.info("Cached value used : TotalSegments for Lease table parallel scan : {}", cachedTotalSegments);
623+
return cachedTotalSegments;
624+
}
625+
626+
int parallelScanTotalSegments = DEFAULT_LEASE_TABLE_SCAN_PARALLELISM_FACTOR;
627+
DescribeTableResponse describeTableResponse = describeLeaseTable();
628+
629+
if (describeTableResponse == null) {
630+
log.info("DescribeTable returned null so using default totalSegments : {}", parallelScanTotalSegments);
631+
} else {
632+
final double tableSizeGB = (double) describeTableResponse.table().tableSizeBytes() / NUMBER_OF_BYTES_PER_GB;
633+
parallelScanTotalSegments = Math.min(
634+
Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
635+
log.info("TotalSegments for Lease table parallel scan : {}", parallelScanTotalSegments);
636+
}
637+
cachedTotalSegments = parallelScanTotalSegments;
638+
expirationTimeForTotalSegmentsCache = Instant.now().plus(CACHE_DURATION_FOR_TOTAL_SEGMENTS);
639+
return parallelScanTotalSegments;
640+
}
627641

628-
double tableSizeGB = tableSizeBytes / NUMBER_OF_BYTES_PER_GB;
629-
int totalSegments =
630-
Math.min(Math.max((int) Math.ceil(tableSizeGB / GB_PER_SEGMENT), MIN_SCAN_SEGMENTS), MAX_SCAN_SEGMENTS);
631-
log.info("TotalSegments for Lease table parallel scan : {}", totalSegments);
632-
return totalSegments;
642+
private boolean isTotalSegmentsCacheValid() {
643+
return cachedTotalSegments != null && Instant.now().isBefore(expirationTimeForTotalSegmentsCache);
633644
}
634645

635646
private List<Map<String, AttributeValue>> scanSegment(final int segment, final int parallelScanTotalSegment)

0 commit comments

Comments
 (0)