pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] rdhabalia closed pull request #2285: kinesis-sink: manage msg ordering for publish callback failure
Date Thu, 02 Aug 2018 19:35:50 GMT
rdhabalia closed pull request #2285: kinesis-sink: manage msg ordering for publish callback failure
URL: https://github.com/apache/incubator-pulsar/pull/2285
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dc70b98c5c..67de21a4ee 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -44,6 +44,7 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -89,6 +90,12 @@
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
+    // 
+    private static final int FALSE = 0;
+    private static final int TRUE = 1;
+    private volatile int previousPublishFailed = FALSE;
+    private static final AtomicIntegerFieldUpdater<KinesisSink> IS_PUBLISH_FAILED =
+            AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, "previousPublishFailed");
 
     public static final String ACCESS_KEY_NAME = "accessKey";
     public static final String SECRET_KEY_NAME = "secretKey";
@@ -101,6 +108,12 @@
     
     @Override
     public void write(Record<byte[]> record) throws Exception {
+        // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread
to maintain the ordering 
+        if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE)
{
+            LOG.warn("Skip acking message to retain ordering with previous failed message
{}-{}", this.streamName,
+                    record.getRecordSequence());
+            throw new IllegalStateException("kinesis queue has publish failure");
+        }
         String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
@@ -109,7 +122,7 @@ public void write(Record<byte[]> record) throws Exception {
         ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey, data);
         addCallback(addRecordResult,
-                ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext),
directExecutor());
+                ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor());
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length);
@@ -151,6 +164,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exc
 
         this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
         this.kinesisProducer = new KinesisProducer(kinesisConfig);
+        IS_PUBLISH_FAILED.set(this, FALSE);
 
         LOG.info("Kinesis sink started. {}", (ReflectionToStringBuilder.toString(kinesisConfig,
ToStringStyle.SHORT_PREFIX_STYLE)));
     }
@@ -167,30 +181,26 @@ protected AWSCredentialsProvider createCredentialProvider(String awsCredentialPl
     private static final class ProducerSendCallback implements FutureCallback<UserRecordResult>
{
 
         private Record<byte[]> resultContext;
-        private String streamName;
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
-        private SinkContext sinkContext;
+        private KinesisSink kinesisSink;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(String streamName, Record<byte[]> resultContext,
long startTime,
-                SinkContext sinkContext) {
+        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]>
resultContext, long startTime) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
-            sendCallback.streamName = streamName;
+            sendCallback.kinesisSink = kinesisSink;
             sendCallback.startTime = startTime;
-            sendCallback.sinkContext = sinkContext;
             return sendCallback;
         }
 
         private void recycle() {
             resultContext = null;
-            streamName = null;
+            kinesisSink = null;
             startTime = 0;
-            sinkContext = null;
             recyclerHandle.recycle(this);
         }
 
@@ -204,23 +214,28 @@ protected ProducerSendCallback newObject(Handle<ProducerSendCallback>
handle) {
         @Override
         public void onSuccess(UserRecordResult result) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Successfully published message for {}-{} with latency", this.streamName,
result.getShardId(),
+                LOG.debug("Successfully published message for {}-{} with latency {}", kinesisSink.streamName,
result.getShardId(),
                         TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
             }
-            this.resultContext.ack();
-            if (sinkContext != null) {
-                sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            if (kinesisSink.sinkContext != null) {
+                kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            }
+            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed
== TRUE) {
+                LOG.warn("Skip acking message to retain ordering with previous failed message
{}-{} on shard {}",
+                        kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId());
+            } else {
+                this.resultContext.ack();
             }
             recycle();
         }
 
         @Override
         public void onFailure(Throwable exception) {
-            LOG.error("[{}] Failed to published message for replicator of {}-{} ", streamName,
+            LOG.error("[{}] Failed to published message for replicator of {}-{} ", kinesisSink.streamName,
                     resultContext.getPartitionId(), resultContext.getRecordSequence());
-            this.resultContext.fail();
-            if (sinkContext != null) {
-                sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+            kinesisSink.previousPublishFailed = TRUE;
+            if (kinesisSink.sinkContext != null) {
+                kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
             }
             recycle();
         }
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index bf5f2ea883..ba476ab099 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -45,6 +45,7 @@
     private String awsCredentialPluginName;
     private String awsCredentialPluginParam;
     private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; // default : ONLY_RAW_PAYLOAD
+    private boolean retainOrdering;
 
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message