apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/7] incubator-apex-malhar git commit: APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests for both one_to_one and one_to_many partition
Date Sat, 20 Feb 2016 08:56:47 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.3 6ad18e8ca -> 040f8f777


APEXMALHAR-1970 #resolve #comment Fix the ArrayOutOfBoundaryException and add a bunch of tests
for both one_to_one and one_to_many partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/cd183db8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/cd183db8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/cd183db8

Branch: refs/heads/release-3.3
Commit: cd183db8f4b6427dedb1bdda869c2a6b7042f430
Parents: 6ad18e8
Author: Siyuan Hua <hsy541@apache.org>
Authored: Mon Jan 11 22:09:25 2016 -0800
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Sat Feb 20 00:13:02 2016 -0800

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  4 ++--
 .../apex/malhar/kafka/OneToManyPartitioner.java |  4 ++--
 .../malhar/kafka/KafkaInputOperatorTest.java    | 23 ++++++++++++--------
 3 files changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd183db8/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index e01c38e..c708145 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -259,13 +259,13 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
       }
       PartitionMeta that = (PartitionMeta)o;
       return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(topicPartition, that.topicPartition);
+        Objects.equals(getTopicPartition(), that.getTopicPartition());
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, topicPartition);
+      return Objects.hash(cluster, getTopicPartition());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd183db8/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
index 16f4da4..bcd3073 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/OneToManyPartitioner.java
@@ -54,8 +54,8 @@ public class OneToManyPartitioner extends AbstractKafkaPartitioner
      for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
         for (PartitionInfo pif : topicPartition.getValue()) {
           int index = i++ % partitionCount;
-          if (eachPartitionAssignment.get(index) == null) {
-            eachPartitionAssignment.add(index, new HashSet<PartitionMeta>());
+          if (index >= eachPartitionAssignment.size()) {
+            eachPartitionAssignment.add(new HashSet<PartitionMeta>());
           }
           eachPartitionAssignment.get(index).add(new PartitionMeta(clusterMap.getKey(), topicPartition.getKey(), pif.partition()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/cd183db8/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 17bc465..d055555 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -48,26 +48,30 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   private int totalBrokers = 0;
 
+  private String partition = null;
 
-
-  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
-  public static Collection<Boolean[]> testScenario()
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
+  public static Collection<Object[]> testScenario()
   {
-    return Arrays.asList(new Boolean[][]{{true, false}, // multi cluster with single partition
-      {true, true}, // multi cluster with multi partitions
-      {false, true}, // single cluster with multi partitions
-      {false, false}, // single cluster with single partitions
+    return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+      {true, false, "one_to_many"},
+      {true, true, "one_to_one"},// multi cluster with multi partitions
+      {true, true, "one_to_many"},
+      {false, true, "one_to_one"}, // single cluster with multi partitions
+      {false, true, "one_to_many"},
+      {false, false, "one_to_one"}, // single cluster with single partitions
+      {false, false, "one_to_many"}
     });
   }
 
-  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
+  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
   {
     // This class want to initialize several kafka brokers for multiple partitions
     this.hasMultiCluster = hasMultiCluster;
     this.hasMultiPartition = hasMultiPartition;
     int cluster = 1 + (hasMultiCluster ? 1 : 0);
     totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-
+    this.partition = partition;
   }
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
@@ -167,6 +171,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     node.setTopics(TEST_TOPIC);
     node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     node.setClusters(getClusterConfig());
+    node.setStrategy(partition);
 
     // Create Test tuple collector
     CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());


Mime
View raw message