Skip to content
This repository was archived by the owner on Apr 11, 2024. It is now read-only.

Commit fb9e150

Browse files
authored
Refactoring gated and ref-counted interfaces and their implementations (opensearch-project#2396)
* Reducing duplication in plugins around ref-counted releasable classes Both AmazonEc2Reference and AmazonS3Reference duplicate the same logic - a subclass of AbstractRefCounted that also implements Releasable. This change centralizes this paradigm into a AbstractRefCountedReleasable class and supports both clients via generics. It also updates all fetching implementations to use the get() method instead of client() Signed-off-by: Kartik Ganesh <[email protected]> * Introduce Reference classes for the Closeable and AutoCloseable interfaces These classes allow you to wrap a reference instance with an onClose runnable that is executed when close() is invoked. Two separate classes are needed because the close() signatures for the two interfaces are different. This change takes the first step to have implementing classes extend from these generic superclasses, before attempting to remove the subclasses entirely. The get() method is also replaced throughout the code base. Note that there is also a separate Releasable interface that has a similar access pattern, but is implemented separately. This is used in AbstractRefCountedReleasable introduced in a prior commit Signed-off-by: Kartik Ganesh <[email protected]> * More improvements and refactoring * Functionality around one-way gating is now moved to a dedicated class - OneWayGate. This replaces duplicate functionality throughout the code. * The two *Reference classes have been renamed to Gated* since that better represents their functionality * The AbstractRefCountedReleasable has been improved to no longer be abstract by accepting the shutdown hook. This removes the need for the inner class in ReleasableBytesReference, and further simplifies the plugin subclasses (these could probably be removed entirely). * Finally, unit tests have been added for some classes Signed-off-by: Kartik Ganesh <[email protected]> * Added tests for GatedCloseable Also updated the license information in GatedAutoCloseableTests Signed-off-by: Kartik Ganesh <[email protected]> * Fixing license information in new files Signed-off-by: Kartik Ganesh <[email protected]> * Added unit tests for RefCountedReleasable Signed-off-by: Kartik Ganesh <[email protected]>
1 parent 5a9a114 commit fb9e150

File tree

33 files changed

+493
-197
lines changed

33 files changed

+493
-197
lines changed

libs/core/src/test/java/org/opensearch/common/util/concurrent/RefCountedTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@
3131

3232
package org.opensearch.common.util.concurrent;
3333

34+
import org.opensearch.common.concurrent.OneWayGate;
3435
import org.opensearch.test.OpenSearchTestCase;
3536
import org.hamcrest.Matchers;
3637

3738
import java.io.IOException;
3839
import java.util.concurrent.CopyOnWriteArrayList;
3940
import java.util.concurrent.CountDownLatch;
40-
import java.util.concurrent.atomic.AtomicBoolean;
4141

4242
import static org.hamcrest.Matchers.equalTo;
4343
import static org.hamcrest.Matchers.is;
@@ -138,19 +138,19 @@ public void run() {
138138

139139
private final class MyRefCounted extends AbstractRefCounted {
140140

141-
private final AtomicBoolean closed = new AtomicBoolean(false);
141+
private final OneWayGate gate = new OneWayGate();
142142

143143
MyRefCounted() {
144144
super("test");
145145
}
146146

147147
@Override
148148
protected void closeInternal() {
149-
this.closed.set(true);
149+
gate.close();
150150
}
151151

152152
public void ensureOpen() {
153-
if (closed.get()) {
153+
if (gate.isClosed()) {
154154
assert this.refCount() == 0;
155155
throw new IllegalStateException("closed");
156156
}

plugins/discovery-ec2/src/main/java/org/opensearch/discovery/ec2/AmazonEc2Reference.java

+3-30
Original file line numberDiff line numberDiff line change
@@ -33,42 +33,15 @@
3333
package org.opensearch.discovery.ec2;
3434

3535
import com.amazonaws.services.ec2.AmazonEC2;
36-
37-
import org.opensearch.common.lease.Releasable;
38-
import org.opensearch.common.util.concurrent.AbstractRefCounted;
36+
import org.opensearch.common.concurrent.RefCountedReleasable;
3937

4038
/**
4139
* Handles the shutdown of the wrapped {@link AmazonEC2} using reference
4240
* counting.
4341
*/
44-
public class AmazonEc2Reference extends AbstractRefCounted implements Releasable {
45-
46-
private final AmazonEC2 client;
42+
public class AmazonEc2Reference extends RefCountedReleasable<AmazonEC2> {
4743

4844
AmazonEc2Reference(AmazonEC2 client) {
49-
super("AWS_EC2_CLIENT");
50-
this.client = client;
45+
super("AWS_EC2_CLIENT", client, client::shutdown);
5146
}
52-
53-
/**
54-
* Call when the client is not needed anymore.
55-
*/
56-
@Override
57-
public void close() {
58-
decRef();
59-
}
60-
61-
/**
62-
* Returns the underlying `AmazonEC2` client. All method calls are permitted BUT
63-
* NOT shutdown. Shutdown is called when reference count reaches 0.
64-
*/
65-
public AmazonEC2 client() {
66-
return client;
67-
}
68-
69-
@Override
70-
protected void closeInternal() {
71-
client.shutdown();
72-
}
73-
7447
}

plugins/discovery-ec2/src/main/java/org/opensearch/discovery/ec2/AwsEc2SeedHostsProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ protected List<TransportAddress> fetchDynamicNodes() {
129129
// NOTE: we don't filter by security group during the describe instances request for two reasons:
130130
// 1. differences in VPCs require different parameters during query (ID vs Name)
131131
// 2. We want to use two different strategies: (all security groups vs. any security groups)
132-
descInstances = SocketAccess.doPrivileged(() -> clientReference.client().describeInstances(buildDescribeInstancesRequest()));
132+
descInstances = SocketAccess.doPrivileged(() -> clientReference.get().describeInstances(buildDescribeInstancesRequest()));
133133
} catch (final AmazonClientException e) {
134134
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
135135
logger.debug("Full exception:", e);

plugins/discovery-ec2/src/test/java/org/opensearch/discovery/ec2/Ec2DiscoveryPluginTests.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ public void testNodeAttributesErrorLenient() throws Exception {
103103

104104
public void testDefaultEndpoint() throws IOException {
105105
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
106-
final String endpoint = ((AmazonEC2Mock) plugin.ec2Service.client().client()).endpoint;
106+
final String endpoint = ((AmazonEC2Mock) plugin.ec2Service.client().get()).endpoint;
107107
assertThat(endpoint, is(""));
108108
}
109109
}
110110

111111
public void testSpecificEndpoint() throws IOException {
112112
final Settings settings = Settings.builder().put(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), "ec2.endpoint").build();
113113
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(settings)) {
114-
final String endpoint = ((AmazonEC2Mock) plugin.ec2Service.client().client()).endpoint;
114+
final String endpoint = ((AmazonEC2Mock) plugin.ec2Service.client().get()).endpoint;
115115
assertThat(endpoint, is("ec2.endpoint"));
116116
}
117117
}
@@ -150,7 +150,7 @@ public void testClientSettingsReInit() throws IOException {
150150
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(settings1)) {
151151
try (AmazonEc2Reference clientReference = plugin.ec2Service.client()) {
152152
{
153-
final AWSCredentials credentials = ((AmazonEC2Mock) clientReference.client()).credentials.getCredentials();
153+
final AWSCredentials credentials = ((AmazonEC2Mock) clientReference.get()).credentials.getCredentials();
154154
assertThat(credentials.getAWSAccessKeyId(), is("ec2_access_1"));
155155
assertThat(credentials.getAWSSecretKey(), is("ec2_secret_1"));
156156
if (mockSecure1HasSessionToken) {
@@ -159,32 +159,32 @@ public void testClientSettingsReInit() throws IOException {
159159
} else {
160160
assertThat(credentials, instanceOf(BasicAWSCredentials.class));
161161
}
162-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyUsername(), is("proxy_username_1"));
163-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyPassword(), is("proxy_password_1"));
164-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyHost(), is("proxy_host_1"));
165-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyPort(), is(881));
166-
assertThat(((AmazonEC2Mock) clientReference.client()).endpoint, is("ec2_endpoint_1"));
162+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyUsername(), is("proxy_username_1"));
163+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyPassword(), is("proxy_password_1"));
164+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyHost(), is("proxy_host_1"));
165+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyPort(), is(881));
166+
assertThat(((AmazonEC2Mock) clientReference.get()).endpoint, is("ec2_endpoint_1"));
167167
}
168168
// reload secure settings2
169169
plugin.reload(settings2);
170170
// client is not released, it is still using the old settings
171171
{
172-
final AWSCredentials credentials = ((AmazonEC2Mock) clientReference.client()).credentials.getCredentials();
172+
final AWSCredentials credentials = ((AmazonEC2Mock) clientReference.get()).credentials.getCredentials();
173173
if (mockSecure1HasSessionToken) {
174174
assertThat(credentials, instanceOf(BasicSessionCredentials.class));
175175
assertThat(((BasicSessionCredentials) credentials).getSessionToken(), is("ec2_session_token_1"));
176176
} else {
177177
assertThat(credentials, instanceOf(BasicAWSCredentials.class));
178178
}
179-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyUsername(), is("proxy_username_1"));
180-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyPassword(), is("proxy_password_1"));
181-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyHost(), is("proxy_host_1"));
182-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyPort(), is(881));
183-
assertThat(((AmazonEC2Mock) clientReference.client()).endpoint, is("ec2_endpoint_1"));
179+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyUsername(), is("proxy_username_1"));
180+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyPassword(), is("proxy_password_1"));
181+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyHost(), is("proxy_host_1"));
182+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyPort(), is(881));
183+
assertThat(((AmazonEC2Mock) clientReference.get()).endpoint, is("ec2_endpoint_1"));
184184
}
185185
}
186186
try (AmazonEc2Reference clientReference = plugin.ec2Service.client()) {
187-
final AWSCredentials credentials = ((AmazonEC2Mock) clientReference.client()).credentials.getCredentials();
187+
final AWSCredentials credentials = ((AmazonEC2Mock) clientReference.get()).credentials.getCredentials();
188188
assertThat(credentials.getAWSAccessKeyId(), is("ec2_access_2"));
189189
assertThat(credentials.getAWSSecretKey(), is("ec2_secret_2"));
190190
if (mockSecure2HasSessionToken) {
@@ -193,11 +193,11 @@ public void testClientSettingsReInit() throws IOException {
193193
} else {
194194
assertThat(credentials, instanceOf(BasicAWSCredentials.class));
195195
}
196-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyUsername(), is("proxy_username_2"));
197-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyPassword(), is("proxy_password_2"));
198-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyHost(), is("proxy_host_2"));
199-
assertThat(((AmazonEC2Mock) clientReference.client()).configuration.getProxyPort(), is(882));
200-
assertThat(((AmazonEC2Mock) clientReference.client()).endpoint, is("ec2_endpoint_2"));
196+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyUsername(), is("proxy_username_2"));
197+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyPassword(), is("proxy_password_2"));
198+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyHost(), is("proxy_host_2"));
199+
assertThat(((AmazonEC2Mock) clientReference.get()).configuration.getProxyPort(), is(882));
200+
assertThat(((AmazonEC2Mock) clientReference.get()).endpoint, is("ec2_endpoint_2"));
201201
}
202202
}
203203
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonS3Reference.java

