storm-commits mailing list archives

From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2691: storm-kafka-client Trident spout does not implement the Trident spout interface properly
Date Thu, 01 Mar 2018 10:36:08 GMT
Repository: storm
Updated Branches:
  refs/heads/master 93f9a1733 -> e8e1a4e8f


http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 676cb3d..c0c10f8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -49,7 +48,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
 import static org.mockito.ArgumentMatchers.any;
@@ -58,8 +56,22 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.junit.Rule;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
 public class KafkaSpoutRebalanceTest {
 
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
@@ -69,23 +82,32 @@ public class KafkaSpoutRebalanceTest {
     private SpoutOutputCollector collectorMock;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactory;
+    private TopicFilter topicFilterMock;
+    private ManualPartitioner partitionerMock;
 
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
         contextMock = mock(TopologyContext.class);
         collectorMock = mock(SpoutOutputCollector.class);
         consumerMock = mock(KafkaConsumer.class);
         consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+        topicFilterMock = mock(TopicFilter.class);
+        when(topicFilterMock.getAllSubscribedPartitions(any()))
+            .thenReturn(new HashSet<>());
+        partitionerMock = mock(ManualPartitioner.class);
+        when(partitionerMock.getPartitionsForThisTask(any(), any()))
+            .thenReturn(new HashSet<>());
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, TopicAssigner topicAssigner) {
         //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
         //Assign partitions to the spout
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(topicAssigner).assignPartitions(any(), any(), rebalanceListenerCapture.capture());
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
         Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
@@ -123,21 +145,17 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-            Subscription subscriptionMock = mock(Subscription.class);
-            doNothing()
-                .when(subscriptionMock)
-                .subscribe(any(), rebalanceListenerCapture.capture(), any());
-            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            TopicAssigner assignerMock = mock(TopicAssigner.class);
+            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .build(), consumerFactory);
+                .build(), consumerFactory, assignerMock);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
             TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
             //Emit a message on each partition and revoke the first partition
             List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
-                spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+                spout, partitionThatWillBeRevoked, assignedPartition, assignerMock);
 
             //Ack both emitted tuples
             spout.ack(emittedMessageIds.get(0));
@@ -159,16 +177,12 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        Subscription subscriptionMock = mock(Subscription.class);
-        doNothing()
-            .when(subscriptionMock)
-            .subscribe(any(), rebalanceListenerCapture.capture(), any());
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
-            .build(), consumerFactory);
+            .build(), consumerFactory, assignerMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
@@ -179,7 +193,7 @@ public class KafkaSpoutRebalanceTest {
 
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
-            spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+            spout, partitionThatWillBeRevoked, assignedPartition, assignerMock);
 
         //Check that only two message ids were generated
         verify(retryServiceMock, times(2)).getMessageId(any(TopicPartition.class), anyLong());
@@ -200,14 +214,10 @@ public class KafkaSpoutRebalanceTest {
          * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those.
          */
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        Subscription subscriptionMock = mock(Subscription.class);
-        doNothing()
-            .when(subscriptionMock)
-            .subscribe(any(), rebalanceListenerCapture.capture(), any());
-        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
-            .build(), consumerFactory);
+            .build(), consumerFactory, assignerMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition assignedPartition = new TopicPartition(topic, 1);
         TopicPartition newPartition = new TopicPartition(topic, 2);
@@ -215,6 +225,9 @@ public class KafkaSpoutRebalanceTest {
         //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
+        
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(assignerMock).assignPartitions(any(), any(), rebalanceListenerCapture.capture());
 
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
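
The central migration pattern in the file above: the rebalance listener is no longer captured while stubbing the removed Subscription, but after the fact from the mocked TopicAssigner. A condensed, illustrative sketch (identifiers taken from the hunks above; the surrounding test setup is assumed, not part of this patch):

    // Old style, stubbing-time capture on the removed Subscription API:
    //   doNothing().when(subscriptionMock)
    //       .subscribe(any(), rebalanceListenerCapture.capture(), any());
    // New style: let the spout call the mocked TopicAssigner, then capture
    // the listener from the recorded interaction.
    ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture =
        ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    verify(assignerMock).assignPartitions(any(), any(), rebalanceListenerCapture.capture());
    ConsumerRebalanceListener listener = rebalanceListenerCapture.getValue();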

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 64b69b0..0b5c580 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -18,12 +18,10 @@ package org.apache.storm.kafka.spout;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +38,6 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.InOrder;
-import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
 import static org.mockito.ArgumentMatchers.anyList;
@@ -48,10 +45,17 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.junit.Rule;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
 
 public class KafkaSpoutRetryLimitTest {
     
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+
     private final long offsetCommitPeriodMs = 2_000;
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
@@ -69,8 +73,7 @@ public class KafkaSpoutRetryLimitTest {
     
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
-        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .setRetry(ZERO_RETRIES_RETRY_SERVICE)
             .build();

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index d92a3a7..6b8b94b 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -82,7 +82,6 @@ public class MaxUncommittedOffsetTest {
         //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples
         //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
         assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
-        MockitoAnnotations.initMocks(this);
         spout = new KafkaSpout<>(spoutConfig);
         new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index 05cfd28..2aed182 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -41,7 +42,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.mockito.ArgumentCaptor;
@@ -49,8 +52,8 @@ import org.mockito.ArgumentCaptor;
 public class SpoutWithMockedConsumerSetupHelper {
 
     /**
-     * Creates, opens and activates a KafkaSpout using a mocked consumer. The subscription should be a mock object, since this method skips
-     * the subscription and instead just configures the mocked consumer to act as if the specified partitions are assigned to it.
+     * Creates, opens and activates a KafkaSpout using a mocked consumer. The TopicFilter and ManualPartitioner should be mock objects,
+     * since this method shortcircuits the TopicPartition assignment process and just calls onPartitionsAssigned on the rebalance listener.
      *
      * @param <K> The Kafka key type
      * @param <V> The Kafka value type
@@ -64,22 +67,24 @@ public class SpoutWithMockedConsumerSetupHelper {
      */
     public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
         TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) {
-        Subscription subscriptionMock = spoutConfig.getSubscription();
-        if (!mockingDetails(subscriptionMock).isMock()) {
-            throw new IllegalStateException("Use a mocked subscription when using this method, it helps avoid complex stubbing");
+        TopicFilter topicFilter = spoutConfig.getTopicFilter();
+        ManualPartitioner topicPartitioner = spoutConfig.getTopicPartitioner();
+        if (!mockingDetails(topicFilter).isMock() || !mockingDetails(topicPartitioner).isMock()) {
+            throw new IllegalStateException("Use a mocked TopicFilter and a mocked ManualPartitioner when using this method, it helps avoid complex stubbing");
         }
         
         Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions));
         
-        when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
+        TopicAssigner assigner = mock(TopicAssigner.class);
         doAnswer(invocation -> {
-            ConsumerRebalanceListener listener = invocation.getArgument(1);
+            ConsumerRebalanceListener listener = invocation.getArgument(2);
             listener.onPartitionsAssigned(assignedPartitionsSet);
             return null;
-        }).when(subscriptionMock).subscribe(any(), any(ConsumerRebalanceListener.class), any());
+        }).when(assigner).assignPartitions(any(), any(), any());
+        when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
         
         KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory, assigner);
         
         spout.open(topoConf, contextMock, collectorMock);
         spout.activate();
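
To illustrate the contract described in the updated javadoc, a minimal sketch of a caller, assuming the usual fixtures (conf, contextMock, collectorMock) from the tests in this patch:

    KafkaSpoutConfig<String, String> spoutConfig =
        SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(
            mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
            .build();
    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    // The filter and partitioner must be Mockito mocks; setupSpout throws
    // IllegalStateException otherwise, then short-circuits assignment by
    // firing onPartitionsAssigned for the given partitions.
    KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
        spoutConfig, conf, contextMock, collectorMock, consumerMock,
        new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));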

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index 3670d8a..4896267 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -24,7 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
@@ -37,8 +38,8 @@ public class SingleTopicKafkaSpoutConfiguration {
         return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
 
-    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
-        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(TopicFilter topicFilter, ManualPartitioner topicPartitioner, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, topicFilter, topicPartitioner));
     }
 
     public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
deleted file mode 100644
index 2355283..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.subscription;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.task.TopologyContext;
-import org.junit.Test;
-import org.mockito.InOrder;
-
-public class ManualPartitionSubscriptionTest {
-
-    @Test
-    public void testCanReassignPartitions() {
-        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
-        TopicFilter filterMock = mock(TopicFilter.class);
-        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
-        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
-        TopologyContext contextMock = mock(TopologyContext.class);
-        ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock);
-        
-        List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
-        List<TopicPartition> twoPartitions = new ArrayList<>();
-        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
-        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
-        when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
-            .thenReturn(onePartition)
-            .thenReturn(twoPartitions);
-        
-        //Set the first assignment
-        subscription.subscribe(consumerMock, listenerMock, contextMock);
-        
-        InOrder inOrder = inOrder(consumerMock, listenerMock);
-        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
-        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
-        
-        clearInvocations(consumerMock, listenerMock);
-        
-        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
-        
-        //Update to set the second assignment
-        subscription.refreshAssignment();
-        
-        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
-        //to allow the revocation hook to commit offsets for the revoked partitions.
-        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
-        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
-        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index 3985619..a30a23a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -54,7 +55,7 @@ public class NamedTopicFilterTest {
         when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
         when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
         
-        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        Set<TopicPartition> matchedPartitions = filter.getAllSubscribedPartitions(consumerMock);
         
         assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, 
             containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
index 67411e3..01e0b3d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -61,7 +62,7 @@ public class PatternTopicFilterTest {
         
         when(consumerMock.listTopics()).thenReturn(allTopics);
         
-        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        Set<TopicPartition> matchedPartitions = filter.getAllSubscribedPartitions(consumerMock);
         
         assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
             containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java
new file mode 100644
index 0000000..f4deeba
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+
+public class RoundRobinManualPartitionerTest {
+
+    private TopicPartition createTp(int partition) {
+        return new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, partition);
+    }
+    
+    private Set<TopicPartition> partitionsToTps(int[] expectedPartitions) {
+        Set<TopicPartition> expectedTopicPartitions = new HashSet<>();
+        for(int i = 0; i < expectedPartitions.length; i++) {
+            expectedTopicPartitions.add(createTp(expectedPartitions[i]));
+        }
+        return expectedTopicPartitions;
+    }
+    
+    @Test
+    public void testRoundRobinPartitioning() {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for(int i = 0; i < 11; i++) {
+            allPartitions.add(createTp(i));
+        }
+        List<TopologyContext> contextMocks = new ArrayList<>();
+        String thisComponentId = "A spout";
+        List<Integer> allTasks = Arrays.asList(new Integer[]{0, 1, 2});
+        for(int i = 0; i < 3; i++) {
+            TopologyContext contextMock = mock(TopologyContext.class);
+            when(contextMock.getThisTaskIndex()).thenReturn(i);
+            when(contextMock.getThisComponentId()).thenReturn(thisComponentId);
+            when(contextMock.getComponentTasks(thisComponentId)).thenReturn(allTasks);
+            contextMocks.add(contextMock);
+        }
+        RoundRobinManualPartitioner partitioner = new RoundRobinManualPartitioner();
+        
+        Set<TopicPartition> partitionsForFirstTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(0));
+        assertThat(partitionsForFirstTask, is(partitionsToTps(new int[]{0, 3, 6, 9})));
+        
+        Set<TopicPartition> partitionsForSecondTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(1));
+        assertThat(partitionsForSecondTask, is(partitionsToTps(new int[]{1, 4, 7, 10})));
+        
+        Set<TopicPartition> partitionsForThirdTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(2));
+        assertThat(partitionsForThirdTask, is(partitionsToTps(new int[]{2, 5, 8})));
+    }
+    
+}
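
The expected sets in the test above follow a plain modulo distribution. A hedged sketch of the rule the test pins down (not the actual RoundRobinManualPartitioner source, which is outside this diff):

    // Task i keeps every partition whose index in the ordered list satisfies
    // index % numTasks == taskIndex; with 11 partitions and 3 tasks, task 0
    // gets indices 0, 3, 6, 9, matching the assertions above.
    static Set<TopicPartition> roundRobin(List<TopicPartition> allPartitions, int taskIndex, int numTasks) {
        Set<TopicPartition> myPartitions = new HashSet<>();
        for (int i = taskIndex; i < allPartitions.size(); i += numTasks) {
            myPartitions.add(allPartitions.get(i));
        }
        return myPartitions;
    }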

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java
new file mode 100644
index 0000000..96bbc1c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class TopicAssignerTest {
+
+    @Test
+    public void testCanReassignPartitions() {    
+        Set<TopicPartition> onePartition = Collections.singleton(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        Set<TopicPartition> twoPartitions = new HashSet<>();
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
+        TopicAssigner assigner = new TopicAssigner();
+        
+        //Set the first assignment
+        assigner.assignPartitions(consumerMock, onePartition, listenerMock);
+        
+        InOrder inOrder = inOrder(consumerMock, listenerMock);
+        inOrder.verify(listenerMock).onPartitionsRevoked(Collections.emptySet());
+        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
+        
+        clearInvocations(consumerMock, listenerMock);
+        
+        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
+        
+        //Update to set the second assignment
+        assigner.assignPartitions(consumerMock, twoPartitions, listenerMock);
+        
+        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+        //to allow the revocation hook to commit offsets for the revoked partitions.
+        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
+        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
+    }
+    
+}
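
The ordering verified above implies a revoke-assign-notify sequence. A hedged sketch of that flow (the real TopicAssigner source is not part of this diff):

    // Revoke first so the listener can commit offsets for partitions that are
    // going away, then hand the new assignment to the consumer, then notify.
    <K, V> void assignPartitions(KafkaConsumer<K, V> consumer,
            Set<TopicPartition> newAssignment, ConsumerRebalanceListener listener) {
        listener.onPartitionsRevoked(consumer.assignment());
        consumer.assign(newAssignment);
        listener.onPartitionsAssigned(newAssignment);
    }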

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
index a5c78a8..a15e415 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -19,9 +19,9 @@ package org.apache.storm.kafka.spout.trident;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
 import org.json.simple.JSONValue;
@@ -38,27 +38,24 @@ public class KafkaTridentSpoutBatchMetadataTest {
          * It is important that all map entries are types json-simple knows about,
          * since otherwise the library just calls toString on them which will likely produce invalid JSON.
          */
-        TopicPartition tp = new TopicPartition("topic", 0);
         long startOffset = 10;
         long endOffset = 20;
 
-        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset);
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(startOffset, endOffset);
         Map<String, Object> map = metadata.toMap();
         Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map));
         KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap);
-        assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition()));
         assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset()));
         assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset()));
     }
 
     @Test
     public void testCreateMetadataFromRecords() {
-        TopicPartition tp = new TopicPartition("topic", 0);
         long firstOffset = 15;
         long lastOffset = 55;
-        ConsumerRecords<?, ?> records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1))));
+        List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(new TopicPartition("test", 0), firstOffset, (int) (lastOffset - firstOffset + 1));
 
-        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records);
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records);
         assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
         assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
new file mode 100644
index 0000000..6208ce4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class KafkaTridentSpoutEmitterTest {
+    
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+    
+    @Captor
+    public ArgumentCaptor<Collection<TopicPartition>> assignmentCaptor;
+    
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+    
+    @Test
+    public void testGetOrderedPartitionsIsConsistent() {
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .build(),
+            mock(TopologyContext.class),
+            mock(KafkaConsumerFactory.class), new TopicAssigner());
+        
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        int numPartitions = 10;
+        for (int i = 0; i < numPartitions; i++) {
+            allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        List<Map<String, Object>> serializedPartitions = allPartitions.stream()
+            .map(tp -> tpSerializer.toMap(tp))
+            .collect(Collectors.toList());
+        
+        List<KafkaTridentSpoutTopicPartition> orderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+        assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size()));
+        Collections.shuffle(serializedPartitions);
+        List<KafkaTridentSpoutTopicPartition> secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+        assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions));
+        
+        serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions)));
+        List<KafkaTridentSpoutTopicPartition> orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions);
+        orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1);
+        assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions));
+    }
+    
+    @Test
+    public void testGetPartitionsForTask() {
+        //Verify correct wrapping/unwrapping of partition and delegation of partition assignment
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        when(partitionerMock.getPartitionsForThisTask(any(), any()))
+            .thenAnswer(invocation -> {
+                List<TopicPartition> partitions = new ArrayList<>((List<TopicPartition>) invocation.getArguments()[0]);
+                partitions.remove(0);
+                return new HashSet<>(partitions);
+            });
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
+                .build(),
+            mock(TopologyContext.class),
+            mock(KafkaConsumerFactory.class), new TopicAssigner());
+        
+        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        List<TopicPartition> unwrappedPartitions = allPartitions.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toList());
+        
+        List<KafkaTridentSpoutTopicPartition> partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions);
+        verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class));
+        allPartitions.remove(0);
+        assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions)));
+    }
+    
+    @Test
+    public void testAssignPartitions() {
+        //Verify correct unwrapping of partitions and delegation of assignment
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .build(),
+            mock(TopologyContext.class),
+            consumerFactory, assignerMock);
+        
+        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        Set<TopicPartition> unwrappedPartitions = allPartitions.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toSet());
+        
+        emitter.refreshPartitions(allPartitions);
+        
+        verify(assignerMock).assignPartitions(any(KafkaConsumer.class), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class));
+    }
+    
+    private Map<String, Object> doEmitBatchTest(KafkaConsumer<String, String> consumerMock, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map<String, Object> previousBatchMeta) {
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
+            tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
+        KafkaConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+                .build(),
+            mock(TopologyContext.class),
+            consumerFactory, new TopicAssigner());
+        
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+        return emitter.emitPartitionBatch(txid, collectorMock, kttp, previousBatchMeta);
+    }
+    
+    @Test
+    public void testEmitBatchWithNullMeta() {
+        //Check that null meta makes the spout seek according to FirstPollOffsetStrategy, and that the returned meta is correct
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 0;
+        int numRecords = 10;
+        Map<String, Object> batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, null);
+        
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).seekToBeginning(Collections.singleton(tp));
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(collectorMock, times(numRecords)).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+    }
+    
+    @Test
+    public void testEmitBatchWithPreviousMeta() {
+        //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 50;
+        int numRecords = 10;
+        KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1);
+        Map<String, Object> batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap());
+        
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).seek(tp, firstOffset);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(collectorMock, times(numRecords)).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
new file mode 100644
index 0000000..1abe551
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+
+public class KafkaTridentSpoutOpaqueCoordinatorTest {
+
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+
+    @Test
+    public void testCanGetPartitions() {
+        KafkaConsumer<String, String> mockConsumer = mock(KafkaConsumer.class);
+        TopicPartition expectedPartition = new TopicPartition("test", 0);
+        TopicFilter mockFilter = mock(TopicFilter.class);
+        when(mockFilter.getAllSubscribedPartitions(any())).thenReturn(Collections.singleton(expectedPartition));
+
+        KafkaSpoutConfig<String, String> spoutConfig = 
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+                .build();
+        KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+
+        List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
+
+        List<TopicPartition> tps = deserializePartitions(partitionsForBatch);
+
+        verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
+        assertThat(tps, contains(expectedPartition));
+    }
+
+    @Test
+    public void testCanUpdatePartitions() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaConsumer<String, String> mockConsumer = mock(KafkaConsumer.class);
+            TopicPartition expectedPartition = new TopicPartition("test", 0);
+            TopicPartition addedLaterPartition = new TopicPartition("test-2", 0);
+            HashSet<TopicPartition> allPartitions = new HashSet<>();
+            allPartitions.add(expectedPartition);
+            allPartitions.add(addedLaterPartition);
+            TopicFilter mockFilter = mock(TopicFilter.class);
+            when(mockFilter.getAllSubscribedPartitions(any()))
+                .thenReturn(Collections.singleton(expectedPartition))
+                .thenReturn(allPartitions);
+
+            KafkaSpoutConfig<String, String> spoutConfig = 
+                SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+                    .build();
+            KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+
+            List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
+
+            List<TopicPartition> firstBatchTps = deserializePartitions(partitionsForBatch);
+            
+            verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
+            assertThat(firstBatchTps, contains(expectedPartition));
+
+            Time.advanceTime(KafkaTridentSpoutOpaqueCoordinator.TIMER_DELAY_MS + spoutConfig.getPartitionRefreshPeriodMs());
+
+            List<Map<String, Object>> partitionsForSecondBatch = coordinator.getPartitionsForBatch();
+            
+            List<TopicPartition> secondBatchTps = deserializePartitions(partitionsForSecondBatch);
+            verify(mockFilter, times(2)).getAllSubscribedPartitions(mockConsumer);
+            assertThat(new HashSet<>(secondBatchTps), is(allPartitions));
+        }
+    }
+
+    private List<TopicPartition> deserializePartitions(List<Map<String, Object>> tps) {
+        return tps.stream()
+            .map(map -> tpSerializer.fromMap(map))
+            .collect(Collectors.toList());
+    }
+
+}
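
For reference, the coordinator hands each partition to the emitter tasks as a plain Map<String, Object>, and the deserializePartitions helper above applies TopicPartitionSerializer.fromMap to recover each TopicPartition. The sketch below is a minimal round trip under that reading; the toMap counterpart is an assumption, since only fromMap appears in this diff:

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.trident.TopicPartitionSerializer;

    public class TopicPartitionRoundTripSketch {
        public static void main(String[] args) {
            TopicPartitionSerializer serializer = new TopicPartitionSerializer();
            TopicPartition original = new TopicPartition("test", 0);

            // The coordinator ships partitions as plain maps so that Trident
            // can serialize batch metadata between coordinator and emitters.
            Map<String, Object> asMap = serializer.toMap(original); // toMap assumed, not shown in this diff

            // The test's deserializePartitions helper applies fromMap per entry.
            TopicPartition restored = serializer.fromMap(asMap);
            System.out.println(original.equals(restored)); // expected: true
        }
    }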

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2260d66..3886a82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -896,12 +896,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.google.auto.service</groupId>
-                <artifactId>auto-service</artifactId>
-                <version>${auto-service.version}</version>
-                <optional>true</optional>
-            </dependency>
+                <dependency>
+                    <groupId>com.google.auto.service</groupId>
+                    <artifactId>auto-service</artifactId>
+                    <version>${auto-service.version}</version>
+                    <optional>true</optional>
+                </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index 5fe3c65..8dd1301 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -72,23 +72,30 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS
         /**
          * This method is called when this task is responsible for a new set of partitions. Should be used
          * to manage things like connections to brokers.
+         * @param partitionResponsibilities The partitions assigned to this task
          */        
         void refreshPartitions(List<Partition> partitionResponsibilities);
 
         /**
-         * @return The oredered list of partitions being processed by all the tasks
+         * Sorts the partition info to produce an ordered list of partitions.
+         * @param allPartitionInfo The partition info for all partitions being processed by all spout tasks
+         * @return The ordered list of partitions being processed by all the tasks. The ordering must be consistent for all tasks.
          */
         List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
 
         /**
+         * Get the partitions assigned to this task.
+         * @param taskId The id of this task
+         * @param numTasks The number of tasks for this spout
+         * @param allPartitionInfoSorted The partition info for all partitions being processed by all spout tasks,
+         *     sorted according to {@link #getOrderedPartitions(java.lang.Object)}
          * @return The list of partitions that are to be processed by the task with id {@code taskId}
          */
-        default List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo){
-            final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
-            final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
-            if (orderedPartitions != null) {
-                for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
-                    taskPartitions.add(orderedPartitions.get(i));
+        default List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> allPartitionInfoSorted){
+            final List<Partition> taskPartitions = new ArrayList<>(allPartitionInfoSorted == null ? 0 : allPartitionInfoSorted.size());
+            if (allPartitionInfoSorted != null) {
+                for (int i = taskId; i < allPartitionInfoSorted.size(); i += numTasks) {
+                    taskPartitions.add(allPartitionInfoSorted.get(i));
                 }
             }
             return taskPartitions;
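
The default implementation above makes the assignment contract concrete: because every task sorts the full partition list the same way via getOrderedPartitions, task i can take every numTasks-th entry starting at its own index, and the tasks end up with disjoint assignments that together cover all partitions. A self-contained sketch of that round-robin walk, with strings standing in for real partition objects:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinAssignmentSketch {

        // Mirrors the default getPartitionsForTask logic: start at taskId and
        // stride by numTasks through the consistently sorted partition list.
        static List<String> partitionsForTask(int taskId, int numTasks, List<String> sorted) {
            List<String> taskPartitions = new ArrayList<>();
            for (int i = taskId; i < sorted.size(); i += numTasks) {
                taskPartitions.add(sorted.get(i));
            }
            return taskPartitions;
        }

        public static void main(String[] args) {
            List<String> sorted = Arrays.asList("p0", "p1", "p2", "p3", "p4");
            System.out.println(partitionsForTask(0, 2, sorted)); // [p0, p2, p4]
            System.out.println(partitionsForTask(1, 2, sorted)); // [p1, p3]
        }
    }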

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index e7bf70a..f381318 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -111,13 +111,13 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
 
             if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 _partitionStates.clear();
-                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+                final List<ISpoutPartition> sortedPartitions = _emitter.getOrderedPartitions(coordinatorMeta);
+                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, sortedPartitions);
                 for (ISpoutPartition partition : taskPartitions) {
                     _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                 }
+                _emitter.refreshPartitions(taskPartitions);
 
-                // refresh all partitions for backwards compatibility with old spout
-                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                 _savedCoordinatorMeta = coordinatorMeta;
                 _changedMeta = true;
             }
@@ -137,7 +137,9 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
                 EmitterPartitionState s = e.getValue();
                 s.rotatingState.removeState(tx.getTransactionId());
                 Object lastMeta = prevCached.get(id);
-                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
+                if(lastMeta==null) {
+                    lastMeta = s.rotatingState.getLastState();
+                }
                 Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                 metas.put(id, meta);
             }
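
The net effect of this change is a new call order in the executor: derive the consistent ordering once, slice out this task's share, and refresh only that share instead of every partition. A condensed sketch of the sequence, with a hypothetical emitter interface standing in for the real generics:

    import java.util.List;

    interface EmitterSketch<Partitions, Partition> {
        List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
        List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> sorted);
        void refreshPartitions(List<Partition> assigned);
    }

    class ExecutorCallOrderSketch {
        static <Partitions, Partition> void onNewCoordinatorMeta(
                EmitterSketch<Partitions, Partition> emitter,
                Partitions coordinatorMeta, int taskIndex, int numTasks) {
            // 1. Every task derives the same consistently ordered partition list.
            List<Partition> sorted = emitter.getOrderedPartitions(coordinatorMeta);
            // 2. Each task slices out only the partitions assigned to it.
            List<Partition> mine = emitter.getPartitionsForTask(taskIndex, numTasks, sorted);
            // 3. Only this task's partitions are refreshed, so e.g. broker
            //    connections are opened just for partitions this task emits.
            emitter.refreshPartitions(mine);
        }
    }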

