flink-commits mailing list archives

From: se...@apache.org
Subject: [07/14] flink git commit: [FLINK-3067] [kafka connector] Use curator for committing offsets to ZK from Kafka
Date: Thu, 31 Dec 2015 14:33:30 GMT
[FLINK-3067] [kafka connector] Use curator for committing offsets to ZK from Kafka

This closes #1451


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f8c5e83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f8c5e83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f8c5e83

Branch: refs/heads/master
Commit: 4f8c5e839acbfd67aba112547138a5b0b0d29c9d
Parents: 73e8586
Author: Robert Metzger <rmetzger@apache.org>
Authored: Sat Dec 12 19:44:17 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Dec 31 00:45:27 2015 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka/pom.xml               | 22 ++---
 .../connectors/kafka/FlinkKafkaConsumer.java    | 13 ++-
 .../kafka/internals/ZookeeperOffsetHandler.java | 84 ++++++++++++--------
 .../connectors/kafka/KafkaConsumerTestBase.java | 34 ++++----
 .../connectors/kafka/KafkaTestBase.java         | 12 ++-
 .../internals/ZookeeperOffsetHandlerTest.java   | 17 ++--
 6 files changed, 104 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
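
At its core, the change replaces the zkclient-based ZkUtils calls with Apache Curator. Below is a minimal standalone sketch of the read/write pattern the new handler uses. The class name, group/topic names, and offset value are illustrative placeholders, and a ZooKeeper at localhost:2181 is assumed; the timeouts, retry policy, and znode layout follow the ZookeeperOffsetHandler diff further down (ZKGroupTopicDirs convention).

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorOffsetSketch {
	public static void main(String[] args) throws Exception {
		// Same construction as the new ZookeeperOffsetHandler: 60s session
		// timeout, 15s connection timeout, exponential backoff retry.
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				"localhost:2181", 60000, 15000, new ExponentialBackoffRetry(100, 10));
		client.start();

		// Offset znode for group "my-group", topic "my-topic", partition 0
		// (placeholder names; layout per kafka.utils.ZKGroupTopicDirs).
		String path = "/consumers/my-group/offsets/my-topic/0";

		// Create the znode (and parents) if missing, then write the offset.
		client.newNamespaceAwareEnsurePath(path).ensure(client.getZookeeperClient());
		client.setData().forPath(path, Long.toString(42L).getBytes());

		// Read it back.
		byte[] data = client.getData().forPath(path);
		System.out.println(new String(data)); // prints 42

		client.close();
	}
}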


http://git-wip-us.apache.org/repos/asf/flink/blob/4f8c5e83/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-streaming-connectors/flink-connector-kafka/pom.xml
index fe763f3..7bd9bcb 100644
--- a/flink-streaming-connectors/flink-connector-kafka/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -93,12 +93,7 @@ under the License.
 		</dependency>
 
 		<!-- force using the latest zkclient -->
-		<dependency>
-			<groupId>com.101tec</groupId>
-			<artifactId>zkclient</artifactId>
-			<version>0.7</version>
-			<type>jar</type>
-		</dependency>
+
 
 		<dependency>
 			<groupId>com.google.guava</groupId>
@@ -115,6 +110,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-curator-recipes</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
@@ -122,15 +123,6 @@ under the License.
 
 	</dependencies>
 
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>com.101tec</groupId>
-				<artifactId>zkclient</artifactId>
-				<version>0.7</version>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
 	
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/4f8c5e83/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index b139e95..69ed9bf 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -241,7 +241,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
 	 *
 	 * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
-	 * at the beginnign of this class.</p>
+	 * at the beginning of this class.</p>
 	 *
 	 * @param topic
 	 *           The Kafka topic to read from.
@@ -264,7 +264,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
 	 * 
 	 * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
-	 * at the beginnign of this class.</p>
+	 * at the beginning of this class.</p>
 	 * 
 	 * @param topics
 	 *           The Kafka topics to read from.
@@ -522,7 +522,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		}
 
 		// the use of clone() is okay here is okay, we just need a new map, the keys are not changed
-		//noinspection unchecked
+		@SuppressWarnings("unchecked")
 		HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) lastOffsets.clone();
 
 		// the map cannot be asynchronously updated, because only one checkpoint call can happen
@@ -570,7 +570,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 
 				//noinspection unchecked
 				checkpointOffsets = (HashMap<KafkaTopicPartition, Long>) pendingCheckpoints.remove(posInMap);
-
 				
 				// remove older checkpoints in map
 				for (int i = 0; i < posInMap; i++) {
@@ -613,8 +612,10 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * Thread to periodically commit the current read offset into Zookeeper.
 	 */
 	private static class PeriodicOffsetCommitter<T> extends Thread {
+		
 		private final long commitInterval;
 		private final FlinkKafkaConsumer<T> consumer;
+		
 		private volatile boolean running = true;
 
 		public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer<T> consumer) {
@@ -625,13 +626,13 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		@Override
 		public void run() {
 			try {
-
 				while (running) {
 					try {
 						Thread.sleep(commitInterval);
 						//  ------------  commit current offsets ----------------
 
 						// create copy of current offsets
+						//noinspection unchecked
 						HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.lastOffsets.clone();
 						commitOffsets(currentOffsets, this.consumer);
 					} catch (InterruptedException e) {
@@ -639,8 +640,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 							// throw unexpected interruption
 							throw e;
 						}
-						// looks like the thread is being closed. Leave loop
-						break;
 					}
 				}
 			} catch (Throwable t) {
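
The hunk above also drops the break after a swallowed interrupt: an InterruptedException is now rethrown while the committer is still marked as running, and otherwise the while condition simply ends the loop. A simplified standalone sketch of that shutdown pattern follows; the class and method names are illustrative, and the error handling is a placeholder since the diff ends before the actual catch body.

// Sketch: periodic committer whose shutdown() interrupts the sleep.
class PeriodicCommitterSketch extends Thread {
	private final long commitInterval;
	private volatile boolean running = true;

	PeriodicCommitterSketch(long commitInterval) {
		this.commitInterval = commitInterval;
	}

	@Override
	public void run() {
		try {
			while (running) {
				try {
					Thread.sleep(commitInterval);
					// ... commit a copy of the current offsets here ...
				} catch (InterruptedException e) {
					if (running) {
						// interrupted while still running: unexpected, rethrow
						throw e;
					}
					// otherwise shutdown() interrupted the sleep; the loop
					// condition is now false and the thread exits normally
				}
			}
		} catch (Throwable t) {
			// placeholder error handling; the diff above ends before
			// showing how the real committer reports failures
			t.printStackTrace();
		}
	}

	void shutdown() {
		running = false;
		interrupt();
	}
}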

http://git-wip-us.apache.org/repos/asf/flink/blob/4f8c5e83/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
index f9b8448..f72117d 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
@@ -18,20 +18,18 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
-import kafka.common.TopicAndPartition;
 import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
 
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.zookeeper.data.Stat;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Option;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -42,16 +40,14 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
 	
 	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
-	
-	
-	private final ZkClient zkClient;
-	
+
 	private final String groupId;
 
-	
+	private final CuratorFramework curatorClient;
+
+
 	public ZookeeperOffsetHandler(Properties props) {
 		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-		
 		if (this.groupId == null) {
 			throw new IllegalArgumentException("Required property '"
 					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
@@ -61,30 +57,37 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 		if (zkConnect == null) {
 			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
 		}
+
+		// we use Curator's default timeouts
+		int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
+		int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
 		
-		zkClient = new ZkClient(zkConnect,
-				Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
-				Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
-				new ZooKeeperStringSerializer());
+		// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are no official kafka configs)
+		int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
+		int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
+		
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
+		curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+		curatorClient.start();
 	}
 
 
 	@Override
-	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) {
+	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
 		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
 			KafkaTopicPartition tp = entry.getKey();
 			long offset = entry.getValue();
 			
 			if (offset >= 0) {
-				setOffsetInZooKeeper(zkClient, groupId, tp.getTopic(), tp.getPartition(), offset);
+				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
 			}
 		}
 	}
 
 	@Override
-	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) {
+	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception {
 		for (KafkaTopicPartitionLeader tp : partitions) {
-			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
+			long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
 
 			if (offset != OFFSET_NOT_SET) {
 				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
@@ -98,30 +101,43 @@ public class ZookeeperOffsetHandler implements OffsetHandler {
 
 	@Override
 	public void close() throws IOException {
-		zkClient.close();
+		curatorClient.close();
 	}
 
 	// ------------------------------------------------------------------------
 	//  Communication with Zookeeper
 	// ------------------------------------------------------------------------
 	
-	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+	public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+		String path = topicDirs.consumerOffsetDir() + "/" + partition;
+		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+		byte[] data = Long.toString(offset).getBytes();
+		curatorClient.setData().forPath(path, data);
 	}
 
-	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-
-		scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
-				topicDirs.consumerOffsetDir() + "/" + tap.partition());
-
-		if (data._1().isEmpty()) {
+	public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
+		String path = topicDirs.consumerOffsetDir() + "/" + partition;
+		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
+		
+		byte[] data = curatorClient.getData().forPath(path);
+		
+		if (data == null) {
 			return OFFSET_NOT_SET;
 		} else {
-			return Long.valueOf(data._1().get());
+			String asString = new String(data);
+			if (asString.length() == 0) {
+				return OFFSET_NOT_SET;
+			} else {
+				try {
+					return Long.parseLong(asString);
+				} catch (NumberFormatException e) {
+					throw new Exception(String.format(
+						"The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s",
+						groupId, topic, partition, asString));
+				}
+			}
 		}
 	}
 }
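
For reference, a usage sketch of the two static helpers defined above. The connect string, group, topic, and offset are placeholders; the two-argument newClient variant shown here relies on Curator's default session and connection timeouts.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

public class OffsetHelperUsageSketch {
	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				"localhost:2181", new ExponentialBackoffRetry(100, 10));
		client.start();

		// Write an offset, then read it back.
		ZookeeperOffsetHandler.setOffsetInZooKeeper(client, "my-group", "my-topic", 0, 42L);
		long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(client, "my-group", "my-topic", 0);
		System.out.println(offset); // 42; a missing or empty znode yields OFFSET_NOT_SET

		client.close();
	}
}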

http://git-wip-us.apache.org/repos/asf/flink/blob/4f8c5e83/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 4d8b7c3..4f71384 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -31,6 +31,7 @@ import kafka.server.KafkaServer;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.map.LinkedMap;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -58,6 +59,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -260,7 +262,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		readSequence(env2, standardProps, parallelism, topicName, 100, 0);
 
-		ZkClient zkClient = createZookeeperClient();
+		CuratorFramework zkClient = createZookeeperClient();
 
 		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
 		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
@@ -321,7 +323,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		readSequence(env2, readProps, parallelism, topicName, 100, 0);
 
 		// get the offset
-		ZkClient zkClient = createZookeeperClient();
+		CuratorFramework zkClient = createZookeeperClient();
 
 		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 0);
 		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, standardCC.groupId(), topicName, 1);
@@ -796,7 +798,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		writeSequence(env, topic, 20, parallelism);
 
 		// set invalid offset:
-		ZkClient zkClient = createZookeeperClient();
+		CuratorFramework zkClient = createZookeeperClient();
 		ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, standardCC.groupId(), topic, 0, 1234);
 
 		// read from topic
@@ -1016,20 +1018,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				topic, parallelism, numElementsPerPartition, true);
 
 		// find leader to shut down
-		ZkClient zkClient = createZookeeperClient();
 		PartitionMetadata firstPart = null;
-		do {
-			if (firstPart != null) {
-				LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
-				// not the first try. Sleep a bit
-				Thread.sleep(150);
-			}
+		{
+			ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+					standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.errorCode());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
 
-			Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
-			firstPart = partitionMetadata.head();
+				Seq<PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
+				firstPart = partitionMetadata.head();
+			}
+			while (firstPart.errorCode() != 0);
+			zkClient.close();
 		}
-		while (firstPart.errorCode() != 0);
-		zkClient.close();
 
 		final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
 		final String leaderToShutDownConnection = 
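
Because AdminUtils still requires a plain ZkClient, the test now constructs one locally just for the leader lookup and closes it as soon as the lookup is done, while createZookeeperClient() (see KafkaTestBase below) hands out a Curator client. A condensed sketch of that scoped lookup, mirroring the block above; the timeout values and method shape are illustrative.

import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import org.I0Itec.zkclient.ZkClient;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
import scala.collection.Seq;

class LeaderLookupSketch {
	// Scoped ZkClient: used only for AdminUtils, closed right after.
	static PartitionMetadata findLeader(String zkConnect, String topic) throws InterruptedException {
		ZkClient zkClient = new ZkClient(zkConnect, 6000, 6000, new ZooKeeperStringSerializer());
		try {
			PartitionMetadata firstPart = null;
			do {
				if (firstPart != null) {
					Thread.sleep(150); // not the first try, back off briefly
				}
				Seq<PartitionMetadata> partitionMetadata =
						AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata();
				firstPart = partitionMetadata.head();
			} while (firstPart.errorCode() != 0);
			return firstPart;
		} finally {
			zkClient.close();
		}
	}
}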

http://git-wip-us.apache.org/repos/asf/flink/blob/4f8c5e83/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 3550bd9..779cce9 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -27,6 +27,10 @@ import kafka.server.KafkaServer;
 import org.I0Itec.zkclient.ZkClient;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
@@ -292,9 +296,11 @@ public abstract class KafkaTestBase extends TestLogger {
 	//  Execution utilities
 	// ------------------------------------------------------------------------
 	
-	protected ZkClient createZookeeperClient() {
-		return new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
-				standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+	protected CuratorFramework createZookeeperClient() {
+		RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
+		CuratorFramework curatorClient = CuratorFrameworkFactory.newClient(standardProps.getProperty("zookeeper.connect"), retryPolicy);
+		curatorClient.start();
+		return curatorClient;
 	}
 	
 	protected static void tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
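
With this change the test base hands out a started CuratorFramework, and callers are responsible for closing it, as the test diffs show. A minimal sketch of the same construction outside the harness, with a placeholder connect string; the two-argument newClient falls back to Curator's default session and connection timeouts.

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TestClientSketch {
	static CuratorFramework createZookeeperClient(String zkConnect) {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 10);
		CuratorFramework client = CuratorFrameworkFactory.newClient(zkConnect, retryPolicy);
		client.start();
		return client; // caller must close()
	}

	public static void main(String[] args) {
		CuratorFramework client = createZookeeperClient("localhost:2181");
		client.close();
	}
}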

http://git-wip-us.apache.org/repos/asf/flink/blob/4f8c5e83/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
index 27ad2e8..8d16da0 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.internals;
 import kafka.admin.AdminUtils;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
 
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
@@ -41,14 +42,20 @@ public class ZookeeperOffsetHandlerTest extends KafkaTestBase {
 			
 			final long offset = (long) (Math.random() * Long.MAX_VALUE);
 
-			ZkClient zkClient = createZookeeperClient();
-			AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
+			CuratorFramework curatorFramework = createZookeeperClient();
+
+			{
+				ZkClient zkClient = new ZkClient(standardCC.zkConnect(), standardCC.zkSessionTimeoutMs(),
+						standardCC.zkConnectionTimeoutMs(), new ZooKeeperStringSerializer());
+				AdminUtils.createTopic(zkClient, topicName, 3, 2, new Properties());
+				zkClient.close();
+			}
 				
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(zkClient, groupId, topicName, 0, offset);
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
 	
-			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(zkClient, groupId, topicName, 0);
+			long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
 
-			zkClient.close();
+			curatorFramework.close();
 			
 			assertEquals(offset, fetchedOffset);
 		}