+3-31
Original file line numberDiff line numberDiff line change
@@ -32,45 +32,17 @@
3232

3333
package org.opensearch.repositories.s3;
3434

35-
import org.opensearch.common.util.concurrent.AbstractRefCounted;
36-
3735
import com.amazonaws.services.s3.AmazonS3;
3836
import com.amazonaws.services.s3.AmazonS3Client;
39-
40-
import org.opensearch.common.lease.Releasable;
37+
import org.opensearch.common.concurrent.RefCountedReleasable;
4138

4239
/**
4340
* Handles the shutdown of the wrapped {@link AmazonS3Client} using reference
4441
* counting.
4542
*/
46-
public class AmazonS3Reference extends AbstractRefCounted implements Releasable {
47-
48-
private final AmazonS3 client;
43+
public class AmazonS3Reference extends RefCountedReleasable<AmazonS3> {
4944

5045
AmazonS3Reference(AmazonS3 client) {
51-
super("AWS_S3_CLIENT");
52-
this.client = client;
53-
}
54-
55-
/**
56-
* Call when the client is not needed anymore.
57-
*/
58-
@Override
59-
public void close() {
60-
decRef();
46+
super("AWS_S3_CLIENT", client, client::shutdown);
6147
}
62-
63-
/**
64-
* Returns the underlying `AmazonS3` client. All method calls are permitted BUT
65-
* NOT shutdown. Shutdown is called when reference count reaches 0.
66-
*/
67-
public AmazonS3 client() {
68-
return client;
69-
}
70-
71-
@Override
72-
protected void closeInternal() {
73-
client.shutdown();
74-
}
75-
7648
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class S3BlobContainer extends AbstractBlobContainer {
101101
@Override
102102
public boolean blobExists(String blobName) {
103103
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
104-
return SocketAccess.doPrivileged(() -> clientReference.client().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
104+
return SocketAccess.doPrivileged(() -> clientReference.get().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
105105
} catch (final Exception e) {
106106
throw new BlobStoreException("Failed to check if blob [" + blobName + "] exists", e);
107107
}
@@ -169,13 +169,13 @@ public DeleteResult delete() throws IOException {
169169
ObjectListing list;
170170
if (prevListing != null) {
171171
final ObjectListing finalPrevListing = prevListing;
172-
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
172+
list = SocketAccess.doPrivileged(() -> clientReference.get().listNextBatchOfObjects(finalPrevListing));
173173
} else {
174174
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
175175
listObjectsRequest.setBucketName(blobStore.bucket());
176176
listObjectsRequest.setPrefix(keyPath);
177177
listObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
178-
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
178+
list = SocketAccess.doPrivileged(() -> clientReference.get().listObjects(listObjectsRequest));
179179
}
180180
final List<String> blobsToDelete = new ArrayList<>();
181181
list.getObjectSummaries().forEach(s3ObjectSummary -> {
@@ -236,7 +236,7 @@ private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOEx
236236
.map(DeleteObjectsRequest.KeyVersion::getKey)
237237
.collect(Collectors.toList());
238238
try {
239-
clientReference.client().deleteObjects(deleteRequest);
239+
clientReference.get().deleteObjects(deleteRequest);
240240
outstanding.removeAll(keysInRequest);
241241
} catch (MultiObjectDeleteException e) {
242242
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
@@ -324,9 +324,9 @@ private static List<ObjectListing> executeListing(AmazonS3Reference clientRefere
324324
ObjectListing list;
325325
if (prevListing != null) {
326326
final ObjectListing finalPrevListing = prevListing;
327-
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
327+
list = SocketAccess.doPrivileged(() -> clientReference.get().listNextBatchOfObjects(finalPrevListing));
328328
} else {
329-
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
329+
list = SocketAccess.doPrivileged(() -> clientReference.get().listObjects(listObjectsRequest));
330330
}
331331
results.add(list);
332332
if (list.isTruncated()) {
@@ -374,7 +374,7 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
374374
putRequest.setRequestMetricCollector(blobStore.putMetricCollector);
375375

376376
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
377-
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().putObject(putRequest); });
377+
SocketAccess.doPrivilegedVoid(() -> { clientReference.get().putObject(putRequest); });
378378
} catch (final AmazonClientException e) {
379379
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
380380
}
@@ -413,7 +413,7 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
413413
}
414414
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
415415

416-
uploadId.set(SocketAccess.doPrivileged(() -> clientReference.client().initiateMultipartUpload(initRequest).getUploadId()));
416+
uploadId.set(SocketAccess.doPrivileged(() -> clientReference.get().initiateMultipartUpload(initRequest).getUploadId()));
417417
if (Strings.isEmpty(uploadId.get())) {
418418
throw new IOException("Failed to initialize multipart upload " + blobName);
419419
}
@@ -439,7 +439,7 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
439439
}
440440
bytesCount += uploadRequest.getPartSize();
441441

442-
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
442+
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.get().uploadPart(uploadRequest));
443443
parts.add(uploadResponse.getPartETag());
444444
}
445445

@@ -456,7 +456,7 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
456456
parts
457457
);
458458
complRequest.setRequestMetricCollector(blobStore.multiPartUploadMetricCollector);
459-
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
459+
SocketAccess.doPrivilegedVoid(() -> clientReference.get().completeMultipartUpload(complRequest));
460460
success = true;
461461

462462
} catch (final AmazonClientException e) {
@@ -465,7 +465,7 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
465465
if ((success == false) && Strings.hasLength(uploadId.get())) {
466466
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get());
467467
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
468-
SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
468+
SocketAccess.doPrivilegedVoid(() -> clientReference.get().abortMultipartUpload(abortRequest));
469469
}
470470
}
471471
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private void openStream() throws IOException {
110110
+ end;
111111
getObjectRequest.setRange(Math.addExact(start, currentOffset), end);
112112
}
113-
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
113+
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.get().getObject(getObjectRequest));
114114
this.currentStreamLastOffset = Math.addExact(Math.addExact(start, currentOffset), getStreamLength(s3Object));
115115
this.currentStream = s3Object.getObjectContent();
116116
} catch (final AmazonClientException e) {

0 commit comments

Comments
 (0)