flink-commits mailing list archives

From se...@apache.org
Subject [2/2] flink git commit: [FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1
Date Sun, 23 Jul 2017 16:48:40 GMT
[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

This closes #4321


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02850545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02850545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02850545

Branch: refs/heads/master
Commit: 02850545e3143600c7265e737e278663e3264317
Parents: d80ba4d
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Thu Jul 13 11:07:28 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jul 23 16:00:18 2017 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |  2 +-
 .../kafka/KafkaTestEnvironmentImpl.java         | 19 +++++++++---
 .../kafka/internal/KafkaConsumerThread.java     | 26 +++++++++++++++-
 .../kafka/internal/KafkaConsumerThreadTest.java | 32 +++++++++++++++++++-
 .../internals/AbstractPartitionDiscoverer.java  |  2 +-
 5 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 143cb7f..0ecaebc 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<kafka.version>0.10.0.1</kafka.version>
+		<kafka.version>0.10.2.1</kafka.version>
 	</properties>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 051d91e..f437060 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,9 +28,9 @@ import org.apache.flink.util.NetUtils;
 
 import kafka.admin.AdminUtils;
 import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
@@ -40,8 +40,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +59,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import scala.collection.mutable.ArraySeq;
+
 import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -248,10 +252,17 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
 
 				if (secureMode) {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)));
 				} else {
-					brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
+					brokerConnectionString += hostAndPortToUrlString(
+							KafkaTestEnvironment.KAFKA_HOST,
+							brokers.get(i).socketServer().boundPort(
+									ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)));
 				}
+				brokerConnectionString += ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");
@@ -415,7 +426,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 			try {
 				scala.Option<String> stringNone = scala.Option.apply(null);
-				KafkaServer server = new KafkaServer(kafkaConfig, SystemTime$.MODULE$, stringNone);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
 				server.startup();
 				return server;
 			}

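Two Kafka 0.10.2 API changes drive this hunk: SocketServer.boundPort() is now keyed by a ListenerName rather than a SecurityProtocol, and the KafkaServer constructor takes the common Time utility plus a Scala sequence of KafkaMetricsReporter instances. A minimal, self-contained sketch of embedded-broker startup against the 0.10.2 API (the broker properties below are placeholders, not Flink's actual test configuration):

import java.util.Properties;

import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.Time;

import scala.collection.mutable.ArraySeq;

public class EmbeddedBrokerSketch {

	public static void main(String[] args) {
		// placeholder configuration; KafkaTestEnvironmentImpl builds a much richer one
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");
		props.setProperty("broker.id", "0");
		props.setProperty("log.dirs", "/tmp/kafka-logs");

		KafkaServer server = new KafkaServer(
				new KafkaConfig(props),
				Time.SYSTEM,                            // replaces kafka.utils.SystemTime$.MODULE$
				scala.Option.<String>apply(null),       // no thread name prefix
				new ArraySeq<KafkaMetricsReporter>(0)); // no extra metrics reporters
		server.startup();

		// boundPort() is now looked up via a ListenerName instead of a SecurityProtocol
		int port = server.socketServer().boundPort(
				ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
		System.out.println("embedded broker listening on port " + port);

		server.shutdown();
	}
}
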
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
index 65300cd..de8fb0b 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
@@ -92,6 +92,9 @@ public class KafkaConsumerThread extends Thread {
 	/** This lock is used to isolate the consumer for partition reassignment. */
 	private final Object consumerReassignmentLock;
 
+	/** Flag indicating whether this consumer currently has any assigned partitions. */
+	private boolean hasAssignedPartitions;
+
 	/**
 	 * Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map)} or {@link #shutdown()})
 	 * had attempted to wakeup the consumer while it was isolated for partition reassignment.
@@ -210,7 +213,16 @@ public class KafkaConsumerThread extends Thread {
 				}
 
 				try {
-					newPartitions = unassignedPartitionsQueue.pollBatch();
+					if (hasAssignedPartitions) {
+						newPartitions = unassignedPartitionsQueue.pollBatch();
+					}
+					else {
+						// if there are no assigned partitions, block until we get at least
+						// one instead of hot-spinning in this loop. We rely on the fact that
+						// unassignedPartitionsQueue will be closed on shutdown, so we do
+						// not block indefinitely
+						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
+					}
 					if (newPartitions != null) {
 						reassignPartitions(newPartitions);
 					}
@@ -218,6 +230,11 @@ public class KafkaConsumerThread extends Thread {
 					continue;
 				}
 
+				if (!hasAssignedPartitions) {
+					// Without assigned partitions, KafkaConsumer.poll() will throw an exception
+					continue;
+				}
+
 				// get the next batch of records, unless we did not manage to hand the old batch over
 				if (records == null) {
 					try {
@@ -264,6 +281,9 @@ public class KafkaConsumerThread extends Thread {
 	public void shutdown() {
 		running = false;
 
+		// wake up all blocking calls on the queue
+		unassignedPartitionsQueue.close();
+
 		// We cannot call close() on the KafkaConsumer, because it will actually throw
 		// an exception if a concurrent call is in progress
 
@@ -335,6 +355,10 @@ public class KafkaConsumerThread extends Thread {
 	 */
 	@VisibleForTesting
 	void reassignPartitions(List<KafkaTopicPartitionState<TopicPartition>> newPartitions) throws Exception {
+		if (newPartitions.size() == 0) {
+			return;
+		}
+		hasAssignedPartitions = true;
 		boolean reassignmentStarted = false;
 
 		// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,

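Taken together, the changes give the consumer thread this control flow: block on the partitions queue only until the first assignment arrives, poll the queue non-blockingly afterwards, and skip KafkaConsumer.poll() entirely while nothing is assigned. A condensed paraphrase of the resulting run loop (exception handling and the record handover are omitted; all names appear in the hunks above):

while (running) {
	List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
	if (hasAssignedPartitions) {
		// non-blocking check for newly discovered partitions
		newPartitions = unassignedPartitionsQueue.pollBatch();
	} else {
		// park until the first partitions arrive; shutdown() closes the
		// queue, so this call cannot block forever
		newPartitions = unassignedPartitionsQueue.getBatchBlocking();
	}
	if (newPartitions != null) {
		reassignPartitions(newPartitions); // sets hasAssignedPartitions for non-empty input
	}
	if (!hasAssignedPartitions) {
		continue; // KafkaConsumer.poll() would throw without an assignment
	}
	// ... fetch records via consumer.poll(...) and hand them over, as before ...
}
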
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 091cd71..3a2f84a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -61,6 +61,36 @@ import static org.mockito.Mockito.when;
  */
 public class KafkaConsumerThreadTest {
 
+	@Test(timeout = 10000)
+	public void testCloseWithoutAssignedPartitions() throws Exception {
+		// no initial assignment
+		final KafkaConsumer<byte[], byte[]> mockConsumer = createMockConsumer(
+			new LinkedHashMap<TopicPartition, Long>(),
+			Collections.<TopicPartition, Long>emptyMap(),
+			false,
+			null,
+			null);
+
+		// setup latch so the test waits until testThread is blocked on getBatchBlocking method
+		final MultiShotLatch getBatchBlockingInvoked = new MultiShotLatch();
+			final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue =
+			new ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>>() {
+				@Override
+				public List<KafkaTopicPartitionState<TopicPartition>> getBatchBlocking() throws InterruptedException {
+					getBatchBlockingInvoked.trigger();
+					return super.getBatchBlocking();
+				}
+			};
+
+		final TestKafkaConsumerThread testThread =
+			new TestKafkaConsumerThread(mockConsumer, unassignedPartitionsQueue, new Handover());
+
+		testThread.start();
+		getBatchBlockingInvoked.await();
+		testThread.shutdown();
+		testThread.join();
+	}
+
 	/**
 	 * Tests reassignment works correctly in the case when:
 	 *  - the consumer initially had no assignments
@@ -744,6 +774,7 @@ public class KafkaConsumerThreadTest {
 			final OneShotLatch continueAssignmentLatch) {
 
 		final KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
+
 		when(mockConsumer.assignment()).thenAnswer(new Answer<Object>() {
 			@Override
 			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -754,7 +785,6 @@ public class KafkaConsumerThreadTest {
 				if (continueAssignmentLatch != null) {
 					continueAssignmentLatch.await();
 				}
-
 				return mockConsumerAssignmentAndPosition.keySet();
 			}
 		});

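The MultiShotLatch override in testCloseWithoutAssignedPartitions is a general recipe for testing blocking calls: the subclass triggers a latch on entry to the blocking method, so the test proceeds only once the thread under test is provably about to park. A standalone sketch of the same handshake using plain JDK types (hypothetical class and variable names; interruption stands in for the queue's close()):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingCallHandshakeSketch {

	public static void main(String[] args) throws InterruptedException {
		final CountDownLatch enteredBlockingCall = new CountDownLatch(1);

		// subclass the queue so that entering take() signals the latch,
		// mirroring the getBatchBlocking() override in the test above
		final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>() {
			@Override
			public String take() throws InterruptedException {
				enteredBlockingCall.countDown();
				return super.take();
			}
		};

		Thread worker = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					queue.take();
				} catch (InterruptedException e) {
					// expected during shutdown; exit quietly
				}
			}
		});
		worker.start();

		enteredBlockingCall.await(); // worker is about to park in take()
		worker.interrupt();          // stands in for unassignedPartitionsQueue.close()
		worker.join();
	}
}
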
http://git-wip-us.apache.org/repos/asf/flink/blob/02850545/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
index d55099a..725092e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
@@ -196,7 +196,7 @@ public abstract class AbstractPartitionDiscoverer {
 	 */
 	public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
 		if (isUndiscoveredPartition(partition)) {
-				topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
+			topicsToLargestDiscoveredPartitionId.put(partition.getTopic(), partition.getPartition());
 
 			return shouldAssignToThisSubtask(partition, indexOfThisSubtask, numParallelSubtasks);
 		}

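For context on the method touched here: setAndCheckDiscoveredPartition() records each newly discovered partition and then asks shouldAssignToThisSubtask() whether this parallel subtask owns it. That distribution function is not shown in the hunk; the following is a hypothetical modulo-based sketch of such an owner check, not Flink's actual formula:

public class PartitionAssignmentSketch {

	// hypothetical owner check: spread partitions evenly across parallel subtasks;
	// AbstractPartitionDiscoverer's real implementation may differ
	static boolean shouldAssignToThisSubtask(int partitionHash, int indexOfThisSubtask, int numParallelSubtasks) {
		return (partitionHash & 0x7FFFFFFF) % numParallelSubtasks == indexOfThisSubtask;
	}

	public static void main(String[] args) {
		// with 3 subtasks, each partition lands on exactly one subtask
		for (int partition = 0; partition < 6; partition++) {
			for (int subtask = 0; subtask < 3; subtask++) {
				if (shouldAssignToThisSubtask(Integer.hashCode(partition), subtask, 3)) {
					System.out.println("partition " + partition + " -> subtask " + subtask);
				}
			}
		}
	}
}
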
