pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rdhaba...@apache.org
Subject [incubator-pulsar] branch master updated: kinesis-sink: manage msg ordering for publish callback failure (#2285)
Date Thu, 02 Aug 2018 19:35:52 GMT
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d2f6dd9  kinesis-sink: manage msg ordering for publish callback failure (#2285)
d2f6dd9 is described below

commit d2f6dd906292078e68376d8d5b995903e58138de
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Thu Aug 2 12:35:48 2018 -0700

    kinesis-sink: manage msg ordering for publish callback failure (#2285)
---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 49 ++++++++++++++--------
 .../pulsar/io/kinesis/KinesisSinkConfig.java       |  1 +
 2 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dc70b98..67de21a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -44,6 +44,7 @@ import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -89,6 +90,12 @@ public class KinesisSink implements Sink<byte[]> {
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
+    // 
+    private static final int FALSE = 0;
+    private static final int TRUE = 1;
+    private volatile int previousPublishFailed = FALSE;
+    private static final AtomicIntegerFieldUpdater<KinesisSink> IS_PUBLISH_FAILED =
+            AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, "previousPublishFailed");
 
     public static final String ACCESS_KEY_NAME = "accessKey";
     public static final String SECRET_KEY_NAME = "secretKey";
@@ -101,6 +108,12 @@ public class KinesisSink implements Sink<byte[]> {
     
     @Override
     public void write(Record<byte[]> record) throws Exception {
+        // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread
to maintain the ordering 
+        if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE)
{
+            LOG.warn("Skip acking message to retain ordering with previous failed message
{}-{}", this.streamName,
+                    record.getRecordSequence());
+            throw new IllegalStateException("kinesis queue has publish failure");
+        }
         String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
@@ -109,7 +122,7 @@ public class KinesisSink implements Sink<byte[]> {
         ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey, data);
         addCallback(addRecordResult,
-                ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext),
directExecutor());
+                ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor());
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
@@ -151,6 +164,7 @@ public class KinesisSink implements Sink<byte[]> {
 
         this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
         this.kinesisProducer = new KinesisProducer(kinesisConfig);
+        IS_PUBLISH_FAILED.set(this, FALSE);
 
         LOG.info("Kinesis sink started. {}", (ReflectionToStringBuilder.toString(kinesisConfig,
ToStringStyle.SHORT_PREFIX_STYLE)));
     }
@@ -167,30 +181,26 @@ public class KinesisSink implements Sink<byte[]> {
     private static final class ProducerSendCallback implements FutureCallback<UserRecordResult>
{
 
         private Record<byte[]> resultContext;
-        private String streamName;
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
-        private SinkContext sinkContext;
+        private KinesisSink kinesisSink;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(String streamName, Record<byte[]> resultContext,
long startTime,
-                SinkContext sinkContext) {
+        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]>
resultContext, long startTime) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
-            sendCallback.streamName = streamName;
+            sendCallback.kinesisSink = kinesisSink;
             sendCallback.startTime = startTime;
-            sendCallback.sinkContext = sinkContext;
             return sendCallback;
         }
 
         private void recycle() {
             resultContext = null;
-            streamName = null;
+            kinesisSink = null;
             startTime = 0;
-            sinkContext = null;
             recyclerHandle.recycle(this);
         }
 
@@ -204,23 +214,28 @@ public class KinesisSink implements Sink<byte[]> {
         @Override
         public void onSuccess(UserRecordResult result) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Successfully published message for {}-{} with latency", this.streamName,
result.getShardId(),
+                LOG.debug("Successfully published message for {}-{} with latency {}", kinesisSink.streamName,
result.getShardId(),
                         TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
             }
-            this.resultContext.ack();
-            if (sinkContext != null) {
-                sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            if (kinesisSink.sinkContext != null) {
+                kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            }
+            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed
== TRUE) {
+                LOG.warn("Skip acking message to retain ordering with previous failed message
{}-{} on shard {}",
+                        kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId());
+            } else {
+                this.resultContext.ack();
             }
             recycle();
         }
 
         @Override
         public void onFailure(Throwable exception) {
-            LOG.error("[{}] Failed to published message for replicator of {}-{} ", streamName,
+            LOG.error("[{}] Failed to published message for replicator of {}-{} ", kinesisSink.streamName,
                     resultContext.getPartitionId(), resultContext.getRecordSequence());
-            this.resultContext.fail();
-            if (sinkContext != null) {
-                sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+            kinesisSink.previousPublishFailed = TRUE;
+            if (kinesisSink.sinkContext != null) {
+                kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
             }
             recycle();
         }
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index bf5f2ea..ba476ab 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -45,6 +45,7 @@ public class KinesisSinkConfig implements Serializable {
     private String awsCredentialPluginName;
     private String awsCredentialPluginParam;
     private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
+    private boolean retainOrdering;
 
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());


Mime
View raw message