apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject incubator-apex-malhar git commit: Fixes for the following issues
Date Fri, 12 Feb 2016 19:59:05 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master cec33da88 -> 5775a5390


Fixes for the following issues:

- Committed offsets are not present in the offset manager storage.
- Operator partitions are reporting offsets to the stats listener for Kafka partitions they don't subscribe to.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/5775a539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/5775a539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/5775a539

Branch: refs/heads/master
Commit: 5775a53904399b561335182bc822ecbc6f45f787
Parents: cec33da
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Tue Feb 9 20:52:56 2016 -0800
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Fri Feb 12 11:58:38 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/5775a539/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 4b22e5e..b166b9e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -643,7 +643,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
       p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
       if (initOffsets != null) {
-        p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        //Don't send all offsets to all partitions
+        //p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+        p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
       }
     }
     newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
@@ -715,7 +717,11 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     //In every partition check interval, call offsetmanager to update the offsets
     if (offsetManager != null) {
-      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+      Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
+      if (offsetsForPartitions.size() > 0) {
+        logger.debug("Passing offset updates to offset manager");
+        offsetManager.updateOffsets(offsetsForPartitions);
+      }
     }
   }
 
@@ -743,15 +749,18 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
 
     long t = System.currentTimeMillis();
 
+    // If stats are available then update offsets
+    // Do this before re-partition interval check below to not miss offset updates
+    if (kstats.size() > 0) {
+      logger.debug("Checking offset updates for offset manager");
+      updateOffsets(kstats);
+    }
+
     if (t - lastCheckTime < repartitionCheckInterval) {
       // return false if it's within repartitionCheckInterval since last time it check the stats
       return false;
     }
 
-    logger.debug("Use OffsetManager to update offsets");
-    updateOffsets(kstats);
-
-
     if(repartitionInterval < 0){
       // if repartition is disabled
       return false;


Mime
View raw message