apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject apex-malhar git commit: APEXMALHAR-2227 Index out of Bound Exception fix in Kafka input
Date Wed, 07 Sep 2016 20:00:08 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/release-3.5 9bcffafe4 -> ebd4f376d


APEXMALHAR-2227 Index out of Bound Exception fix in Kafka input


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ebd4f376
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ebd4f376
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ebd4f376

Branch: refs/heads/release-3.5
Commit: ebd4f376d5502c2c2dd82f143887b0c1ac796f57
Parents: 9bcffaf
Author: Chandni Singh <csingh@apache.org>
Authored: Wed Sep 7 08:58:09 2016 -0700
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Wed Sep 7 12:59:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  2 +-
 .../malhar/lib/wal/FSWindowDataManager.java     | 21 +++++++++++---------
 2 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ebd4f376/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index abf3fad..fc11bf7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -621,7 +621,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer>
implem
     if (numPartitionsChanged) {
       List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(),
deletedOperators);
       int i = 0;
-      for (Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
+      for (Partition<AbstractKafkaInputOperator<K>> partition : resultPartitions)
{
         partition.getPartitionedInstance().setWindowDataManager(managers.get(i++));
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/ebd4f376/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 81f6aa0..2b85580 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -237,15 +237,18 @@ public class FSWindowDataManager implements WindowDataManager
 
   protected void createReadOnlyWals() throws IOException
   {
-    RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(fullStatePath));
-    while (operatorsIter.hasNext()) {
-      FileStatus status = operatorsIter.next();
-      int operatorId = Integer.parseInt(status.getPath().getName());
-
-      if (operatorId != this.operatorId) {
-        //create read-only wal for other partitions
-        FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
-        readOnlyWals.put(operatorId, wal);
+    Path statePath = new Path(fullStatePath);
+    if (fileContext.util().exists(statePath)) {
+      RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(statePath);
+      while (operatorsIter.hasNext()) {
+        FileStatus status = operatorsIter.next();
+        int operatorId = Integer.parseInt(status.getPath().getName());
+
+        if (operatorId != this.operatorId) {
+          //create read-only wal for other partitions
+          FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+          readOnlyWals.put(operatorId, wal);
+        }
       }
     }
   }


Mime
View raw message