pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #2068: Pass encryption-context to pulsar-sink if source receive encrypted message
Date Tue, 03 Jul 2018 00:51:35 GMT
rdhabalia closed pull request #2068: Pass encryption-context to pulsar-sink if source receive
encrypted message
URL: https://github.com/apache/incubator-pulsar/pull/2068
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7fb027d472..50d9687084 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -55,8 +55,8 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.EncryptionContext;
-import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index e71b798359..3ab48f73eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,7 +19,7 @@
 
 package org.apache.pulsar.client.api;
 
-import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext;
 
 public enum ConsumerCryptoFailureAction {
     FAIL, // This is the default option to fail consume until crypto succeeds
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
index 33be4588d2..d6149621f1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -20,6 +20,10 @@
 package org.apache.pulsar.client.api;
 
 import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
 
 /**
  * The message abstraction used in Pulsar.
@@ -127,4 +131,12 @@
      * @return the key of the message
      */
     String getKey();
+    
+    /**
+     * {@link EncryptionContext} contains encryption and compression information in it using
which application can
+     * decrypt consumed message with encrypted-payload.
+     * 
+     * @return
+     */
+    Optional<EncryptionContext> getEncryptionCtx();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e7924cbd0b..6f781196d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,9 +63,10 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -83,7 +84,6 @@
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 
 public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection
{
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 4adada5e01..504cc6161b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -34,6 +34,7 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -330,6 +331,7 @@ void setMessageId(MessageIdImpl messageId) {
         this.messageId = messageId;
     }
     
+    @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return encryptionCtx;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
index a27ac134c6..ea1d892c18 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
@@ -18,8 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.Record;
 
 /**
@@ -62,4 +67,14 @@ public void ack() {
     public void fail() {
         // no-op
     }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return Optional.empty();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 236c2b94f6..f7f1f9b964 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -20,9 +20,11 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.Map;
+import java.util.Optional;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.api.EncryptionContext;
 
 public class TopicMessageImpl<T> extends MessageRecordImpl<T, TopicMessageIdImpl>
{
 
@@ -107,4 +109,9 @@ public String getKey() {
     public T getValue() {
         return msg.getValue();
     }
+    
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return (msg instanceof MessageImpl) ? ((MessageImpl) msg).getEncryptionCtx() : Optional.empty();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
similarity index 97%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
index ba7018e253..98eaad7835 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.api;
 
 import java.util.Map;
 import java.util.Optional;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 78a7c86ddf..113cf82060 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -25,8 +25,10 @@
 import lombok.ToString;
 
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.Record;
 
 @Data
@@ -42,6 +44,7 @@
     private MessageId messageId;
     private String topicName;
     private Map<String, String> properties;
+    private Optional<EncryptionContext> encryptionCtx = Optional.empty();
     private Runnable failFunction;
     private Runnable ackFunction;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 2190be1d00..e5100c86e9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -25,6 +25,7 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
@@ -133,6 +134,7 @@ public void open(Map<String, Object> config) throws Exception {
                 .recordSequence(Utils.getSequenceId(message.getMessageId()))
                 .topicName(topicName)
                 .properties(message.getProperties())
+                .encryptionCtx(message.getEncryptionCtx())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE)
{
                         inputConsumer.acknowledgeCumulativeAsync(message);
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 1f31934f72..30da3d527a 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -35,6 +35,18 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
index 09ce8d10ab..5e1212562b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pulsar.io.core;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
 /**
  * A source context that can be used by the runtime to interact with source.
  */
@@ -34,6 +40,20 @@
      * @return Sequence Id associated with the record
      */
     default long getRecordSequence() { return -1L; }
+    
+    /**
+     * Retrieves user-properties attached to record. 
+     * 
+     * @return Map of user-properties
+     */
+    default Map<String, String> getProperties() { return Collections.emptyMap();}
+    
+    /**
+     * Retrieves encryption-context that is attached to record. 
+     * 
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    default Optional<EncryptionContext> getEncryptionCtx() { return Optional.empty();}
 
     /**
      * Acknowledge that this record is fully processed


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message