pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: set functions config to me uniform (#3629)
Date Wed, 20 Feb 2019 17:24:40 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fd283e  set functions config to me uniform (#3629)
2fd283e is described below

commit 2fd283e9e0a583bc8631b77af44df6f21480eaf1
Author: Boyang Jerry Peng <jerry.boyang.peng@gmail.com>
AuthorDate: Wed Feb 20 09:24:35 2019 -0800

    set functions config to me uniform (#3629)
---
 .../org/apache/pulsar/functions/instance/ContextImpl.java   | 13 +++++++++++++
 .../java/org/apache/pulsar/functions/sink/PulsarSink.java   |  2 +-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 85b8080..7271c87 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -25,6 +25,9 @@ import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -321,6 +324,16 @@ class ContextImpl implements Context, SinkContext, SourceContext {
             try {
                 Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
                         .schema(schema)
+                        .blockIfQueueFull(true)
+                        .enableBatching(true)
+                        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
+                        .compressionType(CompressionType.LZ4)
+                        .hashingScheme(HashingScheme.Murmur3_32Hash) //
+                        .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                        .messageRouter(FunctionResultRouter.of())
+                        // set send timeout to be infinity to prevent potential deadlock with consumer
+                        // that might happen when consumer is blocked due to unacked messages
+                        .sendTimeout(0, TimeUnit.SECONDS)
                         .topic(topicName)
                         .properties(InstanceUtils.getProperties(componentType,
                                 FunctionDetailsUtils.getFullyQualifiedName(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 3195a82..717ac10 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -85,7 +85,7 @@ public class PulsarSink<T> implements Sink<T> {
             ProducerBuilder<T> builder = client.newProducer(schema)
                     .blockIfQueueFull(true)
                     .enableBatching(true)
-                    .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                     .compressionType(CompressionType.LZ4)
                     .hashingScheme(HashingScheme.Murmur3_32Hash) //
                     .messageRoutingMode(MessageRoutingMode.CustomPartition)


Mime
View raw message