Skip to content

Commit 9c95454

Browse files
authored
[fix][client] Fix consumer not returning encrypted messages on decryption failure with compression enabled (#24356)
## Motivation There is a problem when a consumer tries to decrypt a compressed message. If a consumer fails to decrypt a compressed message, it also fails to decompress it, leading to the message being discarded. The user won't receive the message, even if using `CryptoFailureAction.CONSUME`, because the failure occurs during decompression, not decryption. This issue could be easily reproduced by the test added in this PR: `testE2EEncryptionWithCompression `. We already have logic to skip decompression for messages that can't be decrypted: https://github.com/apache/pulsar/blob/397b0211cf1c9069d80daa691ca530e09bc37999/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1439-L1441 The main issue is if the consumer fails to decrypt the message here: https://github.com/apache/pulsar/blob/397b0211cf1c9069d80daa691ca530e09bc37999/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1947, it isn't treated as undecryptable and still tries to decompress. ## Modification - Refactor `decryptPayloadIfNeeded` to return a clearer result, `DecryptResult`, to guide the logic on whether the message was decrypted successfully or should be discarded. - If decryption fails, treat the message as undecryptable and skip decompression.
1 parent cb0fe5b commit 9c95454

File tree

2 files changed

+146
-13
lines changed

2 files changed

+146
-13
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5062,4 +5062,93 @@ public void testFencedLedger() throws Exception {
50625062
.createAsync().get(5, TimeUnit.SECONDS);
50635063
assertNotNull(reader);
50645064
}
5065+
5066+
/**
5067+
* Creates a CryptoKeyReader that always returns keys from specified file paths,
5068+
* regardless of the key name provided.
5069+
*
5070+
* <p>This method creates a CryptoKeyReader instance that reads the public and
5071+
* private keys from fixed file paths. The key name and metadata provided to
5072+
* the getPublicKey and getPrivateKey methods are ignored, and the keys are
5073+
* always read from the specified paths.</p>
5074+
*
5075+
* @param publicKeyPath the file path to the public key
5076+
* @param privateKeyPath the file path to the private key
5077+
* @return a CryptoKeyReader that reads keys from the specified file paths
5078+
* @throws AssertionError if the key files are not present or not readable
5079+
*/
5080+
private static CryptoKeyReader createFixedFileCryptoKeyReader(String publicKeyPath, String privateKeyPath) {
5081+
return new CryptoKeyReader() {
5082+
final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
5083+
5084+
@Override
5085+
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
5086+
if (Files.isReadable(Paths.get(publicKeyPath))) {
5087+
try {
5088+
keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyPath)));
5089+
return keyInfo;
5090+
} catch (IOException e) {
5091+
Assert.fail("Failed to read certificate from " + publicKeyPath);
5092+
}
5093+
} else {
5094+
Assert.fail("Certificate file " + publicKeyPath + " is not present or not readable.");
5095+
}
5096+
return null;
5097+
}
5098+
5099+
@Override
5100+
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
5101+
if (Files.isReadable(Paths.get(privateKeyPath))) {
5102+
try {
5103+
keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyPath)));
5104+
return keyInfo;
5105+
} catch (IOException e) {
5106+
Assert.fail("Failed to read certificate from " + privateKeyPath);
5107+
}
5108+
} else {
5109+
Assert.fail("Certificate file " + privateKeyPath + " is not present or not readable.");
5110+
}
5111+
return null;
5112+
}
5113+
};
5114+
}
5115+
5116+
@Test
5117+
public void testE2EEncryptionWithCompression() throws Exception {
5118+
final String topic = "persistent://my-property/my-ns/testE2EEncryptionWithCompression-" + UUID.randomUUID();
5119+
5120+
final var producer = pulsarClient.newProducer(Schema.STRING)
5121+
.topic(topic)
5122+
.addEncryptionKey("client-rsa.pem")
5123+
.cryptoKeyReader(createFixedFileCryptoKeyReader(
5124+
"./src/test/resources/certificate/public-key.client-rsa.pem",
5125+
"./src/test/resources/certificate/private-key.client-rsa.pem"
5126+
))
5127+
.compressionMinMsgBodySize(1) // enforce compression
5128+
.compressionType(CompressionType.LZ4)
5129+
.create();
5130+
5131+
final var consumer = pulsarClient.newConsumer(Schema.STRING)
5132+
.topic(topic)
5133+
.subscriptionName("test")
5134+
.cryptoKeyReader(createFixedFileCryptoKeyReader(
5135+
"./src/test/resources/certificate/public-key.client-mismatch-rsa.pem",
5136+
"./src/test/resources/certificate/private-key.client-mismatch-rsa.pem"
5137+
))
5138+
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
5139+
.subscribe();
5140+
5141+
for (int i = 0; i < 10; i++) {
5142+
producer.send("message-" + i);
5143+
}
5144+
5145+
for (int i = 0; i < 10; i++) {
5146+
final var msg = consumer.receive(5, TimeUnit.SECONDS);
5147+
assertNotNull(msg);
5148+
consumer.acknowledge(msg);
5149+
}
5150+
5151+
producer.close();
5152+
consumer.close();
5153+
}
50655154
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,16 +1456,18 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
14561456
return;
14571457
}
14581458

