flink-commits mailing list archives

From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-3102] Allow reading from multiple topics with one FlinkKafkaConsumer instance
Date Tue, 08 Dec 2015 17:22:56 GMT
Repository: flink
Updated Branches:
  refs/heads/master 4dbb10f52 -> fc8be1ca6
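
The change below moves FlinkKafkaConsumer (and version-specific subclasses such as FlinkKafkaConsumer082) from a single topic string to a list of topics. As a rough sketch of what the new API enables, based on the constructor calls exercised in the updated tests and assuming the list-accepting constructor is publicly usable, a job can now read several topics through a single source. Topic names, connection properties, and the schema wrapping are illustrative placeholders.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicReadJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Placeholder connection settings; replace with the values of your own cluster.
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "multi-topic-example");

		// One consumer instance now subscribes to a list of topics instead of a single topic.
		List<String> topics = Arrays.asList("topic-0", "topic-1", "topic-2");
		DataStream<String> stream = env.addSource(new FlinkKafkaConsumer082<>(
				topics,
				new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()),
				props));

		stream.print();
		env.execute("Read from multiple Kafka topics");
	}
}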


http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 3d392aa..c4b026b 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.kafka.common.Node;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -30,23 +32,30 @@ import java.util.Set;
 
 import static org.junit.Assert.*;
 
+
 /**
  * Tests that the partition assignment is deterministic and stable.
  */
 public class KafkaConsumerPartitionAssignmentTest {
 
+	private final Node fake = new Node(1337, "localhost", 1337);
+
 	@Test
 	public void testPartitionsEqualConsumers() {
 		try {
-			int[] partitions = {4, 52, 17, 1};
-			
-			for (int i = 0; i < partitions.length; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", partitions.length, i);
-				
+			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+
+			for (int i = 0; i < inPartitions.size(); i++) {
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(
+						inPartitions, inPartitions.size(), i);
+
 				assertNotNull(parts);
 				assertEquals(1, parts.size());
-				assertTrue(contains(partitions, parts.get(0).partition()));
+				assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
 			}
 		}
 		catch (Exception e) {
@@ -55,31 +64,43 @@ public class KafkaConsumerPartitionAssignmentTest {
 		}
 	}
 
+	private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
+		for (KafkaTopicPartitionLeader ktp: inPartitions) {
+			if (ktp.getTopicPartition().getPartition() == partition) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	@Test
 	public void testMultiplePartitionsPerConsumers() {
 		try {
-			final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+			final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
+			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
 
-			final Set<Integer> allPartitions = new HashSet<>();
-			for (int i : partitions) {
-				allPartitions.add(i);
+			for (int p : partitionIDs) {
+				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				partitions.add(part);
+				allPartitions.add(part);
 			}
-			
+
 			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.length / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
-			
+			final int minPartitionsPerConsumer = partitions.size() / numConsumers;
+			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
+
 			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(partitions, numConsumers, i);
 
 				assertNotNull(parts);
 				assertTrue(parts.size() >= minPartitionsPerConsumer);
 				assertTrue(parts.size() <= maxPartitionsPerConsumer);
 
-				for (TopicPartition p : parts) {
+				for (KafkaTopicPartitionLeader p : parts) {
 					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
+					assertTrue(allPartitions.remove(p));
 				}
 			}
 
@@ -95,25 +116,26 @@ public class KafkaConsumerPartitionAssignmentTest {
 	@Test
 	public void testPartitionsFewerThanConsumers() {
 		try {
-			final int[] partitions = {4, 52, 17, 1};
+			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
 
-			final Set<Integer> allPartitions = new HashSet<>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
+			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+			allPartitions.addAll(inPartitions);
+
+			final int numConsumers = 2 * inPartitions.size() + 3;
 
-			final int numConsumers = 2 * partitions.length + 3;
-			
 			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(inPartitions, numConsumers, i);
 
 				assertNotNull(parts);
 				assertTrue(parts.size() <= 1);
-				
-				for (TopicPartition p : parts) {
+
+				for (KafkaTopicPartitionLeader p : parts) {
 					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
+					assertTrue(allPartitions.remove(p));
 				}
 			}
 
@@ -125,15 +147,16 @@ public class KafkaConsumerPartitionAssignmentTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testAssignEmptyPartitions() {
 		try {
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
+			List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
+			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumer.assignPartitions(ep, 4, 2);
 			assertNotNull(parts1);
 			assertTrue(parts1.isEmpty());
 
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
+			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumer.assignPartitions(ep, 1, 0);
 			assertNotNull(parts2);
 			assertTrue(parts2.isEmpty());
 		}
@@ -146,35 +169,36 @@ public class KafkaConsumerPartitionAssignmentTest {
 	@Test
 	public void testGrowingPartitionsRemainsStable() {
 		try {
-			final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
+			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+			List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
 
-			final Set<Integer> allNewPartitions = new HashSet<>();
-			final Set<Integer> allInitialPartitions = new HashSet<>();
-			for (int i : newPartitions) {
-				allNewPartitions.add(i);
-			}
-			for (int i : initialPartitions) {
-				allInitialPartitions.add(i);
+			for (int p : newPartitionIDs) {
+				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				newPartitions.add(part);
 			}
 
+			List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
+
+			final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
+			final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
+
 			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
-			
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 2);
+			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
+			final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
+			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
+			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
+
+			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, numConsumers, 0);
+			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, numConsumers, 1);
+			List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumer.assignPartitions(
+					initialPartitions, numConsumers, 2);
 
 			assertNotNull(parts1);
 			assertNotNull(parts2);
 			assertNotNull(parts3);
-			
+
 			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
 			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
 			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
@@ -182,37 +206,37 @@ public class KafkaConsumerPartitionAssignmentTest {
 			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
 			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
 
-			for (TopicPartition p : parts1) {
+			for (KafkaTopicPartitionLeader p : parts1) {
 				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
+				assertTrue(allInitialPartitions.remove(p));
 			}
-			for (TopicPartition p : parts2) {
+			for (KafkaTopicPartitionLeader p : parts2) {
 				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
+				assertTrue(allInitialPartitions.remove(p));
 			}
-			for (TopicPartition p : parts3) {
+			for (KafkaTopicPartitionLeader p : parts3) {
 				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
+				assertTrue(allInitialPartitions.remove(p));
 			}
-			
+
 			// all partitions must have been assigned
 			assertTrue(allInitialPartitions.isEmpty());
-			
+
 			// grow the set of partitions and distribute anew
-			
-			List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 2);
+
+			List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, numConsumers, 0);
+			List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, numConsumers, 1);
+			List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumer.assignPartitions(
+					newPartitions, numConsumers, 2);
 
 			// new partitions must include all old partitions
-			
+
 			assertTrue(parts1new.size() > parts1.size());
 			assertTrue(parts2new.size() > parts2.size());
 			assertTrue(parts3new.size() > parts3.size());
-			
+
 			assertTrue(parts1new.containsAll(parts1));
 			assertTrue(parts2new.containsAll(parts2));
 			assertTrue(parts3new.containsAll(parts3));
@@ -224,17 +248,17 @@ public class KafkaConsumerPartitionAssignmentTest {
 			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
 			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
 
-			for (TopicPartition p : parts1new) {
+			for (KafkaTopicPartitionLeader p : parts1new) {
 				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
+				assertTrue(allNewPartitions.remove(p));
 			}
-			for (TopicPartition p : parts2new) {
+			for (KafkaTopicPartitionLeader p : parts2new) {
 				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
+				assertTrue(allNewPartitions.remove(p));
 			}
-			for (TopicPartition p : parts3new) {
+			for (KafkaTopicPartitionLeader p : parts3new) {
 				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
+				assertTrue(allNewPartitions.remove(p));
 			}
 
 			// all partitions must have been assigned
@@ -245,13 +269,5 @@ public class KafkaConsumerPartitionAssignmentTest {
 			fail(e.getMessage());
 		}
 	}
-	
-	private static boolean contains(int[] array, int value) {
-		for (int i : array) {
-			if (i == value) {
-				return true;
-			}
-		}
-		return false;
-	}
+
 }
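
The rewritten assignment tests above only pin down the contract: every partition is assigned to exactly one consumer, the split is balanced to within one partition, and appending new partitions leaves each consumer's previous assignment in place. A minimal sketch of an assignment that satisfies these properties, assuming a plain index-modulo distribution (the real FlinkKafkaConsumer.assignPartitions may differ in detail):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;

public class RoundRobinAssignmentSketch {

	/**
	 * Deterministic assignment: partition i goes to consumer (i % numConsumers).
	 * As long as new partitions are appended to the end of the list, the partitions
	 * already assigned to a consumer never move to another one.
	 */
	public static List<KafkaTopicPartitionLeader> assignPartitions(
			List<KafkaTopicPartitionLeader> partitions, int numConsumers, int consumerIndex) {

		List<KafkaTopicPartitionLeader> assigned = new ArrayList<>();
		for (int i = 0; i < partitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(partitions.get(i));
			}
		}
		return assigned;
	}
}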

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
index ec7db42..efae922 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
@@ -21,13 +21,16 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.commons.collections.map.LinkedMap;
 
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.*;
@@ -82,37 +85,45 @@ public class KafkaConsumerTest {
 			Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
 			Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
 			Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			
+
 			offsetsField.setAccessible(true);
 			runningField.setAccessible(true);
 			mapField.setAccessible(true);
 
 			FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
 			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-			
-			long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
+
+
+			HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>();
+			long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 };
+			int j = 0;
+			for (long i: offsets) {
+				KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++);
+				testOffsets.put(ktp, i);
+			}
+
 			LinkedMap map = new LinkedMap();
-			
+
 			offsetsField.set(consumer, testOffsets);
 			runningField.set(consumer, true);
 			mapField.set(consumer, map);
-			
+
 			assertTrue(map.isEmpty());
 
 			// make multiple checkpoints
 			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
-				long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
-				assertArrayEquals(testOffsets, checkpoint);
-				
+				HashMap<KafkaTopicPartition, Long> checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
+				assertEquals(testOffsets, checkpoint);
+
 				// change the offsets, make sure the snapshot did not change
-				long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
-				
-				for (int i = 0; i < testOffsets.length; i++) {
-					testOffsets[i] += 1L;
+				HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
+
+				for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) {
+					testOffsets.put(e.getKey(), e.getValue() + 1);
 				}
-				
-				assertArrayEquals(checkpointCopy, checkpoint);
-				
+
+				assertEquals(checkpointCopy, checkpoint);
+
 				assertTrue(map.size() > 0);
 				assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
 			}
@@ -132,7 +143,7 @@ public class KafkaConsumerTest {
 			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
 			props.setProperty("group.id", "non-existent-group");
 
-			new FlinkKafkaConsumer<>("no op topic", new SimpleStringSchema(), props,
+			new FlinkKafkaConsumer<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props,
 					FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
 					FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
 		}
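
The checkpointing test above now models offsets as a map keyed by topic and partition instead of a long[] indexed by partition, and it checks that a snapshot stays untouched when the live offsets move on. A minimal sketch of that property, assuming KafkaTopicPartition provides value-based equals/hashCode (which the map comparisons in the test already rely on):

import java.util.HashMap;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class OffsetSnapshotSketch {

	public static void main(String[] args) {
		// Offsets are tracked per (topic, partition); equals/hashCode on the key is assumed.
		HashMap<KafkaTopicPartition, Long> lastOffsets = new HashMap<>();
		lastOffsets.put(new KafkaTopicPartition("topic-a", 0), 43L);
		lastOffsets.put(new KafkaTopicPartition("topic-b", 3), 6146L);

		// A checkpoint must capture an independent copy of the current offsets ...
		HashMap<KafkaTopicPartition, Long> snapshot = new HashMap<>(lastOffsets);

		// ... so that advancing the live offsets afterwards does not change it.
		lastOffsets.put(new KafkaTopicPartition("topic-a", 0), 44L);
		if (snapshot.get(new KafkaTopicPartition("topic-a", 0)) != 43L) {
			throw new IllegalStateException("the snapshot was mutated");
		}
	}
}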

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2116c01..044680e 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -32,12 +32,16 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.map.LinkedMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -52,6 +56,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -74,6 +80,7 @@ import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
 
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Assert;
 
@@ -87,6 +94,7 @@ import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -94,7 +102,6 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -148,8 +155,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			stream.print();
 			see.execute("No broker test");
 		} catch(RuntimeException re){
-			Assert.assertTrue("Wrong RuntimeException thrown",
-					re.getMessage().contains("Unable to retrieve any partitions for topic"));
+			Assert.assertTrue("Wrong RuntimeException thrown: " + StringUtils.stringifyException(re),
+					re.getMessage().contains("Unable to retrieve any partitions for the requested topics [doesntexist]"));
 		}
 	}
 	/**
@@ -166,19 +173,21 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		Assert.assertEquals(0, pendingCheckpoints.size());
 		source.setRuntimeContext(new MockRuntimeContext(1, 0));
 
-		final long[] initialOffsets = new long[] { 1337 };
+		final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>();
+		initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L);
 
 		// first restore
 		source.restoreState(initialOffsets);
 
 		// then open
 		source.open(new Configuration());
-		long[] state1 = source.snapshotState(1, 15);
+		HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15);
 
-		assertArrayEquals(initialOffsets, state1);
+		assertEquals(initialOffsets, state1);
+
+		HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30);
+		Assert.assertEquals(initialOffsets, state2);
 
-		long[] state2 = source.snapshotState(2, 30);
-		Assert.assertArrayEquals(initialOffsets, state2);
 		Assert.assertEquals(2, pendingCheckpoints.size());
 
 		source.notifyCheckpointComplete(1);
@@ -772,6 +781,92 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
+	public void runConsumeMultipleTopics() throws java.lang.Exception {
+		final int NUM_TOPICS = 5;
+		final int NUM_ELEMENTS = 20;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+		// create topics with content
+		final List<String> topics = new ArrayList<>();
+		for (int i = 0; i < NUM_TOPICS; i++) {
+			final String topic = "topic-" + i;
+			topics.add(topic);
+			// create topic
+			createTestTopic(topic, i + 1 /*partitions*/, 1);
+
+			// write something
+			writeSequence(env, topic, NUM_ELEMENTS, i + 1);
+		}
+
+		// validate getPartitionsForTopic method
+		List<KafkaTopicPartitionLeader> topicPartitions = FlinkKafkaConsumer082.getPartitionsForTopic(topics, standardProps);
+		Assert.assertEquals((NUM_TOPICS * (NUM_TOPICS + 1))/2, topicPartitions.size());
+
+		KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> readSchema = new Tuple2WithTopicDeserializationSchema(env.getConfig());
+		DataStreamSource<Tuple3<Integer, Integer, String>> stream = env.addSource(new FlinkKafkaConsumer082<>(topics, readSchema, standardProps));
+
+		stream.flatMap(new FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() {
+			Map<String, Integer> countPerTopic = new HashMap<>(NUM_TOPICS);
+			@Override
+			public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> out) throws Exception {
+				Integer count = countPerTopic.get(value.f2);
+				if (count == null) {
+					count = 1;
+				} else {
+					count++;
+				}
+				countPerTopic.put(value.f2, count);
+
+				// check map:
+				for (Map.Entry<String, Integer> el: countPerTopic.entrySet()) {
+					if (el.getValue() < NUM_ELEMENTS) {
+						break; // not enough yet
+					}
+					if (el.getValue() > NUM_ELEMENTS) {
+						throw new RuntimeException("There is a failure in the test. I've read " +
+								el.getValue() + " from topic " + el.getKey());
+					}
+				}
+				// we've seen messages from all topics
+				throw new SuccessException();
+			}
+		}).setParallelism(1);
+
+		tryExecute(env, "Count elements from the topics");
+
+
+		// delete all topics again
+		for (int i = 0; i < NUM_TOPICS; i++) {
+			final String topic = "topic-" + i;
+			deleteTestTopic(topic);
+		}
+	}
+
+	private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {
+
+		TypeSerializer ts;
+		public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException {
+			Tuple2<Integer, Integer> t2 = (Tuple2<Integer, Integer>) ts.deserialize(new ByteArrayInputView(message));
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+	}
+
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -816,13 +911,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				elCnt++;
 				if (value.f0 == -1) {
 					// we should have seen 11 elements now.
-					if(elCnt == 11) {
+					if (elCnt == 11) {
 						throw new SuccessException();
 					} else {
 						throw new RuntimeException("There have been "+elCnt+" elements");
 					}
 				}
-				if(elCnt > 10) {
+				if (elCnt > 10) {
 					throw new RuntimeException("More than 10 elements seen: "+elCnt);
 				}
 			}
@@ -965,7 +1060,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			@Override
 			public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
 				Random rnd = new Random(1337);
-				for(long i = 0; i < ELEMENT_COUNT; i++) {
+				for (long i = 0; i < ELEMENT_COUNT; i++) {
 					PojoValue pojo = new PojoValue();
 					pojo.when = new Date(rnd.nextLong());
 					pojo.lon = rnd.nextLong();
@@ -1002,13 +1097,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
 				// the elements should be in order.
 				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
-				if(value.f1.lat % 2 == 0) {
+				if (value.f1.lat % 2 == 0) {
 					Assert.assertNull("key was not null", value.f0);
 				} else {
 					Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
 				}
 				counter++;
-				if(counter == ELEMENT_COUNT) {
+				if (counter == ELEMENT_COUNT) {
 					// we got the right number of elements
 					throw new SuccessException();
 				}
@@ -1083,6 +1178,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	private static void writeSequence(StreamExecutionEnvironment env, String topicName, final int numElements, int parallelism) throws Exception {
 
+		LOG.info("\n===================================\n== Writing sequence of "+numElements+" into "+topicName+" with p="+parallelism+"\n===================================");
 		TypeInformation<Tuple2<Integer, Integer>> resultType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 
 		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
@@ -1130,14 +1226,14 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		// will see each message only once.
 		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
 		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap);
-		if(streams.size() != 1) {
+		if (streams.size() != 1) {
 			throw new RuntimeException("Expected only one message stream but got "+streams.size());
 		}
 		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
-		if(kafkaStreams == null) {
+		if (kafkaStreams == null) {
 			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
 		}
-		if(kafkaStreams.size() != 1) {
+		if (kafkaStreams.size() != 1) {
 			throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
 		}
 		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
@@ -1148,7 +1244,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while(iteratorToRead.hasNext()) {
 			read++;
 			result.add(iteratorToRead.next());
-			if(read == stopAfter) {
+			if (read == stopAfter) {
 				LOG.info("Read "+read+" elements");
 				return result;
 			}
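
One detail of runConsumeMultipleTopics above is easy to miss: topic-i is created with i + 1 partitions, so across NUM_TOPICS topics the consumer should see 1 + 2 + ... + NUM_TOPICS partitions in total, which is the NUM_TOPICS * (NUM_TOPICS + 1) / 2 that the getPartitionsForTopic assertion expects. A standalone check of that arithmetic (names are illustrative):

public class PartitionCountCheck {

	public static void main(String[] args) {
		final int numTopics = 5; // NUM_TOPICS in the test
		int total = 0;
		for (int i = 0; i < numTopics; i++) {
			total += i + 1; // topic-i is created with i + 1 partitions
		}
		// 1 + 2 + 3 + 4 + 5 = 15 = 5 * 6 / 2
		System.out.println(total == numTopics * (numTopics + 1) / 2); // prints true
	}
}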

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 5f2cdbc..20846cf 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -118,4 +118,10 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 	public void testBigRecordJob() throws Exception {
 		runBigRecordTestTopology();
 	}
+
+	@Test
+	public void testMultipleTopics() throws Exception {
+		runConsumeMultipleTopics();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
index 5001364..f4c1899 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.util.serialization.TypeInformationSerializatio
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -109,7 +110,7 @@ public class KafkaProducerITCase extends KafkaTestBase {
 			// ------ consuming topology ---------
 			
 			FlinkKafkaConsumer<Tuple2<Long, String>> source = 
-					new FlinkKafkaConsumer<>(topic, deserSchema, standardProps, 
+					new FlinkKafkaConsumer<>(Collections.singletonList(topic), deserSchema, standardProps,
 							FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
 							FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index d511796..e6e179c 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
@@ -54,6 +55,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -360,7 +362,7 @@ public abstract class KafkaTestBase extends TestLogger {
 			catch (InterruptedException e) {
 				// restore interrupted state
 			}
-			List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic, standardProps);
+			List<KafkaTopicPartitionLeader> partitions = FlinkKafkaConsumer.getPartitionsForTopic(Collections.singletonList(topic), standardProps);
 			if (partitions != null && partitions.size() > 0) {
 				return;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
index 6a20e44..917c2330 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -39,7 +39,7 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQ
 	 * @param offset the offset of the message in the original source (for example the Kafka offset)
 	 * @return The deserialized message as an object.
 	 */
-	T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException;
+	T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException;
 
 	/**
 	 * Method to decide whether the element signals the end of the stream. If

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
index fc7bd1e..8d9cf5d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -35,7 +35,7 @@ public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializatio
 		this.deserializationSchema = deserializationSchema;
 	}
 	@Override
-	public T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
+	public T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException {
 		return deserializationSchema.deserialize(message);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index 1c8efd5..ef9cde5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -79,7 +79,7 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 
 
 	@Override
-	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
+	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException {
 		K key = null;
 		if(messageKey != null) {
 			key = keySerializer.deserialize(new ByteArrayInputView(messageKey));

http://git-wip-us.apache.org/repos/asf/flink/blob/fc8be1ca/tools/maven/checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index 0b20b83..a558116 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -71,6 +71,11 @@ under the License.
 			<property name="illegalPattern" value="true"/>
 			<property name="message" value="Use Flink's InstantiationUtil instead of common's SerializationUtils"/>
 		</module>
+		<module name="Regexp">
+			<property name="format" value="org\.apache\.commons\.lang\."/>
+			<property name="illegalPattern" value="true"/>
+			<property name="message" value="Use commons-lang3 instead of commons-lang."/>
+		</module>
 		<module name="NeedBraces">
 			<property name="tokens" value="LITERAL_IF, LITERAL_ELSE"/>
 		</module>

