From commits-return-10333-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Tue Jul 3 02:51:37 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id BA02D180626 for ; Tue, 3 Jul 2018 02:51:36 +0200 (CEST) Received: (qmail 89303 invoked by uid 500); 3 Jul 2018 00:51:35 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 89293 invoked by uid 99); 3 Jul 2018 00:51:35 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Jul 2018 00:51:35 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] rdhabalia closed pull request #2068: Pass encryption-context to pulsar-sink if source receive encrypted message Message-ID: <153057909527.16156.4775963535598687826.gitbox@gitbox.apache.org> Date: Tue, 03 Jul 2018 00:51:35 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit rdhabalia closed pull request #2068: Pass encryption-context to pulsar-sink if source receive encrypted message URL: https://github.com/apache/incubator-pulsar/pull/2068 This is a PR merged from a forked repository. 
Because GitHub hides the original diff of a pull request from a fork once it has been merged, the diff is reproduced below for the sake of provenance: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 7fb027d472..50d9687084 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -55,8 +55,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.EncryptionContext; -import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.client.impl.MessageCrypto; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java index e71b798359..3ab48f73eb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java @@ -19,7 +19,7 @@ package org.apache.pulsar.client.api; -import org.apache.pulsar.client.impl.EncryptionContext; +import org.apache.pulsar.common.api.EncryptionContext; public enum ConsumerCryptoFailureAction { FAIL, // This is the default option to fail consume until crypto succeeds diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java index 33be4588d2..d6149621f1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java @@ -20,6 +20,10 @@ package org.apache.pulsar.client.api; import java.util.Map; +import java.util.Optional; + +import org.apache.pulsar.common.api.EncryptionContext; + /** * The message abstraction used in Pulsar. @@ -127,4 +131,12 @@ * @return the key of the message */ String getKey(); + + /** + * {@link EncryptionContext} contains encryption and compression information in it using which application can + * decrypt consumed message with encrypted-payload. + * + * @return + */ + Optional getEncryptionCtx(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e7924cbd0b..6f781196d5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -63,9 +63,10 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; @@ -83,7 +84,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; public class ConsumerImpl extends ConsumerBase implements ConnectionHandler.Connection { private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 4adada5e01..504cc6161b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -330,6 +331,7 @@ void setMessageId(MessageIdImpl messageId) { this.messageId = messageId; } + @Override public Optional getEncryptionCtx() { return encryptionCtx; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java index a27ac134c6..ea1d892c18 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java @@ -18,8 +18,13 @@ */ package org.apache.pulsar.client.impl; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.io.core.Record; /** @@ -62,4 +67,14 @@ public void ack() { public void fail() { // no-op } + + @Override + 
public Map getProperties() { + return Collections.emptyMap(); + } + + @Override + public Optional getEncryptionCtx() { + return Optional.empty(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 236c2b94f6..f7f1f9b964 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -20,9 +20,11 @@ package org.apache.pulsar.client.impl; import java.util.Map; +import java.util.Optional; + import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.common.api.EncryptionContext; public class TopicMessageImpl extends MessageRecordImpl { @@ -107,4 +109,9 @@ public String getKey() { public T getValue() { return msg.getValue(); } + + @Override + public Optional getEncryptionCtx() { + return (msg instanceof MessageImpl) ? ((MessageImpl) msg).getEncryptionCtx() : Optional.empty(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java similarity index 97% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java index ba7018e253..98eaad7835 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pulsar.client.impl; +package org.apache.pulsar.common.api; import java.util.Map; import java.util.Optional; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index 78a7c86ddf..113cf82060 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -25,8 +25,10 @@ import lombok.ToString; import java.util.Map; +import java.util.Optional; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.io.core.Record; @Data @@ -42,6 +44,7 @@ private MessageId messageId; private String topicName; private Map properties; + private Optional encryptionCtx = Optional.empty(); private Runnable failFunction; private Runnable ackFunction; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 2190be1d00..e5100c86e9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -25,6 +25,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; @@ -133,6 +134,7 @@ public void open(Map config) throws Exception { .recordSequence(Utils.getSequenceId(message.getMessageId())) .topicName(topicName) 
.properties(message.getProperties()) + .encryptionCtx(message.getEncryptionCtx()) .ackFunction(() -> { if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { inputConsumer.acknowledgeCumulativeAsync(message); diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml index 1f31934f72..30da3d527a 100644 --- a/pulsar-io/core/pom.xml +++ b/pulsar-io/core/pom.xml @@ -35,6 +35,18 @@ org.slf4j slf4j-api + + + ${project.groupId} + pulsar-common + ${project.version} + + + * + * + + + diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java index 09ce8d10ab..5e1212562b 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java @@ -18,6 +18,12 @@ */ package org.apache.pulsar.io.core; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import org.apache.pulsar.common.api.EncryptionContext; + /** * A source context that can be used by the runtime to interact with source. */ @@ -34,6 +40,20 @@ * @return Sequence Id associated with the record */ default long getRecordSequence() { return -1L; } + + /** + * Retrieves user-properties attached to record. + * + * @return Map of user-properties + */ + default Map getProperties() { return Collections.emptyMap();} + + /** + * Retrieves encryption-context that is attached to record. + * + * @return {@link Optional}<{@link EncryptionContext}> + */ + default Optional getEncryptionCtx() { return Optional.empty();} /** * Acknowledge that this record is fully processed ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services