1459-
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
1459+
DecryptResult decryptResult = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
14601460
cnx);
14611461

1462-
boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
1463-
1464-
if (decryptedPayload == null) {
1462+
if (decryptResult.shouldDiscard()) {
14651463
// Message was discarded or CryptoKeyReader isn't implemented
14661464
return;
14671465
}
14681466

1467+
boolean isMessageUndecryptable = !decryptResult.success;
1468+
1469+
ByteBuf decryptedPayload = decryptResult.payload;
1470+
14691471
// uncompress decryptedPayload and release decryptedPayload-ByteBuf
14701472
ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
14711473
: uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
@@ -1951,11 +1953,53 @@ public long getLastDisconnectedTimestamp() {
19511953
return connectionHandler.lastConnectionClosedTimestamp;
19521954
}
19531955

1954-
private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata,
1955-
ByteBuf payload, ClientCnx currentCnx) {
1956+
/**
1957+
* Represents the outcome of a message decryption attempt for the consumer.
1958+
*/
1959+
private static class DecryptResult {
1960+
private final boolean success;
1961+
private final ByteBuf payload;
1962+
1963+
private DecryptResult(boolean success, ByteBuf decryptedPayload) {
1964+
this.success = success;
1965+
this.payload = decryptedPayload;
1966+
}
1967+
1968+
/**
1969+
* Returns true if the message should be discarded and not delivered to the consumer user.
1970+
*/
1971+
public boolean shouldDiscard() {
1972+
return this.payload == null;
1973+
}
1974+
1975+
/**
1976+
* Creates a result indicating decryption succeeded and the payload is ready for use.
1977+
*/
1978+
public static DecryptResult success(ByteBuf decryptedPayload) {
1979+
return new DecryptResult(true, decryptedPayload);
1980+
}
1981+
1982+
/**
1983+
* Creates a result indicating decryption failed, but the message should still be delivered.
1984+
*/
1985+
public static DecryptResult failure(ByteBuf decryptedPayload) {
1986+
return new DecryptResult(false, decryptedPayload);
1987+
}
1988+
1989+
/**
1990+
* Creates a result indicating the message should be discarded.
1991+
*/
1992+
public static DecryptResult discard() {
1993+
return new DecryptResult(false, null);
1994+
}
1995+
}
1996+
1997+
private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount,
1998+
MessageMetadata msgMetadata,
1999+
ByteBuf payload, ClientCnx currentCnx) {
19562000

19572001
if (msgMetadata.getEncryptionKeysCount() == 0) {
1958-
return payload.retain();
2002+
return DecryptResult.success(payload.retain());
19592003
}
19602004
int batchSize = msgMetadata.getNumMessagesInBatch();
19612005
// If KeyReader is not configured throw exception based on config param
@@ -1969,15 +2013,15 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo
19692013
ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0, maxDecryptedSize);
19702014
if (msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), nioDecryptedData, conf.getCryptoKeyReader())) {
19712015
decryptedData.writerIndex(nioDecryptedData.limit());
1972-
return decryptedData;
2016+
return DecryptResult.success(decryptedData);
19732017
}
19742018

19752019
decryptedData.release();
19762020

19772021
return handleCryptoFailure(payload, messageId, currentCnx, redeliveryCount, batchSize, false);
19782022
}
19792023

1980-
private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, ClientCnx currentCnx,
2024+
private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData messageId, ClientCnx currentCnx,
19812025
int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) {
19822026

19832027
switch (conf.getCryptoFailureAction()) {
@@ -1990,7 +2034,7 @@ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, Cl
19902034
log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to"
19912035
+ " consume.", topic, subscription, consumerName, messageId);
19922036
}
1993-
return payload.retain();
2037+
return DecryptResult.failure(payload.retain());
19942038
case DISCARD:
19952039
if (cryptoReaderNotExist) {
19962040
log.warn(
@@ -2005,7 +2049,7 @@ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, Cl
20052049
messageId.getBatchIndex());
20062050
}
20072051
discardMessage(messageId, currentCnx, ValidationError.DecryptionError, batchSize);
2008-
return null;
2052+
return DecryptResult.discard();
20092053
case FAIL:
20102054
if (cryptoReaderNotExist) {
20112055
log.error(
@@ -2020,11 +2064,11 @@ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, Cl
20202064
}
20212065
MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partitionIndex);
20222066
unAckedMessageTracker.add(m, redeliveryCount);
2023-
return null;
2067+
return DecryptResult.discard();
20242068
default:
20252069
log.warn("[{}][{}][{}] Invalid crypto failure state found, continue message consumption.", topic,
20262070
subscription, consumerName);
2027-
return payload.retain();
2071+
return DecryptResult.failure(payload.retain());
20282072
}
20292073
}
20302074

0 commit comments

Comments
 (0)