pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jai1 closed pull request #3097: Redeliver messages that can't be decrypted.
Date Fri, 30 Nov 2018 09:33:20 GMT
jai1 closed pull request #3097: Redeliver messages that can't be decrypted.
URL: https://github.com/apache/pulsar/pull/3097
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f6b8e1a518..fb47c5052d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -26,6 +26,7 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -34,6 +35,7 @@
 import java.lang.reflect.Modifier;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -2305,6 +2307,137 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+    
+    @Test(groups = "encryption")
+    public void testRedeliveryOfFailedMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String>
keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." +
keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or
not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String>
keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." +
keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or
not readable.");
+                }
+                return null;
+            }
+        }
+        
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String>
keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String>
metadata) {
+                return null;
+            }
+        }
+        
+        /*
+         * Redelivery functionality guarantees that customer will get a chance to process
the message again.
+         * In case of shared subscription eventually every client will get a chance to process
the message, till one of them acks it.
+         * 
+         * For client with Encryption enabled where in cases like a new production rollout
or a buggy client configuration, we might have a mismatch of consumers 
+         * - few which can decrypt, few which can't (due to errors or cryptoReader not configured).
+         * 
+         * In that case eventually all messages should be acked as long as there is a single
consumer who can decrypt the message.
+         * 
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         * 
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+        
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+        
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe();
+
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+        
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+        
+        // Consuming from consumer 2 and 3 
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1 
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+        
+        // Consuming from consumer 2 and 3 again just to be sure 
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {
+            assertTrue(messages.contains((message + i)));
+        }
+        
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 
     @Test(groups = "encryption")
     public void testEncryptionFailure() throws Exception {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76c4f53d8d..6ae925d21c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1107,44 +1107,52 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, MessageMetadata
 
         // If KeyReader is not configured throw exception based on config param
         if (conf.getCryptoKeyReader() == null) {
-
-            if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) {
-                log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming
encrypted message.",
-                        topic, subscription, consumerName);
-                return payload.retain();
-            } else if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.DISCARD)
{
-                log.warn(
-                        "[{}][{}][{}] Skipping decryption since CryptoKeyReader interface
is not implemented and config is set to discard",
-                        topic, subscription, consumerName);
-                discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
-            } else {
-                log.error(
-                        "[{}][{}][{}] Message delivery failed since CryptoKeyReader interface
is not implemented to consume encrypted message",
-                        topic, subscription, consumerName);
+            switch (conf.getCryptoFailureAction()) {
+                case CONSUME:
+                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented.
Consuming encrypted message.",
+                            topic, subscription, consumerName);
+                    return payload.retain();
+                case DISCARD:
+                    log.warn(
+                            "[{}][{}][{}] Skipping decryption since CryptoKeyReader interface
is not implemented and config is set to discard",
+                            topic, subscription, consumerName);
+                    discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
+                    return null;
+                case FAIL:
+                    MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
partitionIndex);
+                    log.error(
+                            "[{}][{}][{}][{}] Message delivery failed since CryptoKeyReader
interface is not implemented to consume encrypted message",
+                             topic, subscription, consumerName, m);
+                    unAckedMessageTracker.add(m);
+                    return null;
             }
-            return null;
         }
 
         ByteBuf decryptedData = this.msgCrypto.decrypt(msgMetadata, payload, conf.getCryptoKeyReader());
         if (decryptedData != null) {
             return decryptedData;
         }
-
-        if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME) {
-            // Note, batch message will fail to consume even if config is set to consume
-            log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since
config is set to consume.",
-                    topic, subscription, consumerName, messageId);
-            return payload.retain();
-        } else if (conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.DISCARD)
{
-            log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config
is set to discard", topic,
-                    subscription, consumerName, messageId);
-            discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
-        } else {
-            log.error("[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming
message", topic,
-                    subscription, consumerName, messageId);
+        
+        switch (conf.getCryptoFailureAction()) {
+            case CONSUME:
+                // Note, batch message will fail to consume even if config is set to consume
+                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message
since config is set to consume.",
+                        topic, subscription, consumerName, messageId);
+                return payload.retain();
+            case DISCARD:
+                log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and
config is set to discard", topic,
+                        subscription, consumerName, messageId);
+                discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
+                return null;
+            case FAIL:
+                MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
partitionIndex);
+                log.error(
+                        "[{}][{}][{}][{}] Message delivery failed since unable to decrypt
incoming message",
+                         topic, subscription, consumerName, m);
+                unAckedMessageTracker.add(m);
+                return null;
         }
         return null;
-
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata,
ByteBuf payload,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message