kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
Date Tue, 10 Nov 2015 00:27:29 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
deleted file mode 100644
index 2a5ca9b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class DefaultPartitionGrouperTest {
-
-    private List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
-
-    @Test
-    public void testGrouping() {
-        PartitionGrouper grouper = new DefaultPartitionGrouper();
-        int topicGroupId;
-        Map<TaskId, Set<TopicPartition>> expected;
-        Map<Integer, Set<String>> topicGroups;
-
-        topicGroups = new HashMap<>();
-        topicGroups.put(0, mkSet("topic1"));
-        topicGroups.put(1, mkSet("topic2"));
-        grouper.topicGroups(topicGroups);
-
-        expected = new HashMap<>();
-        topicGroupId = 0;
-        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0)));
-        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1)));
-        expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
-        topicGroupId++;
-        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0)));
-        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1)));
-
-        assertEquals(expected, grouper.partitionGroups(metadata));
-
-        topicGroups = new HashMap<>();
-        topicGroups.put(0, mkSet("topic1", "topic2"));
-        grouper.topicGroups(topicGroups);
-
-        expected = new HashMap<>();
-        topicGroupId = 0;
-        expected.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)));
-        expected.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1)));
-        expected.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2)));
-
-        assertEquals(expected, grouper.partitionGroups(metadata));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
deleted file mode 100644
index b1b71b6..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.common.utils.Utils.mkList;
-
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class TopologyBuilderTest {
-
-    @Test(expected = TopologyException.class)
-    public void testAddSourceWithSameName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addSource("source", "topic-2");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddSourceWithSameTopic() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addSource("source-2", "topic-1");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddProcessorWithSameName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddProcessorWithWrongParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddProcessorWithSelfParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addProcessor("processor", new MockProcessorSupplier(), "processor");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddSinkWithSameName() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source", "topic-1");
-        builder.addSink("sink", "topic-2", "source");
-        builder.addSink("sink", "topic-3", "source");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddSinkWithWrongParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSink("sink", "topic-2", "source");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddSinkWithSelfParent() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSink("sink", "topic-2", "sink");
-    }
-
-    @Test
-    public void testSourceTopics() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-
-        assertEquals(3, builder.sourceTopics().size());
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddStateStoreWithNonExistingProcessor() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddStateStoreWithSource() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddStateStoreWithSink() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSink("sink-1", "topic-1");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1");
-    }
-
-    @Test(expected = TopologyException.class)
-    public void testAddStateStoreWithDuplicates() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addStateStore(new MockStateStoreSupplier("store", false));
-        builder.addStateStore(new MockStateStoreSupplier("store", false));
-    }
-
-    @Test
-    public void testAddStateStore() {
-        final TopologyBuilder builder = new TopologyBuilder();
-        List<StateStoreSupplier> suppliers;
-
-        StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
-        builder.addStateStore(supplier);
-        suppliers = builder.build(null).stateStoreSuppliers();
-        assertEquals(0, suppliers.size());
-
-        builder.addSource("source-1", "topic-1");
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-        builder.connectProcessorAndStateStores("processor-1", "store-1");
-        suppliers = builder.build(null).stateStoreSuppliers();
-        assertEquals(1, suppliers.size());
-        assertEquals(supplier.name(), suppliers.get(0).name());
-    }
-
-    @Test
-    public void testTopicGroups() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1", "topic-1x");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addSource("source-4", "topic-4");
-        builder.addSource("source-5", "topic-5");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
-        builder.copartitionSources(mkList("source-1", "source-2"));
-
-        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
-        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
-
-        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
-        expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
-        expectedTopicGroups.put(2, mkSet("topic-5"));
-
-        assertEquals(3, topicGroups.size());
-        assertEquals(expectedTopicGroups, topicGroups);
-
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
-
-        assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
-    }
-
-    @Test
-    public void testTopicGroupsByStateStore() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1", "topic-1x");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addSource("source-4", "topic-4");
-        builder.addSource("source-5", "topic-5");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2");
-        builder.addStateStore(new MockStateStoreSupplier("strore-1", false), "processor-1", "processor-2");
-
-        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3");
-        builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4");
-        builder.addStateStore(new MockStateStoreSupplier("strore-2", false), "processor-3", "processor-4");
-
-        Map<Integer, Set<String>> topicGroups = builder.topicGroups();
-
-        Map<Integer, Set<String>> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, mkSet("topic-1", "topic-1x", "topic-2"));
-        expectedTopicGroups.put(1, mkSet("topic-3", "topic-4"));
-        expectedTopicGroups.put(2, mkSet("topic-5"));
-
-        assertEquals(3, topicGroups.size());
-        assertEquals(expectedTopicGroups, topicGroups);
-    }
-
-    @Test
-    public void testBuild() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1", "topic-1x");
-        builder.addSource("source-2", "topic-2");
-        builder.addSource("source-3", "topic-3");
-        builder.addSource("source-4", "topic-4");
-        builder.addSource("source-5", "topic-5");
-
-        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
-        builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
-
-        ProcessorTopology topology0 = builder.build(0);
-        ProcessorTopology topology1 = builder.build(1);
-        ProcessorTopology topology2 = builder.build(2);
-
-        assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
-        assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
-        assertEquals(mkSet("source-5"), nodeNames(topology2.processors()));
-    }
-
-    private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
-        Set<String> nodeNames = new HashSet<>();
-        for (ProcessorNode node : nodes) {
-            nodeNames.add(node.name());
-        }
-        return nodeNames;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
deleted file mode 100644
index 0a1f95c..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class MinTimestampTrackerTest {
-
-    private Stamped<String> elem(long timestamp) {
-        return new Stamped<>("", timestamp);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testTracking() {
-        TimestampTracker<String> tracker = new MinTimestampTracker<>();
-
-        Object[] elems = new Object[]{
-            elem(100), elem(101), elem(102), elem(98), elem(99), elem(100)
-        };
-
-        int insertionIndex = 0;
-        int removalIndex = 0;
-
-        // add 100
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // add 101
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // remove 100
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(101L, tracker.get());
-
-        // add 102
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(101L, tracker.get());
-
-        // add 98
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // add 99
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // add 100
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // remove 101
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // remove 102
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // remove 98
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(99L, tracker.get());
-
-        // remove 99
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // remove 100
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(100L, tracker.get());
-
-        assertEquals(insertionIndex, removalIndex);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
deleted file mode 100644
index b91acdc..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.test.MockSourceNode;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-public class PartitionGroupTest {
-    private final Serializer<Integer> intSerializer = new IntegerSerializer();
-    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
-    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
-    private final TopicPartition partition1 = new TopicPartition("topic", 1);
-    private final TopicPartition partition2 = new TopicPartition("topic", 2);
-    private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(intDeserializer, intDeserializer));
-    private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(intDeserializer, intDeserializer));
-
-    private final byte[] recordValue = intSerializer.serialize(null, 10);
-    private final byte[] recordKey = intSerializer.serialize(null, 1);
-
-    private final PartitionGroup group = new PartitionGroup(new HashMap<TopicPartition, RecordQueue>() {
-        {
-            put(partition1, queue1);
-            put(partition2, queue2);
-        }
-    }, timestampExtractor);
-
-    @Test
-    public void testTimeTracking() {
-        assertEquals(0, group.numBuffered());
-
-        // add three 3 records with timestamp 1, 3, 5 to partition-1
-        List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue));
-
-        group.addRawRecords(partition1, list1);
-
-        // add three 3 records with timestamp 2, 4, 6 to partition-2
-        List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
-
-        group.addRawRecords(partition2, list2);
-
-        assertEquals(6, group.numBuffered());
-        assertEquals(3, group.numBuffered(partition1));
-        assertEquals(3, group.numBuffered(partition2));
-        assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
-
-        StampedRecord record;
-        PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
-
-        // get one record
-        record = group.nextRecord(info);
-        assertEquals(partition1, info.partition());
-        assertEquals(1L, record.timestamp);
-        assertEquals(5, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(3, group.numBuffered(partition2));
-        assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp());
-
-        // get one record, now the time should be advanced
-        record = group.nextRecord(info);
-        assertEquals(partition2, info.partition());
-        assertEquals(2L, record.timestamp);
-        assertEquals(4, group.numBuffered());
-        assertEquals(2, group.numBuffered(partition1));
-        assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
deleted file mode 100644
index c447f99..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
-import org.apache.kafka.test.MockStateStoreSupplier;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.FileLock;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-public class ProcessorStateManagerTest {
-
-    private class MockRestoreConsumer  extends MockConsumer<byte[], byte[]> {
-        private final Serializer<Integer> serializer = new IntegerSerializer();
-
-        public TopicPartition assignedPartition = null;
-        public TopicPartition seekPartition = null;
-        public long seekOffset = -1L;
-        public boolean seekToBeginingCalled = false;
-        public boolean seekToEndCalled = false;
-        private long endOffset = 0L;
-        private long currentOffset = 0L;
-
-        private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>();
-
-        MockRestoreConsumer() {
-            super(OffsetResetStrategy.EARLIEST);
-
-            reset();
-        }
-
-        // reset this mock restore consumer for a state store registration
-        public void reset() {
-            assignedPartition = null;
-            seekOffset = -1L;
-            seekToBeginingCalled = false;
-            seekToEndCalled = false;
-            endOffset = 0L;
-            recordBuffer.clear();
-        }
-
-        // buffer a record (we cannot use addRecord because we need to add records before asigning a partition)
-        public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
-            recordBuffer.add(
-                new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
-                    serializer.serialize(record.topic(), record.key()),
-                    serializer.serialize(record.topic(), record.value())));
-            endOffset = record.offset();
-
-            super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset));
-        }
-
-        @Override
-        public synchronized void assign(List<TopicPartition> partitions) {
-            int numPartitions = partitions.size();
-            if (numPartitions > 1)
-                throw new IllegalArgumentException("RestoreConsumer: more than one partition specified");
-
-            if (numPartitions == 1) {
-                if (assignedPartition != null)
-                    throw new IllegalStateException("RestoreConsumer: partition already assigned");
-                assignedPartition = partitions.get(0);
-
-                // set the beginning offset to 0
-                // NOTE: this is users responsible to set the initial lEO.
-                super.updateBeginningOffsets(Collections.singletonMap(assignedPartition, 0L));
-            }
-
-            super.assign(partitions);
-        }
-
-        @Override
-        public ConsumerRecords<byte[], byte[]> poll(long timeout) {
-            // add buffered records to MockConsumer
-            for (ConsumerRecord<byte[], byte[]> record : recordBuffer) {
-                super.addRecord(record);
-            }
-            recordBuffer.clear();
-
-            ConsumerRecords<byte[], byte[]> records = super.poll(timeout);
-
-            // set the current offset
-            Iterable<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(assignedPartition);
-            for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-                currentOffset = record.offset();
-            }
-
-            return records;
-        }
-
-        @Override
-        public synchronized long position(TopicPartition partition) {
-            if (!partition.equals(assignedPartition))
-                throw new IllegalStateException("RestoreConsumer: unassigned partition");
-
-            return currentOffset;
-        }
-
-        @Override
-        public synchronized void seek(TopicPartition partition, long offset) {
-            if (offset < 0)
-                throw new IllegalArgumentException("RestoreConsumer: offset should not be negative");
-
-            if (seekOffset >= 0)
-                throw new IllegalStateException("RestoreConsumer: offset already seeked");
-
-            seekPartition = partition;
-            seekOffset = offset;
-            currentOffset = offset;
-            super.seek(partition, offset);
-        }
-
-        @Override
-        public synchronized void seekToBeginning(TopicPartition... partitions) {
-            if (partitions.length != 1)
-                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
-
-            for (TopicPartition partition : partitions) {
-                if (!partition.equals(assignedPartition))
-                    throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
-            }
-
-            seekToBeginingCalled = true;
-            currentOffset = 0L;
-        }
-
-        @Override
-        public synchronized void seekToEnd(TopicPartition... partitions) {
-            if (partitions.length != 1)
-                throw new IllegalStateException("RestoreConsumer: other than one partition specified");
-
-            for (TopicPartition partition : partitions) {
-                if (!partition.equals(assignedPartition))
-                    throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition");
-            }
-
-            seekToEndCalled = true;
-            currentOffset = endOffset;
-        }
-    }
-
-    @Test
-    public void testLockStateDirectory() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            FileLock lock;
-
-            // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
-
-            try {
-                // this should not get the lock
-                lock = ProcessorStateManager.lockStateDirectory(baseDir);
-                assertNull(lock);
-            } finally {
-                // by closing the state manager, release the lock
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
-
-            // now, this should get the lock
-            lock = ProcessorStateManager.lockStateDirectory(baseDir);
-            try {
-                assertNotNull(lock);
-            } finally {
-                if (lock != null) lock.release();
-            }
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoTopic() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
-            try {
-                stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testRegisterPersistentStore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            long lastCheckpointedOffset = 10L;
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
-                    new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
-            ));
-            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
-
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
-            try {
-                restoreConsumer.reset();
-
-                ArrayList<Integer> expectedKeys = new ArrayList<>();
-                for (int i = 1; i <= 3; i++) {
-                    long offset = (long) i;
-                    int key = i * 10;
-                    expectedKeys.add(key);
-                    restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>("persistentStore", 2, offset, key, 0)
-                    );
-                }
-
-                stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
-
-                assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
-                assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
-                assertFalse(restoreConsumer.seekToBeginingCalled);
-                assertTrue(restoreConsumer.seekToEndCalled);
-                assertEquals(expectedKeys, persistentStore.keys);
-
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testRegisterNonPersistentStore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            long lastCheckpointedOffset = 10L;
-            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
-            checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
-                    new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
-                    new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
-            ));
-            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
-
-            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
-            try {
-                restoreConsumer.reset();
-
-                ArrayList<Integer> expectedKeys = new ArrayList<>();
-                for (int i = 1; i <= 3; i++) {
-                    long offset = (long) (i + 100);
-                    int key = i;
-                    expectedKeys.add(i);
-                    restoreConsumer.bufferRecord(
-                            new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0)
-                    );
-                }
-
-                stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
-
-                assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
-                assertEquals(0L, restoreConsumer.seekOffset);
-                assertTrue(restoreConsumer.seekToBeginingCalled);
-                assertTrue(restoreConsumer.seekToEndCalled);
-                assertEquals(expectedKeys, nonPersistentStore.keys);
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testGetStore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("mockStore", Arrays.asList(
-                    new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
-            ));
-
-            MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
-            try {
-                stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
-
-                assertNull(stateMgr.getStore("noSuchStore"));
-                assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
-
-            } finally {
-                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
-            }
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testClose() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        try {
-            // write an empty checkpoint file
-            OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
-            oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
-
-            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
-                    new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
-            ));
-            restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
-                    new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
-            ));
-
-            // set up ack'ed offsets
-            HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
-            ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L);
-            ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L);
-            ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L);
-
-            MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
-            MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
-
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
-            try {
-                // make sure the checkpoint file is deleted
-                assertFalse(checkpointFile.exists());
-
-                restoreConsumer.reset();
-                stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
-
-                restoreConsumer.reset();
-                stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
-            } finally {
-                // close the state manager with the ack'ed offsets
-                stateMgr.close(ackedOffsets);
-            }
-
-            // make sure all stores are closed, and the checkpoint file is written.
-            assertTrue(persistentStore.flushed);
-            assertTrue(persistentStore.closed);
-            assertTrue(nonPersistentStore.flushed);
-            assertTrue(nonPersistentStore.closed);
-            assertTrue(checkpointFile.exists());
-
-            // the checkpoint file should contain an offset from the persistent store only.
-            OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile);
-            Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read();
-            assertEquals(1, checkpointedOffsets.size());
-            assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1)));
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
deleted file mode 100644
index 54096b2..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateUtils;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.ProcessorTopologyTestDriver;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Properties;
-
-public class ProcessorTopologyTest {
-
-    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
-    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
-
-    protected static final String INPUT_TOPIC = "input-topic";
-    protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
-    protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
-
-    private static long timestamp = 1000L;
-
-    private ProcessorTopologyTestDriver driver;
-    private StreamingConfig config;
-
-    @Before
-    public void setup() {
-        // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
-        File localState = StateUtils.tempDir();
-        Properties props = new Properties();
-        props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
-        props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
-        props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        this.config = new StreamingConfig(props);
-    }
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
-    @Test
-    public void testTopologyMetadata() {
-        final TopologyBuilder builder = new TopologyBuilder();
-
-        builder.addSource("source-1", "topic-1");
-        builder.addSource("source-2", "topic-2", "topic-3");
-        builder.addProcessor("processor-1", new MockProcessorSupplier<>(), "source-1");
-        builder.addProcessor("processor-2", new MockProcessorSupplier<>(), "source-1", "source-2");
-        builder.addSink("sink-1", "topic-3", "processor-1");
-        builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
-
-        final ProcessorTopology topology = builder.build(null);
-
-        assertEquals(6, topology.processors().size());
-
-        assertEquals(2, topology.sources().size());
-
-        assertEquals(3, topology.sourceTopics().size());
-
-        assertNotNull(topology.source("topic-1"));
-
-        assertNotNull(topology.source("topic-2"));
-
-        assertNotNull(topology.source("topic-3"));
-
-        assertEquals(topology.source("topic-2"), topology.source("topic-3"));
-    }
-
-    @Test
-    public void testDrivingSimpleTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology());
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
-        assertNoOutputRecord(OUTPUT_TOPIC_2);
-
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
-        assertNoOutputRecord(OUTPUT_TOPIC_2);
-
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNoOutputRecord(OUTPUT_TOPIC_2);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4");
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5");
-    }
-
-    @Test
-    public void testDrivingMultiplexingTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology());
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
-        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
-
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
-        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
-
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
-        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
-        assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
-        assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
-    }
-
-    @Test
-    public void testDrivingStatefulTopology() {
-        String storeName = "entries";
-        driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNoOutputRecord(OUTPUT_TOPIC_1);
-
-        KeyValueStore<String, String> store = driver.getKeyValueStore("entries");
-        assertEquals("value4", store.get("key1"));
-        assertEquals("value2", store.get("key2"));
-        assertEquals("value3", store.get("key3"));
-        assertNull(store.get("key4"));
-    }
-
-    protected void assertNextOutputRecord(String topic, String key, String value) {
-        assertProducerRecord(driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER), topic, key, value);
-    }
-
-    protected void assertNoOutputRecord(String topic) {
-        assertNull(driver.readOutput(topic));
-    }
-
-    private void assertProducerRecord(ProducerRecord<String, String> record, String topic, String key, String value) {
-        assertEquals(topic, record.topic());
-        assertEquals(key, record.key());
-        assertEquals(value, record.value());
-        // Kafka Streaming doesn't set the partition, so it's always null
-        assertNull(record.partition());
-    }
-
-    protected TopologyBuilder createSimpleTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
-                                    .addProcessor("processor", define(new ForwardingProcessor()), "source")
-                                    .addSink("sink", OUTPUT_TOPIC_1, "processor");
-    }
-
-    protected TopologyBuilder createMultiplexingTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
-                                    .addProcessor("processor", define(new MultiplexingProcessor(2)), "source")
-                                    .addSink("sink1", OUTPUT_TOPIC_1, "processor")
-                                    .addSink("sink2", OUTPUT_TOPIC_2, "processor");
-    }
-
-    protected TopologyBuilder createStatefulTopology(String storeName) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
-                                    .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
-                                    .addStateStore(
-                                            Stores.create(storeName, config).withStringKeys().withStringValues().inMemory().build(),
-                                            "processor"
-                                    )
-                                    .addSink("counts", OUTPUT_TOPIC_1, "processor");
-    }
-
-    /**
-     * A processor that simply forwards all messages to all children.
-     */
-    protected static class ForwardingProcessor extends AbstractProcessor<String, String> {
-
-        @Override
-        public void process(String key, String value) {
-            context().forward(key, value);
-        }
-
-        @Override
-        public void punctuate(long streamTime) {
-            context().forward(Long.toString(streamTime), "punctuate");
-        }
-    }
-
-    /**
-     * A processor that forwards slightly-modified messages to each child.
-     */
-    protected static class MultiplexingProcessor extends AbstractProcessor<String, String> {
-
-        private final int numChildren;
-
-        public MultiplexingProcessor(int numChildren) {
-            this.numChildren = numChildren;
-        }
-
-        @Override
-        public void process(String key, String value) {
-            for (int i = 0; i != numChildren; ++i) {
-                context().forward(key, value + "(" + (i + 1) + ")", i);
-            }
-        }
-
-        @Override
-        public void punctuate(long streamTime) {
-            for (int i = 0; i != numChildren; ++i) {
-                context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i);
-            }
-        }
-    }
-
-    /**
-     * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
-     * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
-     */
-    protected static class StatefulProcessor extends AbstractProcessor<String, String> {
-
-        private KeyValueStore<String, String> store;
-        private final String storeName;
-
-        public StatefulProcessor(String storeName) {
-            this.storeName = storeName;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void init(ProcessorContext context) {
-            super.init(context);
-            store = (KeyValueStore<String, String>) context.getStateStore(storeName);
-        }
-
-        @Override
-        public void process(String key, String value) {
-            store.put(key, value);
-        }
-
-        @Override
-        public void punctuate(long streamTime) {
-            int count = 0;
-            for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) {
-                iter.next();
-                ++count;
-            }
-            context().forward(Long.toString(streamTime), count);
-        }
-
-        @Override
-        public void close() {
-            store.close();
-        }
-    }
-
-    protected <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
-        return new ProcessorSupplier<K, V>() {
-            @Override
-            public Processor<K, V> get() {
-                return processor;
-            }
-        };
-    }
-
-    public static class CustomTimestampExtractor implements TimestampExtractor {
-        @Override
-        public long extract(ConsumerRecord<Object, Object> record) {
-            return timestamp;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
deleted file mode 100644
index 2c7aaeb..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.assertEquals;
-
-public class PunctuationQueueTest {
-
-    @Test
-    public void testPunctuationInterval() {
-        TestProcessor processor = new TestProcessor();
-        ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
-        PunctuationQueue queue = new PunctuationQueue();
-
-        PunctuationSchedule sched = new PunctuationSchedule(node, 100L);
-        final long now = sched.timestamp - 100L;
-
-        queue.schedule(sched);
-
-        Punctuator punctuator = new Punctuator() {
-            public void punctuate(ProcessorNode node, long time) {
-                node.processor().punctuate(time);
-            }
-        };
-
-        queue.mayPunctuate(now, punctuator);
-        assertEquals(0, processor.punctuatedAt.size());
-
-        queue.mayPunctuate(now + 99L, punctuator);
-        assertEquals(0, processor.punctuatedAt.size());
-
-        queue.mayPunctuate(now + 100L, punctuator);
-        assertEquals(1, processor.punctuatedAt.size());
-
-        queue.mayPunctuate(now + 199L, punctuator);
-        assertEquals(1, processor.punctuatedAt.size());
-
-        queue.mayPunctuate(now + 200L, punctuator);
-        assertEquals(2, processor.punctuatedAt.size());
-    }
-
-    private static class TestProcessor implements Processor<String, String> {
-
-        public final ArrayList<Long> punctuatedAt = new ArrayList<>();
-
-        @Override
-        public void init(ProcessorContext context) {
-        }
-
-        @Override
-        public void process(String key, String value) {
-        }
-
-        @Override
-        public void punctuate(long streamTime) {
-            punctuatedAt.add(streamTime);
-        }
-
-        @Override
-        public void close() {
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
deleted file mode 100644
index c40e881..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class QuickUnionTest {
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testUnite() {
-        QuickUnion<Long> qu = new QuickUnion<>();
-
-        long[] ids = {
-            1L, 2L, 3L, 4L, 5L
-        };
-
-        for (long id : ids) {
-            qu.add(id);
-        }
-
-        assertEquals(5, roots(qu, ids).size());
-
-        qu.unite(1L, 2L);
-        assertEquals(4, roots(qu, ids).size());
-        assertEquals(qu.root(1L), qu.root(2L));
-
-        qu.unite(3L, 4L);
-        assertEquals(3, roots(qu, ids).size());
-        assertEquals(qu.root(1L), qu.root(2L));
-        assertEquals(qu.root(3L), qu.root(4L));
-
-        qu.unite(1L, 5L);
-        assertEquals(2, roots(qu, ids).size());
-        assertEquals(qu.root(1L), qu.root(2L));
-        assertEquals(qu.root(2L), qu.root(5L));
-        assertEquals(qu.root(3L), qu.root(4L));
-
-        qu.unite(3L, 5L);
-        assertEquals(1, roots(qu, ids).size());
-        assertEquals(qu.root(1L), qu.root(2L));
-        assertEquals(qu.root(2L), qu.root(3L));
-        assertEquals(qu.root(3L), qu.root(4L));
-        assertEquals(qu.root(4L), qu.root(5L));
-    }
-
-    @Test
-    public void testUniteMany() {
-        QuickUnion<Long> qu = new QuickUnion<>();
-
-        long[] ids = {
-            1L, 2L, 3L, 4L, 5L
-        };
-
-        for (long id : ids) {
-            qu.add(id);
-        }
-
-        assertEquals(5, roots(qu, ids).size());
-
-        qu.unite(1L, 2L, 3L, 4L);
-        assertEquals(2, roots(qu, ids).size());
-        assertEquals(qu.root(1L), qu.root(2L));
-        assertEquals(qu.root(2L), qu.root(3L));
-        assertEquals(qu.root(3L), qu.root(4L));
-        assertNotEquals(qu.root(1L), qu.root(5L));
-    }
-
-    private Set<Long> roots(QuickUnion<Long> qu, long... ids) {
-        HashSet<Long> roots = new HashSet<>();
-        for (long id : ids) {
-            roots.add(qu.root(id));
-        }
-        return roots;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
deleted file mode 100644
index 6e86410..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.test.MockSourceNode;
-import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class RecordQueueTest {
-    private final Serializer<Integer> intSerializer = new IntegerSerializer();
-    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
-    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
-    private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), new MockSourceNode<>(intDeserializer, intDeserializer));
-
-    private final byte[] recordValue = intSerializer.serialize(null, 10);
-    private final byte[] recordKey = intSerializer.serialize(null, 1);
-
-    @Test
-    public void testTimeTracking() {
-
-        assertTrue(queue.isEmpty());
-
-        // add three 3 out-of-order records with timestamp 2, 1, 3
-        List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue));
-
-        queue.addRawRecords(list1, timestampExtractor);
-
-        assertEquals(3, queue.size());
-        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
-
-        // poll the first record, now with 1, 3
-        assertEquals(2L, queue.poll().timestamp);
-        assertEquals(2, queue.size());
-        assertEquals(1L, queue.timestamp());
-
-        // poll the second record, now with 3
-        assertEquals(1L, queue.poll().timestamp);
-        assertEquals(1, queue.size());
-        assertEquals(3L, queue.timestamp());
-
-        // add three 3 out-of-order records with timestamp 4, 1, 2
-        // now with 3, 4, 1, 2
-        List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue));
-
-        queue.addRawRecords(list2, timestampExtractor);
-
-        assertEquals(4, queue.size());
-        assertEquals(3L, queue.timestamp());
-
-        // poll the third record, now with 4, 1, 2
-        assertEquals(3L, queue.poll().timestamp);
-        assertEquals(3, queue.size());
-        assertEquals(3L, queue.timestamp());
-
-        // poll the rest records
-        assertEquals(4L, queue.poll().timestamp);
-        assertEquals(3L, queue.timestamp());
-
-        assertEquals(1L, queue.poll().timestamp);
-        assertEquals(3L, queue.timestamp());
-
-        assertEquals(2L, queue.poll().timestamp);
-        assertEquals(0, queue.size());
-        assertEquals(3L, queue.timestamp());
-
-        // add three more records with 4, 5, 6
-        List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue));
-
-        queue.addRawRecords(list3, timestampExtractor);
-
-        assertEquals(3, queue.size());
-        assertEquals(3L, queue.timestamp());
-
-        // poll one record again, the timestamp should advance now
-        assertEquals(4L, queue.poll().timestamp);
-        assertEquals(2, queue.size());
-        assertEquals(5L, queue.timestamp());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
deleted file mode 100644
index a95c2fa..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.test.MockSourceNode;
-import org.junit.Test;
-import org.junit.Before;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class StreamTaskTest {
-
-    private final Serializer<Integer> intSerializer = new IntegerSerializer();
-    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
-    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
-
-    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
-    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
-    private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
-
-    private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final ProcessorTopology topology = new ProcessorTopology(
-            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
-            new HashMap<String, SourceNode>() {
-                {
-                    put("topic1", source1);
-                    put("topic2", source2);
-                }
-            },
-            Collections.<StateStoreSupplier>emptyList()
-    );
-
-    private StreamingConfig createConfig(final File baseDir) throws Exception {
-        return new StreamingConfig(new Properties() {
-            {
-                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-            }
-        });
-    }
-
-    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
-    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-
-    private final byte[] recordValue = intSerializer.serialize(null, 10);
-    private final byte[] recordKey = intSerializer.serialize(null, 1);
-
-
-    @Before
-    public void setup() {
-        consumer.assign(Arrays.asList(partition1, partition2));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testProcessOrder() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
-
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue)
-            ));
-
-            task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue)
-            ));
-
-            assertEquals(5, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(0, source2.numReceived);
-
-            assertEquals(4, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(3, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(2, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(1, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(2, source2.numReceived);
-
-            assertEquals(0, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(3, source2.numReceived);
-
-            task.close();
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPauseResume() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamingConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), consumer, producer, restoreStateConsumer, partitions, topology, config, null);
-
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue)
-            ));
-
-            task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue)
-            ));
-
-            assertEquals(5, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(0, source2.numReceived);
-
-            assertEquals(1, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition2));
-
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue)
-            ));
-
-            assertEquals(2, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition1));
-            assertTrue(consumer.paused().contains(partition2));
-
-            assertEquals(7, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(1, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition1));
-
-            assertEquals(6, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(0, consumer.paused().size());
-
-            task.close();
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
-        return Arrays.asList(recs);
-    }
-}


Mime
View raw message