pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [pulsar] branch master updated: Redeliver messages that can't be decrypted. (#3097)
Date Fri, 30 Nov 2018 09:33:23 GMT
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b3457e  Redeliver messages that can't be decrypted. (#3097)
3b3457e is described below

commit 3b3457e2253b9201c1fdd6826b52df12b17ef217
Author: Jai Asher <jai1@ccs.neu.edu>
AuthorDate: Fri Nov 30 01:33:19 2018 -0800

    Redeliver messages that can't be decrypted. (#3097)
---
 .../client/api/SimpleProducerConsumerTest.java     | 133 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  66 +++++-----
 2 files changed, 170 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f6b8e1a..fb47c50 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -34,6 +35,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -2305,6 +2307,137 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase
{
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+    
+    @Test(groups = "encryption")
+    public void testRedeliveryOfFailedMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String>
keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." +
keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or
not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String>
keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." +
keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or
not readable.");
+                }
+                return null;
+            }
+        }
+        
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String>
keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String>
metadata) {
+                return null;
+            }
+        }
+        
+        /*
+         * Redelivery functionality guarantees that the consumer will get a chance to process
the message again.
+         * In case of shared subscription eventually every client will get a chance to process
the message, till one of them acks it.
+         * 
+         * For clients with encryption enabled, in cases like a new production rollout
or a buggy client configuration, we might have a mismatch of consumers
+         * - some which can decrypt, some which can't (due to errors or cryptoReader not configured).
+         * 
+         * In that case eventually all messages should be acked as long as there is a single
consumer who can decrypt the message.
+         * 
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         * 
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+        
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+        
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+        
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+        
+        // Consuming from consumer 2 and 3 
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1 
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+        
+        // Consuming from consumer 2 and 3 again just to be sure 
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {
+            assertTrue(messages.contains((message + i)));
+        }
+        
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 
     @Test(groups = "encryption")
     public void testEncryptionFailure() throws Exception {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76c4f53..6ae925d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1107,44 +1107,52 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandle
 
         // If KeyReader is not configured throw exception based on config param
         if (conf.getCryptoKeyReader() == null) {
-
-            if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) {
-                log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming
encrypted message.",
-                        topic, subscription, consumerName);
-                return payload.retain();
-            } else if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.DISCARD)
{
-                log.warn(
-                        "[{}][{}][{}] Skipping decryption since CryptoKeyReader interface
is not implemented and config is set to discard",
-                        topic, subscription, consumerName);
-                discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
-            } else {
-                log.error(
-                        "[{}][{}][{}] Message delivery failed since CryptoKeyReader interface
is not implemented to consume encrypted message",
-                        topic, subscription, consumerName);
+            switch (conf.getCryptoFailureAction()) {
+                case CONSUME:
+                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented.
Consuming encrypted message.",
+                            topic, subscription, consumerName);
+                    return payload.retain();
+                case DISCARD:
+                    log.warn(
+                            "[{}][{}][{}] Skipping decryption since CryptoKeyReader interface
is not implemented and config is set to discard",
+                            topic, subscription, consumerName);
+                    discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
+                    return null;
+                case FAIL:
+                    MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
partitionIndex);
+                    log.error(
+                            "[{}][{}][{}][{}] Message delivery failed since CryptoKeyReader
interface is not implemented to consume encrypted message",
+                             topic, subscription, consumerName, m);
+                    unAckedMessageTracker.add(m);
+                    return null;
             }
-            return null;
         }
 
         ByteBuf decryptedData = this.msgCrypto.decrypt(msgMetadata, payload, conf.getCryptoKeyReader());
         if (decryptedData != null) {
             return decryptedData;
         }
-
-        if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) {
-            // Note, batch message will fail to consume even if config is set to consume
-            log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since
config is set to consume.",
-                    topic, subscription, consumerName, messageId);
-            return payload.retain();
-        } else if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.DISCARD)
{
-            log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config
is set to discard", topic,
-                    subscription, consumerName, messageId);
-            discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
-        } else {
-            log.error("[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming
message", topic,
-                    subscription, consumerName, messageId);
+        
+        switch (conf.getCryptoFailureAction()) {
+            case CONSUME:
+                // Note, batch message will fail to consume even if config is set to consume
+                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message
since config is set to consume.",
+                        topic, subscription, consumerName, messageId);
+                return payload.retain();
+            case DISCARD:
+                log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and
config is set to discard", topic,
+                        subscription, consumerName, messageId);
+                discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
+                return null;
+            case FAIL:
+                MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
partitionIndex);
+                log.error(
+                        "[{}][{}][{}][{}] Message delivery failed since unable to decrypt
incoming message",
+                         topic, subscription, consumerName, m);
+                unAckedMessageTracker.add(m);
+                return null;
         }
         return null;
-
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata,
ByteBuf payload,


Mime
View raw message