pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sanjee...@apache.org
Subject [incubator-pulsar] branch master updated: Add support for dead letter topics for java functions (#2606)
Date Wed, 19 Sep 2018 17:01:26 GMT
This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 82aa2b8  Add support for dead letter topics for java functions (#2606)
82aa2b8 is described below

commit 82aa2b83359c31f71eae40bb8f068ce703f08b59
Author: Sanjeev Kulkarni <sanjeevrk@gmail.com>
AuthorDate: Wed Sep 19 10:01:21 2018 -0700

    Add support for dead letter topics for java functions (#2606)
    
    * Added ability to specify dead letter topic to functions
    
    * Fix bug
    
    * Added an example function that fails on a particular message consistently
    
    * Revert change
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 25 ++++++++++++--
 .../functions/instance/JavaInstanceRunnable.java   |  5 +++
 .../pulsar/functions/source/PulsarSource.java      | 18 +++++-----
 .../functions/source/PulsarSourceConfig.java       |  2 ++
 .../api/examples/ConsistentlyFailingFunction.java  | 38 ++++++++++++++++++++++
 .../proto/src/main/proto/Function.proto            |  6 ++++
 .../pulsar/functions/utils/FunctionConfig.java     |  2 ++
 .../functions/utils/validation/ValidatorImpls.java | 12 +++++++
 8 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e8c8740..5284b59 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -24,7 +24,7 @@ import static java.util.Objects.isNull;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -45,7 +45,6 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
@@ -79,12 +78,12 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.Resources;
+import org.apache.pulsar.functions.proto.Function.RetryDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.ConsumerConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
@@ -321,6 +320,10 @@ public class CmdFunctions extends CmdBase {
         protected Long DEPRECATED_timeoutMs;
         @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
         protected Long timeoutMs;
+        @Parameter(names = "--max-message-retries", description = "How many times should we try to process a message before giving up")
+        protected Integer maxMessageRetries = -1;
+        @Parameter(names = "--dead-letter-topic", description = "The topic where all messages which could not be processed successfully are sent")
+        protected String deadLetterTopic;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
 
@@ -464,6 +467,13 @@ public class CmdFunctions extends CmdBase {
 
             functionConfig.setAutoAck(autoAck);
 
+            if (null != maxMessageRetries) {
+                functionConfig.setMaxMessageRetries(maxMessageRetries);
+            }
+            if (null != deadLetterTopic) {
+                functionConfig.setDeadLetterTopic(deadLetterTopic);
+            }
+
             if (null != jarFile) {
                 functionConfig.setJar(jarFile);
             }
@@ -717,6 +727,15 @@ public class CmdFunctions extends CmdBase {
                         Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
             }
 
+            if (functionConfig.getMaxMessageRetries() >= 0) {
+                RetryDetails.Builder retryBuilder = RetryDetails.newBuilder();
+                retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+                if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
+                    retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
+                }
+                functionDetailsBuilder.setRetryDetails(retryBuilder);
+            }
+
             Map<String, Object> configs = new HashMap<>();
             configs.putAll(functionConfig.getUserConfig());
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b3f86ea..1e07516 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -547,6 +547,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             if (sourceSpec.getTimeoutMs() > 0 ) {
                 pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
             }
+
+            if (this.instanceConfig.getFunctionDetails().getRetryDetails() != null) {
+                pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+                pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+            }
             object = new PulsarSource(this.client, pulsarSourceConfig,
                     FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
         } else {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 6eed8e0..afac782 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -31,14 +31,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
@@ -97,6 +90,15 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
                 cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
             }
 
+            if (pulsarSourceConfig.getMaxMessageRetries() >= 0) {
+                DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
+                deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
+                if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
+                    deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
+                }
+                cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+            }
+
             return cb.subscribeAsync();
         }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index f1cb09b..4e2afa7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,6 +37,8 @@ public class PulsarSourceConfig {
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     SubscriptionType subscriptionType;
     private String subscriptionName;
+    private int maxMessageRetries;
+    private String deadLetterTopic;
 
     private Map<String, ConsumerConfig> topicSchema = new TreeMap<>();
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
new file mode 100644
index 0000000..792a574
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+
+/**
+ * This Function simulates a pulsar function encountering failing on a particular message.
+ */
+public class ConsistentlyFailingFunction implements Function<String, String> {
+    @Override
+    public String process(String input, Context context) {
+        if (input.equals("FAIL")) {
+            throw new RuntimeException("Failed");
+        } else {
+            return "SUCCESS";
+        }
+    }
+}
+
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index a76cf8d..482d901 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -40,6 +40,11 @@ message Resources {
     int64 disk = 3;
 }
 
+message RetryDetails {
+    int32 maxMessageRetries = 1;
+    string deadLetterTopic = 2;
+}
+
 message FunctionDetails {
     enum Runtime {
         JAVA = 0;
@@ -59,6 +64,7 @@ message FunctionDetails {
     SinkSpec sink = 12;
     Resources resources = 13;
     string packageUrl = 14; //present only if function submitted with package-url
+    RetryDetails retryDetails = 15;
 }
 
 message ConsumerSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 1335f8c..dc36812 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -107,6 +107,8 @@ public class FunctionConfig {
     private Map<String, Object> userConfig;
     private Runtime runtime;
     private boolean autoAck;
+    private int maxMessageRetries;
+    private String deadLetterTopic;
     private String subName;
     @isPositiveNumber
     private int parallelism;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f60f3c0..e8acc28 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -514,6 +514,10 @@ public class ValidatorImpls {
             if (functionConfig.getWindowConfig() != null) {
                 throw new IllegalArgumentException("There is currently no support windowing in python");
             }
+
+            if (functionConfig.getMaxMessageRetries() >= 0) {
+                throw new IllegalArgumentException("Message retries not yet supported in python");
+            }
         }
 
         private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
@@ -549,6 +553,14 @@ public class ValidatorImpls {
                 throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
                         + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
             }
+
+            if (functionConfig.getMaxMessageRetries() >= 0
+                    && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
+            }
+            if (functionConfig.getMaxMessageRetries() < 0 && !StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
+                throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
+            }
         }
 
         private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {


Mime
View raw message