pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #2024: Forward encryption properties with encrypted payload to consumer
Date Sun, 01 Jul 2018 22:40:03 GMT
rdhabalia closed pull request #2024: Forward encryption properties with encrypted payload to
consumer
URL: https://github.com/apache/incubator-pulsar/pull/2024
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ab2af4c6dc..7fb027d472 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -35,6 +35,7 @@
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -54,10 +55,22 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -67,8 +80,12 @@
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
 
@@ -2222,10 +2239,13 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
             producer2.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        MessageImpl<byte[]> msg = null;
 
         for (int i = 0; i < totalMsg * 2; i++) {
-            msg = consumer.receive(5, TimeUnit.SECONDS);
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present
for encrypted message"));
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
             String expectedMessage = "my-message-" + i;
@@ -2276,7 +2296,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
 
         final int totalMsg = 10;
 
-        Message<byte[]> msg = null;
+        MessageImpl<byte[]> msg = null;
         Set<String> messageSet = Sets.newHashSet();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
@@ -2307,7 +2327,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
 
         // 3. KeyReder is not set by consumer
         // Receive should fail since key reader is not setup
-        msg = consumer.receive(5, TimeUnit.SECONDS);
+        msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(msg, "Receive should have failed with no keyreader");
 
         // 4. Set consumer config to consume even if decryption fails
@@ -2319,7 +2339,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
         int msgNum = 0;
         try {
             // Receive should proceed and deliver encrypted message
-            msg = consumer.receive(5, TimeUnit.SECONDS);
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             String expectedMessage = "my-message-" + msgNum++;
             Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted
message " + receivedMessage
@@ -2338,7 +2358,10 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
                 .cryptoKeyReader(new EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         for (int i = msgNum; i < totalMsg - 1; i++) {
-            msg = consumer.receive(5, TimeUnit.SECONDS);
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new IllegalStateException("encryption-ctx not present
for encrypted message"));
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
             String expectedMessage = "my-message-" + i;
@@ -2355,12 +2378,136 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMe
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         // Receive should proceed and discard encrypted messages
-        msg = consumer.receive(5, TimeUnit.SECONDS);
+        msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(msg, "Message received even aftet ConsumerCryptoFailureAction.DISCARD
is set.");
 
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(groups = "encryption")
+    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String>
keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." +
keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or
not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String>
keyMeta) {
+                String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." +
keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or
not readable.");
+                }
+                return null;
+            }
+        }
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        String message = "my-message";
+        producer.send(message.getBytes());
+
+        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+
+        String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader());
+        assertEquals(message, receivedMessage);
+
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
+    private String decryptMessage(MessageImpl<byte[]> msg, String encryptionKeyName,
CryptoKeyReader reader)
+            throws Exception {
+        Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
+        Assert.assertTrue(ctx.isPresent());
+        EncryptionContext encryptionCtx = ctx
+                .orElseThrow(() -> new IllegalStateException("encryption-ctx not present
for encrypted message"));
+
+        Map<String, EncryptionKey> keys = encryptionCtx.getKeys();
+        assertEquals(keys.size(), 1);
+        EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+        byte[] dataKey = encryptionKey.getKeyValue();
+        Map<String, String> metadata = encryptionKey.getMetadata();
+        String version = metadata.get("version");
+        assertEquals(version, "1.0");
+
+        org.apache.pulsar.common.api.proto.PulsarApi.CompressionType compressionType = encryptionCtx
+                .getCompressionType();
+        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+        byte[] encrParam = encryptionCtx.getParam();
+        String encAlgo = encryptionCtx.getAlgorithm();
+        int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+        ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+        // try to decrypt
+        MessageCrypto crypto = new MessageCrypto("test", false);
+        Builder metadataBuilder = MessageMetadata.newBuilder();
+        org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.Builder encKeyBuilder
= EncryptionKeys.newBuilder();
+        encKeyBuilder.setKey(encryptionKeyName);
+        ByteString keyValue = ByteString.copyFrom(dataKey);
+        encKeyBuilder.setValue(keyValue);
+        EncryptionKeys encKey = encKeyBuilder.build();
+        metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+        metadataBuilder.setEncryptionAlgo(encAlgo);
+        metadataBuilder.setProducerName("test");
+        metadataBuilder.setSequenceId(123);
+        metadataBuilder.setPublishTime(12333453454L);
+        metadataBuilder.addEncryptionKeys(encKey);
+        metadataBuilder.setCompression(compressionType);
+        metadataBuilder.setUncompressedSize(uncompressedSize);
+        ByteBuf decryptedPayload = crypto.decrypt(metadataBuilder.build(), payloadBuf, reader);
+
+        // try to uncompress
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
+
+        if (batchSize > 0) {
+            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+                    .newBuilder();
+            uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+                    singleMessageMetadataBuilder, 0, batchSize);
+        }
+
+        byte[] data = new byte[uncompressedPayload.readableBytes()];
+        uncompressedPayload.readBytes(data);
+        uncompressedPayload.release();
+        return new String(data);
+    }
+
     @Test
     public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Starting {} test --", methodName);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index b67383af96..e71b798359 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,11 +19,22 @@
 
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.client.impl.EncryptionContext;
+
 public enum ConsumerCryptoFailureAction {
     FAIL, // This is the default option to fail consume until crypto succeeds
     DISCARD, // Message is silently acknowledged and not delivered to the application
-    CONSUME // Deliver the encrypted message to the application. It's the application's
-            // responsibility to decrypt the message. If message is also compressed,
-            // decompression will fail. If message contain batch messages, client will
-            // not be able to retrieve individual messages in the batch
+    /**
+     * 
+     * <pre>
+     * Deliver the encrypted message to the application. It's the application's responsibility
to decrypt the message.
+     * If the message is also compressed, decompression will fail. If the message contains batch messages,
the client will not be
+     * able to retrieve individual messages in the batch.
+     * </pre>
+     * 
+     * The delivered encrypted message carries an {@link EncryptionContext} that holds the encryption
and compression
+     * information the application can use to decrypt the consumed message payload.
+     * 
+     */
+    CONSUME;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d94fe8b62d..e7924cbd0b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,20 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import static java.util.Base64.getEncoder;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -50,6 +53,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -59,6 +63,7 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -67,6 +72,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
@@ -76,6 +83,7 @@
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 
 public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection
{
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
@@ -709,11 +717,17 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload,
ClientC
         }
 
         ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload,
cnx);
+        
+        boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
+        
         if (decryptedPayload == null) {
             // Message was discarded or CryptoKeyReader isn't implemented
             return;
         }
-        ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload,
cnx);
+        
+        // uncompress decryptedPayload (an undecryptable message is passed through as-is, retained) and release decryptedPayload-ByteBuf
+        ByteBuf uncompressedPayload = isMessageUndecryptable ? decryptedPayload.retain()

+                : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx);
         decryptedPayload.release();
         if (uncompressedPayload == null) {
             // Message was discarded on decompression error
@@ -722,8 +736,11 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload,
ClientC
 
         final int numMessages = msgMetadata.getNumMessagesInBatch();
 
-        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
-            final MessageImpl<T> message = new MessageImpl<>(msgId, msgMetadata,
uncompressedPayload, cnx, schema);
+        // If the message is not decryptable, it can't be parsed as a batch message, so
add the EncryptionCtx to the message
+        // and return the undecrypted payload.
+        if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()))
{
+            final MessageImpl<T> message = new MessageImpl<>(msgId, msgMetadata,
uncompressedPayload,
+                    createEncryptionContext(msgMetadata), cnx, schema);
             uncompressedPayload.release();
             msgMetadata.recycle();
 
@@ -895,7 +912,8 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf
unc
                 BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), getPartitionIndex(), i, acker);
                 final MessageImpl<T> message = new MessageImpl<>(batchMessageIdImpl,
msgMetadata,
-                        singleMessageMetadataBuilder.build(), singleMessagePayload, cnx,
schema);
+                        singleMessageMetadataBuilder.build(), singleMessagePayload,
+                        createEncryptionContext(msgMetadata), cnx, schema);
                 lock.readLock().lock();
                 try {
                     if (pendingReceives.isEmpty()) {
@@ -1322,6 +1340,45 @@ private MessageIdImpl getMessageIdImpl(Message<?> msg) {
         return messageId;
     }
 
+
+    private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
+        return (msgMetadata.getEncryptionKeysCount() > 0 && conf.getCryptoKeyReader()
== null
+                && conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME);
+    }
+
+    /**
+     * Create EncryptionContext if message payload is encrypted
+     * 
+     * @param msgMetadata
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata)
{
+
+        EncryptionContext encryptionCtx = null;
+        if (msgMetadata.getEncryptionKeysCount() > 0) {
+            encryptionCtx = new EncryptionContext();
+            Map<String, EncryptionKey> keys = msgMetadata.getEncryptionKeysList().stream()
+                    .collect(
+                            Collectors.toMap(EncryptionKeys::getKey,
+                                    e -> new EncryptionKey(e.getValue().toByteArray(),
+                                            e.getMetadataList() != null
+                                                    ? e.getMetadataList().stream().collect(
+                                                            Collectors.toMap(KeyValue::getKey,
KeyValue::getValue))
+                                                    : null)));
+            byte[] encParam = new byte[MessageCrypto.ivLen];
+            msgMetadata.getEncryptionParam().copyTo(encParam, 0);
+            Optional<Integer> batchSize = Optional
+                    .ofNullable(msgMetadata.hasNumMessagesInBatch() ? msgMetadata.getNumMessagesInBatch()
: null);
+            encryptionCtx.setKeys(keys);
+            encryptionCtx.setParam(encParam);
+            encryptionCtx.setAlgorithm(msgMetadata.getEncryptionAlgo());
+            encryptionCtx.setCompressionType(msgMetadata.getCompression());
+            encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
+            encryptionCtx.setBatchSize(batchSize);
+        }
+        return Optional.ofNullable(encryptionCtx);
+    }
+
     private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
         Message<T> peek = incomingMessages.peek();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
new file mode 100644
index 0000000000..ba7018e253
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class EncryptionContext {
+
+    private Map<String, EncryptionKey> keys;
+    private byte[] param;
+    private Map<String, String> metadata;
+    private String algorithm;
+    private CompressionType compressionType;
+    private int uncompressedMessageSize;
+    private Optional<Integer> batchSize;
+
+    @Getter
+    @Setter
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class EncryptionKey {
+        private byte[] keyValue;
+        private Map<String, String> metadata;
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 2b57470513..116410873b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -97,7 +97,7 @@
 
     private static KeyGenerator keyGenerator;
     private static final int tagLen = 16 * 8;
-    private static final int ivLen = 12;
+    public static final int ivLen = 12;
     private byte[] iv = new byte[ivLen];
     private Cipher cipher;
     MessageDigest digest;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index c019112c33..4adada5e01 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -25,6 +25,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -50,6 +51,7 @@
     private ClientCnx cnx;
     private ByteBuf payload;
     private Schema<T> schema;
+    private Optional<EncryptionContext> encryptionCtx = Optional.empty();
 
     transient private Map<String, String> properties;
 
@@ -81,6 +83,11 @@
     // Constructor for incoming message
     MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload, ClientCnx
cnx,
             Schema<T> schema) {
+        this(messageId, msgMetadata, payload, null, cnx, schema);
+    }
+    
+    MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T>
schema) {
         this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
         this.messageId = messageId;
         this.cnx = cnx;
@@ -89,6 +96,7 @@
         // release, since the Message is passed to the user. Also, the passed ByteBuf is
coming from network and is
         // backed by a direct buffer which we could not expose as a byte[]
         this.payload = Unpooled.copiedBuffer(payload);
+        this.encryptionCtx = encryptionCtx;
 
         if (msgMetadata.getPropertiesCount() > 0) {
             this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
@@ -100,12 +108,14 @@
     }
 
     MessageImpl(BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata,
-            PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf payload, ClientCnx
cnx, Schema<T> schema) {
+            PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, Schema<T>
schema) {
         this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
         this.messageId = batchMessageIdImpl;
         this.cnx = cnx;
 
         this.payload = Unpooled.copiedBuffer(payload);
+        this.encryptionCtx = encryptionCtx;
 
         if (singleMessageMetadata.getPropertiesCount() > 0) {
             Map<String, String> properties = Maps.newTreeMap();
@@ -319,4 +329,8 @@ public boolean hasReplicateTo() {
     void setMessageId(MessageIdImpl messageId) {
         this.messageId = messageId;
     }
+    
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return encryptionCtx;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
index 1ab6d04297..8cf0328613 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -25,6 +25,7 @@
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -175,7 +176,7 @@ public static void receiveIndividualMessagesFromBatch(MessageMetadata
msgMetadat
                 BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), partitionIndex, i, null);
                 final MessageImpl<?> message = new MessageImpl<>(batchMessageIdImpl,
msgMetadata,
-                        singleMessageMetadataBuilder.build(), singleMessagePayload, cnx,
null);
+                        singleMessageMetadataBuilder.build(), singleMessagePayload, Optional.empty(),
cnx, null);
 
                 processor.process(batchMessageIdImpl, message, singleMessagePayload);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message