flink-commits mailing list archives

From rmetz...@apache.org
Subject [3/9] flink git commit: [FLINK-3058] Add support for Kafka 0.9.0.0
Date Wed, 20 Jan 2016 19:31:48 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
deleted file mode 100644
index f269aa3..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A serializable representation of a Kafka topic and a partition.
- * Used as operator state for the Kafka consumer.
- */
-public class KafkaTopicPartition implements Serializable {
-
-	private static final long serialVersionUID = 722083576322742325L;
-
-	private final String topic;
-	private final int partition;
-	private final int cachedHash;
-
-	public KafkaTopicPartition(String topic, int partition) {
-		this.topic = checkNotNull(topic);
-		this.partition = partition;
-		this.cachedHash = 31 * topic.hashCode() + partition;
-	}
-
-	public String getTopic() {
-		return topic;
-	}
-
-	public int getPartition() {
-		return partition;
-	}
-
-	@Override
-	public String toString() {
-		return "KafkaTopicPartition{" +
-				"topic='" + topic + '\'' +
-				", partition=" + partition +
-				'}';
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (!(o instanceof KafkaTopicPartition)) {
-			return false;
-		}
-
-		KafkaTopicPartition that = (KafkaTopicPartition) o;
-
-		if (partition != that.partition) {
-			return false;
-		}
-		return topic.equals(that.topic);
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-
-
-	// ------------------- Utilities -------------------------------------
-
-	/**
-	 * Returns a unique list of topics from the topic partition map
-	 *
-	 * @param topicPartitionMap A map keyed by KafkaTopicPartition
-	 * @return A unique list of topics from the input map
-	 */
-	public static List<String> getTopics(Map<KafkaTopicPartition, ?> topicPartitionMap) {
-		HashSet<String> uniqueTopics = new HashSet<>();
-		for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) {
-			uniqueTopics.add(ktp.getTopic());
-		}
-		return new ArrayList<>(uniqueTopics);
-	}
-
-	public static String toString(Map<KafkaTopicPartition, Long> map) {
-		StringBuilder sb = new StringBuilder();
-		for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
-			KafkaTopicPartition ktp = p.getKey();
-			sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
-		}
-		return sb.toString();
-	}
-
-	/**
-	 * Checks whether this partition is contained in the given map of KafkaTopicPartitionLeaders.
-	 *
-	 * @param map The map of KafkaTopicPartitionLeaders
-	 * @return true if the element is contained.
-	 */
-	public boolean isContained(Map<KafkaTopicPartitionLeader, ?> map) {
-		for(Map.Entry<KafkaTopicPartitionLeader, ?> entry : map.entrySet()) {
-			if(entry.getKey().getTopicPartition().equals(this)) {
-				return true;
-			}
-		}
-		return false;
-	}
-}

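For reference, a minimal sketch of how the KafkaTopicPartition utilities removed above were used; it compiles only against the pre-0.9 connector classes shown in this diff, and the topic names and offsets are illustrative:

	import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
	import java.util.HashMap;
	import java.util.List;
	import java.util.Map;

	public class KafkaTopicPartitionSketch {
		public static void main(String[] args) {
			// The consumer keeps (partition -> last read offset) as operator state.
			Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
			offsets.put(new KafkaTopicPartition("orders", 0), 42L);
			offsets.put(new KafkaTopicPartition("orders", 1), 17L);
			offsets.put(new KafkaTopicPartition("clicks", 0), 7L);

			// getTopics() returns the distinct topics in the map: here "orders" and "clicks".
			List<String> topics = KafkaTopicPartition.getTopics(offsets);
			System.out.println(topics);

			// toString(Map) renders "topic:partition=offset, " pairs, useful for logging.
			System.out.println(KafkaTopicPartition.toString(offsets));
		}
	}
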
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
deleted file mode 100644
index 8dd9a52..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.apache.kafka.common.Node;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Serializable Topic Partition info with leader Node information.
- * This class is used at runtime.
- */
-public class KafkaTopicPartitionLeader implements Serializable {
-
-	private static final long serialVersionUID = 9145855900303748582L;
-
-	private final int leaderId;
-	private final int leaderPort;
-	private final String leaderHost;
-	private final KafkaTopicPartition topicPartition;
-	private final int cachedHash;
-
-	public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
-		this.topicPartition = topicPartition;
-		if (leader == null) {
-			this.leaderId = -1;
-			this.leaderHost = null;
-			this.leaderPort = -1;
-		} else {
-			this.leaderId = leader.id();
-			this.leaderPort = leader.port();
-			this.leaderHost = leader.host();
-		}
-		int cachedHash = (leader == null) ? 14 : leader.hashCode();
-		this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
-	}
-
-	public KafkaTopicPartition getTopicPartition() {
-		return topicPartition;
-	}
-
-	public Node getLeader() {
-		if (this.leaderId == -1) {
-			return null;
-		} else {
-			return new Node(leaderId, leaderHost, leaderPort);
-		}
-	}
-
-	public static String toString(List<KafkaTopicPartitionLeader> partitions) {
-		StringBuilder sb = new StringBuilder();
-		for (KafkaTopicPartitionLeader p: partitions) {
-			sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(", ");
-		}
-		return sb.toString();
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (!(o instanceof KafkaTopicPartitionLeader)) {
-			return false;
-		}
-
-		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
-
-		if (!topicPartition.equals(that.topicPartition)) {
-			return false;
-		}
-		return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost);
-	}
-
-	@Override
-	public int hashCode() {
-		return cachedHash;
-	}
-
-	@Override
-	public String toString() {
-		return "KafkaTopicPartitionLeader{" +
-				"leaderId=" + leaderId +
-				", leaderPort=" + leaderPort +
-				", leaderHost='" + leaderHost + '\'' +
-				", topic=" + topicPartition.getTopic() +
-				", partition=" + topicPartition.getPartition() +
-				'}';
-	}
-
-
-	/**
-	 * Replaces an existing entry in the given map that refers to the same KafkaTopicPartition as the new key, ignoring the leader.
-	 *
-	 * @param newKey new topic partition (with leader)
-	 * @param newValue new offset
-	 * @param map map to do the search in
-	 * @return The old value (offset), or null if no matching partition was found
-	 */
-	public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader newKey, Long newValue, Map<KafkaTopicPartitionLeader, Long> map) {
-		for(Map.Entry<KafkaTopicPartitionLeader, Long> entry: map.entrySet()) {
-			if(entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) {
-				Long oldValue = map.remove(entry.getKey());
-				if(map.put(newKey, newValue) != null) {
-					throw new IllegalStateException("Key was not removed before");
-				}
-				return oldValue;
-			}
-		}
-		return null;
-	}
-}

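A minimal sketch of the replaceIgnoringLeader() semantics removed above, assuming the pre-0.9 classes from this diff; the broker nodes and offsets are illustrative:

	import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
	import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
	import org.apache.kafka.common.Node;
	import java.util.HashMap;
	import java.util.Map;

	public class LeaderReplacementSketch {
		public static void main(String[] args) {
			KafkaTopicPartition tp = new KafkaTopicPartition("orders", 0);

			// The same partition, known under two different (possibly stale) leaders.
			KafkaTopicPartitionLeader oldEntry = new KafkaTopicPartitionLeader(tp, new Node(1, "broker-1", 9092));
			KafkaTopicPartitionLeader newEntry = new KafkaTopicPartitionLeader(tp, new Node(2, "broker-2", 9092));

			Map<KafkaTopicPartitionLeader, Long> offsets = new HashMap<>();
			offsets.put(oldEntry, 42L);

			// Replaces the entry for the same topic/partition even though the leader differs;
			// returns the previous offset (42 here), or null if the partition was unknown.
			Long previous = KafkaTopicPartitionLeader.replaceIgnoringLeader(newEntry, 100L, offsets);
			System.out.println(previous + " -> " + offsets.get(newEntry));
		}
	}
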
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
deleted file mode 100644
index b51ad61..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
-
-import org.apache.flink.util.StringUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * This fetcher uses Kafka's low-level API to pull data from a specific
- * set of topics and partitions.
- * 
- * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p>
- */
-public class LegacyFetcher implements Fetcher {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class);
-
-	
-	/** The properties that configure the Kafka connection */
-	private final Properties config;
-	
-	/** The task name, to give more readable names to the spawned threads */
-	private final String taskName;
-	
-	/** The first error that occurred in a connection thread */
-	private final AtomicReference<Throwable> error;
-
-	/** The partitions that the fetcher should read, with their starting offsets */
-	private Map<KafkaTopicPartitionLeader, Long> partitionsToRead;
-
-	/** The seek() method might receive KafkaTopicPartitions without leader information
-	 * (for example when restoring).
-	 * If there are entries in this map, we'll fetch the leader from Kafka.
-	 **/
-	private Map<KafkaTopicPartition, Long> partitionsToReadWithoutLeader;
-	
-	/** Reference to the thread that executed the run() method. */
-	private volatile Thread mainThread;
-	
-	/** Flag to shut the fetcher down */
-	private volatile boolean running = true;
-
-	public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, Properties props, String taskName) {
-		this.config = checkNotNull(props, "The config properties cannot be null");
-		//this.topic = checkNotNull(topic, "The topic cannot be null");
-		this.partitionsToRead = new HashMap<>();
-		for (KafkaTopicPartitionLeader p: partitions) {
-			partitionsToRead.put(p, FlinkKafkaConsumer.OFFSET_NOT_SET);
-		}
-		this.taskName = taskName;
-		this.error = new AtomicReference<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Fetcher methods
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void seek(KafkaTopicPartition topicPartition, long offsetToRead) {
-		if (partitionsToRead == null) {
-			throw new IllegalArgumentException("No partitions to read set");
-		}
-		if (!topicPartition.isContained(partitionsToRead)) {
-			throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
-					+ ") we are not going to read. Partitions to read " + partitionsToRead);
-		}
-		if (partitionsToReadWithoutLeader == null) {
-			partitionsToReadWithoutLeader = new HashMap<>();
-		}
-		partitionsToReadWithoutLeader.put(topicPartition, offsetToRead);
-	}
-	
-	@Override
-	public void close() {
-		// flag needs to be checked by the run() method that creates the spawned threads
-		this.running = false;
-		
-		// all other cleanup is made by the run method itself
-	}
-
-	@Override
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
-						KeyedDeserializationSchema<T> deserializer,
-						HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception {
-		
-		if (partitionsToRead == null || partitionsToRead.size() == 0) {
-			throw new IllegalArgumentException("No partitions set");
-		}
-		
-		// NOTE: This method needs to always release all resources it acquires
-		
-		this.mainThread = Thread.currentThread();
-
-		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
-
-		// get lead broker if necessary
-		if (partitionsToReadWithoutLeader != null && partitionsToReadWithoutLeader.size() > 0) {
-			LOG.info("Refreshing leader information for partitions {}", KafkaTopicPartition.toString(partitionsToReadWithoutLeader));
-			// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
-			// when it is interrupted, so we run it only in a separate thread.
-			// since it sometimes refuses to shut down, we resort to the admittedly harsh
-			// means of killing the thread after a timeout.
-			PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(KafkaTopicPartition.getTopics(partitionsToReadWithoutLeader), config);
-			infoFetcher.start();
-
-			KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
-			watchDog.start();
-
-			List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
-
-			// replace potentially outdated leader information in partitionsToRead with fresh data from topicPartitionWithLeader
-			for (Map.Entry<KafkaTopicPartition, Long> pt: partitionsToReadWithoutLeader.entrySet()) {
-				KafkaTopicPartitionLeader topicPartitionWithLeader = null;
-				// go through list
-				for (KafkaTopicPartitionLeader withLeader: topicPartitionWithLeaderList) {
-					if (withLeader.getTopicPartition().equals(pt.getKey())) {
-						topicPartitionWithLeader = withLeader;
-						break;
-					}
-				}
-				if (topicPartitionWithLeader == null) {
-					throw new IllegalStateException("Unable to find topic/partition leader information");
-				}
-				Long removed = KafkaTopicPartitionLeader.replaceIgnoringLeader(topicPartitionWithLeader, pt.getValue(), partitionsToRead);
-				if (removed == null) {
-					throw new IllegalStateException("Seek request on unknown topic partition");
-				}
-			}
-		}
-
-
-		// build a map for each broker with its partitions
-		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<>();
-
-		for (Map.Entry<KafkaTopicPartitionLeader, Long> entry : partitionsToRead.entrySet()) {
-			final KafkaTopicPartitionLeader topicPartition = entry.getKey();
-			final long offset = entry.getValue();
-
-			List<FetchPartition> partitions = fetchBrokers.get(topicPartition.getLeader());
-			if (partitions == null) {
-				partitions = new ArrayList<>();
-				fetchBrokers.put(topicPartition.getLeader(), partitions);
-			}
-
-			partitions.add(new FetchPartition(topicPartition.getTopicPartition().getTopic(), topicPartition.getTopicPartition().getPartition(), offset));
-		}
-
-		// create SimpleConsumers for each broker
-		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
-		
-		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
-			final Node broker = brokerInfo.getKey();
-			final List<FetchPartition> partitionsList = brokerInfo.getValue();
-			
-			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
-
-			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config,
-					broker, partitions, sourceContext, deserializer, lastOffsets);
-
-			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
-					taskName, broker.id(), broker.host(), broker.port()));
-			thread.setDaemon(true);
-			consumers.add(thread);
-		}
-		
-		// last check whether we should abort.
-		if (!running) {
-			return;
-		}
-		
-		// start all consumer threads
-		for (SimpleConsumerThread<?> t : consumers) {
-			LOG.info("Starting thread {}", t.getName());
-			t.start();
-		}
-		
-		// wait until all consumer threads are done, or until we are aborted, or until
-		// an error occurred in one of the fetcher threads
-		try {
-			boolean someConsumersRunning = true;
-			while (running && error.get() == null && someConsumersRunning) {
-				try {
-					// wait for the consumer threads. if an error occurs, we are interrupted
-					for (SimpleConsumerThread<?> t : consumers) {
-						t.join();
-					}
-	
-					// safety net
-					someConsumersRunning = false;
-					for (SimpleConsumerThread<?> t : consumers) {
-						someConsumersRunning |= t.isAlive();
-					}
-				}
-				catch (InterruptedException e) {
-					// ignore. we should notice what happened in the next loop check
-				}
-			}
-			
-			// make sure any asynchronous error is noticed
-			Throwable error = this.error.get();
-			if (error != null) {
-				throw new Exception(error.getMessage(), error);
-			}
-		}
-		finally {
-			// make sure that in any case (completion, abort, error), all spawned threads are stopped
-			for (SimpleConsumerThread<?> t : consumers) {
-				if (t.isAlive()) {
-					t.cancel();
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Reports an error from a fetch thread. This will cause the main thread to see this error,
-	 * abort, and cancel all other fetch threads.
-	 * 
-	 * @param error The error to report.
-	 */
-	@Override
-	public void stopWithError(Throwable error) {
-		if (this.error.compareAndSet(null, error)) {
-			// we are the first to report an error
-			if (mainThread != null) {
-				mainThread.interrupt();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Representation of a partition to fetch.
-	 */
-	private static class FetchPartition {
-
-		final String topic;
-		
-		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
-		final int partition;
-		
-		/** Offset pointing at the next element to read from that partition. */
-		long nextOffsetToRead;
-
-		FetchPartition(String topic, int partition, long nextOffsetToRead) {
-			this.topic = topic;
-			this.partition = partition;
-			this.nextOffsetToRead = nextOffsetToRead;
-		}
-		
-		@Override
-		public String toString() {
-			return "FetchPartition {topic=" + topic +", partition=" + partition + ", offset=" + nextOffsetToRead + '}';
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Per broker fetcher
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Each broker needs its separate connection. This thread implements the connection to
-	 * one broker. The connection can fetch multiple partitions from the broker.
-	 * 
-	 * @param <T> The data type fetched.
-	 */
-	private static class SimpleConsumerThread<T> extends Thread {
-		
-		private final SourceFunction.SourceContext<T> sourceContext;
-		private final KeyedDeserializationSchema<T> deserializer;
-		private final HashMap<KafkaTopicPartition, Long> offsetsState;
-		
-		private final FetchPartition[] partitions;
-		
-		private final Node broker;
-
-		private final Properties config;
-
-		private final LegacyFetcher owner;
-
-		private SimpleConsumer consumer;
-		
-		private volatile boolean running = true;
-
-
-		// exceptions are thrown locally
-		public SimpleConsumerThread(LegacyFetcher owner,
-									Properties config,
-									Node broker,
-									FetchPartition[] partitions,
-									SourceFunction.SourceContext<T> sourceContext,
-									KeyedDeserializationSchema<T> deserializer,
-									HashMap<KafkaTopicPartition, Long> offsetsState) {
-			this.owner = owner;
-			this.config = config;
-			this.broker = broker;
-			this.partitions = partitions;
-			this.sourceContext = checkNotNull(sourceContext);
-			this.deserializer = checkNotNull(deserializer);
-			this.offsetsState = checkNotNull(offsetsState);
-		}
-
-		@Override
-		public void run() {
-			LOG.info("Starting to fetch from {}", Arrays.toString(this.partitions));
-			try {
-				// set up the config values
-				final String clientId = "flink-kafka-consumer-legacy-" + broker.id();
-
-				// these are the actual configuration values of Kafka + their original default values.
-				final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
-				final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
-				final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
-				final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
-				final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
-				
-				// create the Kafka consumer that we actually use for fetching
-				consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
-
-				// make sure that all partitions have some offsets to start with
-				// those partitions that do not have an offset from a checkpoint need to get
-				// their start offset from ZooKeeper
-				{
-					List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
-
-					for (FetchPartition fp : partitions) {
-						if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
-							// retrieve the offset from the consumer
-							partitionsToGetOffsetsFor.add(fp);
-						}
-					}
-					if (partitionsToGetOffsetsFor.size() > 0) {
-						getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-						LOG.info("No prior offsets found for some partitions. Fetched the following start offsets {}", partitionsToGetOffsetsFor);
-					}
-				}
-				
-				// Now, the actual work starts :-)
-				int offsetOutOfRangeCount = 0;
-				while (running) {
-					FetchRequestBuilder frb = new FetchRequestBuilder();
-					frb.clientId(clientId);
-					frb.maxWait(maxWait);
-					frb.minBytes(minBytes);
-					
-					for (FetchPartition fp : partitions) {
-						frb.addFetch(fp.topic, fp.partition, fp.nextOffsetToRead, fetchSize);
-					}
-					kafka.api.FetchRequest fetchRequest = frb.build();
-					LOG.debug("Issuing fetch request {}", fetchRequest);
-
-					FetchResponse fetchResponse = consumer.fetch(fetchRequest);
-
-					if (fetchResponse.hasError()) {
-						String exception = "";
-						List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>();
-						for (FetchPartition fp : partitions) {
-							short code = fetchResponse.errorCode(fp.topic, fp.partition);
-
-							if (code == ErrorMapping.OffsetOutOfRangeCode()) {
-								// we were asked to read from an out-of-range-offset (maybe set wrong in Zookeeper)
-								// Kafka's high level consumer is resetting the offset according to 'auto.offset.reset'
-								partitionsToGetOffsetsFor.add(fp);
-							} else if (code != ErrorMapping.NoError()) {
-								exception += "\nException for partition " + fp.partition + ": " +
-										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-							}
-						}
-						if (partitionsToGetOffsetsFor.size() > 0) {
-							// safeguard against an infinite loop.
-							if (offsetOutOfRangeCount++ > 0) {
-								throw new RuntimeException("Found invalid offsets more than once in partitions "+partitionsToGetOffsetsFor.toString()+" " +
-										"Exceptions: "+exception);
-							}
-							// get valid offsets for these partitions and try again.
-							LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
-							getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
-							LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
-							continue; // jump back to create a new fetch request. The offset has not been touched.
-						} else {
-							// all partitions failed on an error
-							throw new IOException("Error while fetching from broker: " + exception);
-						}
-					}
-
-					int messagesInFetch = 0;
-					int deletedMessages = 0;
-					for (FetchPartition fp : partitions) {
-						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(fp.topic, fp.partition);
-						final KafkaTopicPartition topicPartition = new KafkaTopicPartition(fp.topic, fp.partition);
-						
-						for (MessageAndOffset msg : messageSet) {
-							if (running) {
-								messagesInFetch++;
-								if (msg.offset() < fp.nextOffsetToRead) {
-									// we have seen this message already
-									LOG.info("Skipping message with offset " + msg.offset()
-											+ " because we have seen messages until " + fp.nextOffsetToRead
-											+ " from partition " + fp.partition + " already");
-									continue;
-								}
-
-								final long offset = msg.offset();
-
-								ByteBuffer payload = msg.message().payload();
-
-								// If the message value is null, this represents a delete command for the message key.
-								// Log this and pass it on to the client who might want to also receive delete messages.
-								byte[] valueBytes;
-								if (payload == null) {
-									deletedMessages++;
-									valueBytes = null;
-								} else {
-									valueBytes = new byte[payload.remaining()];
-									payload.get(valueBytes);
-								}
-
-								// put key into byte array
-								byte[] keyBytes = null;
-								int keySize = msg.message().keySize();
-
-								if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
-									ByteBuffer keyPayload = msg.message().key();
-									keyBytes = new byte[keySize];
-									keyPayload.get(keyBytes);
-								}
-
-								final T value = deserializer.deserialize(keyBytes, valueBytes, fp.topic, offset);
-								synchronized (sourceContext.getCheckpointLock()) {
-									sourceContext.collect(value);
-									offsetsState.put(topicPartition, offset);
-								}
-								
-								// advance offset for the next request
-								fp.nextOffsetToRead = offset + 1;
-							}
-							else {
-								// no longer running
-								return;
-							}
-						}
-					}
-					LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
-				}
-			}
-			catch (Throwable t) {
-				// report to the main thread
-				owner.stopWithError(t);
-			}
-			finally {
-				// end of run loop. close connection to consumer
-				if (consumer != null) {
-					// closing the consumer should not fail the program
-					try {
-						consumer.close();
-					}
-					catch (Throwable t) {
-						LOG.error("Error while closing the Kafka simple consumer", t);
-					}
-				}
-			}
-		}
-
-		/**
-		 * Cancels this fetch thread. The thread will release all resources and terminate.
-		 */
-		public void cancel() {
-			this.running = false;
-			
-			// interrupt whatever the consumer is doing
-			if (consumer != null) {
-				consumer.close();
-			}
-			
-			this.interrupt();
-		}
-
-		/**
-		 * Request latest offsets for a set of partitions, via a Kafka consumer.
-		 *
-		 * @param consumer The consumer connected to lead broker
-		 * @param partitions The list of partitions we need offsets for
-		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
-		 */
-		private static void getLastOffset(SimpleConsumer consumer, List<FetchPartition> partitions, long whichTime) {
-
-			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
-			for (FetchPartition fp: partitions) {
-				TopicAndPartition topicAndPartition = new TopicAndPartition(fp.topic, fp.partition);
-				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-			}
-
-			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-			OffsetResponse response = consumer.getOffsetsBefore(request);
-
-			if (response.hasError()) {
-				String exception = "";
-				for (FetchPartition fp: partitions) {
-					short code;
-					if ( (code=response.errorCode(fp.topic, fp.partition)) != ErrorMapping.NoError()) {
-						exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-					}
-				}
-				throw new RuntimeException("Unable to get last offset for partitions " + partitions + ". " + exception);
-			}
-
-			for (FetchPartition fp: partitions) {
-				// the resulting offset is the next offset we are going to read
-				// for not-yet-consumed partitions, it is 0.
-				fp.nextOffsetToRead = response.offsets(fp.topic, fp.partition)[0];
-			}
-		}
-
-		private static long getInvalidOffsetBehavior(Properties config) {
-			long timeType;
-			if (config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").equals("latest")) {
-				timeType = OffsetRequest.LatestTime();
-			} else {
-				timeType = OffsetRequest.EarliestTime();
-			}
-			return timeType;
-		}
-	}
-
-
-	private static class PartitionInfoFetcher extends Thread {
-
-		private final List<String> topics;
-		private final Properties properties;
-
-		private volatile List<KafkaTopicPartitionLeader> result;
-		private volatile Throwable error;
-
-
-		PartitionInfoFetcher(List<String> topics, Properties properties) {
-			this.topics = topics;
-			this.properties = properties;
-		}
-
-		@Override
-		public void run() {
-			try {
-				result = FlinkKafkaConsumer.getPartitionsForTopic(topics, properties);
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-		}
-
-		public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
-			try {
-				this.join();
-			}
-			catch (InterruptedException e) {
-				throw new Exception("Partition fetching was cancelled before completion");
-			}
-
-			if (error != null) {
-				throw new Exception("Failed to fetch partitions for topics " + topics.toString(), error);
-			}
-			if (result != null) {
-				return result;
-			}
-			throw new Exception("Partition fetching failed");
-		}
-	}
-
-	private static class KillerWatchDog extends Thread {
-
-		private final Thread toKill;
-		private final long timeout;
-
-		private KillerWatchDog(Thread toKill, long timeout) {
-			super("KillerWatchDog");
-			setDaemon(true);
-
-			this.toKill = toKill;
-			this.timeout = timeout;
-		}
-
-		@SuppressWarnings("deprecation")
-		@Override
-		public void run() {
-			final long deadline = System.currentTimeMillis() + timeout;
-			long now;
-
-			while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
-				try {
-					toKill.join(deadline - now);
-				}
-				catch (InterruptedException e) {
-					// ignore here, our job is important!
-				}
-			}
-
-			// this is harsh, but this watchdog is a last resort
-			if (toKill.isAlive()) {
-				toKill.stop();
-			}
-		}
-	}
-}

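The deleted SimpleConsumerThread reads its low-level consumer settings from the user-supplied Properties; the following sketch lists those keys with the default values visible in the code above (all values illustrative, not a recommendation):

	import java.util.Properties;

	public class LegacyFetcherConfigSketch {
		public static Properties defaults() {
			Properties props = new Properties();
			// Low-level consumer settings read by the deleted SimpleConsumerThread,
			// shown with the defaults it falls back to when they are not set:
			props.setProperty("socket.timeout.ms", "30000");
			props.setProperty("socket.receive.buffer.bytes", "65536");
			props.setProperty("fetch.message.max.bytes", "1048576");
			props.setProperty("fetch.wait.max.ms", "100");
			props.setProperty("fetch.min.bytes", "1");
			// getInvalidOffsetBehavior(): "latest" (default) -> LatestTime(), otherwise EarliestTime().
			props.setProperty("auto.offset.reset", "latest");
			return props;
		}
	}
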
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
deleted file mode 100644
index fdd89c6..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/OffsetHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The offset handler is responsible for locating the initial partition offsets 
- * where the source should start reading, as well as committing offsets from completed
- * checkpoints.
- */
-public interface OffsetHandler {
-
-	/**
-	 * Commits the given offsets for the partitions. May commit the offsets to the Kafka broker,
-	 * or to ZooKeeper, based on its configured behavior.
-	 *
-	 * @param offsetsToCommit The offset to commit, per partition.
-	 */
-	void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception;
-
-	/**
-	 * Positions the given fetcher to the initial read offsets where the stream consumption
-	 * will start from.
-	 * 
-	 * @param partitions The partitions for which to seek the fetcher to the initial read offsets.
-	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
-	 */
-	void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception;
-
-	/**
-	 * Closes the offset handler, releasing all resources.
-	 * 
-	 * @throws IOException Thrown, if the closing fails.
-	 */
-	void close() throws IOException;
-}

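A minimal sketch of how a caller hands checkpointed offsets to an OffsetHandler implementation, assuming the pre-0.9 interface shown above; the partition and offset are illustrative:

	import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
	import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
	import java.util.Collections;
	import java.util.Map;

	public class OffsetCommitSketch {
		// Called after a checkpoint completes: hand the checkpointed offsets,
		// keyed by partition, to whichever OffsetHandler implementation is configured.
		static void commitCheckpointedOffsets(OffsetHandler handler) throws Exception {
			Map<KafkaTopicPartition, Long> offsets =
					Collections.singletonMap(new KafkaTopicPartition("orders", 0), 42L);
			handler.commit(offsets);
		}
	}
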
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
deleted file mode 100644
index a38c3bd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is to specify a class name for the partitioner.
- *
- * Otherwise (if the user gave a serializable class instance), we give Kafka Flink's PartitionerWrapper class.
- * Its name is set in the key-value (java.util.Properties) map.
- * In addition to that, we use Properties.put(Object, Object) to store the serializable instance itself.
- * This is a hack because put() is called on the underlying Hashtable, bypassing the String-only contract of Properties.
- *
- * Kafka instantiates this PartitionerWrapper with the Properties; from there, we extract the wrapped Partitioner instance.
- *
- * The wrapped, serializable Partitioner is serialized into the Properties Hashtable and also deserialized from there.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-	private Partitioner wrapped;
-	public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return wrapped.partition(value, numberOfPartitions);
-	}
-}

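A sketch of the wrapping described in the Javadoc above, assuming the old producer's partitioner.class property and a serializable user partitioner; names are illustrative:

	import kafka.producer.Partitioner;
	import org.apache.flink.streaming.connectors.kafka.internals.PartitionerWrapper;
	import java.util.Properties;

	public class PartitionerWrapperSketch {
		static Properties wrap(Partitioner userPartitioner) {
			Properties props = new Properties();
			// Tell Kafka to instantiate Flink's wrapper class by name ...
			props.setProperty("partitioner.class", PartitionerWrapper.class.getName());
			// ... and smuggle the actual (serializable) instance through the underlying
			// Hashtable, under the key the wrapper looks up in its constructor.
			props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, userPartitioner);
			return props;
		}
	}
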
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
deleted file mode 100644
index 001b6cb..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
-
-/**
- * Simple ZooKeeper serializer for Strings.
- */
-public class ZooKeeperStringSerializer implements ZkSerializer {
-
-	private static final Charset CHARSET = Charset.forName("UTF-8");
-	
-	@Override
-	public byte[] serialize(Object data) {
-		if (data instanceof String) {
-			return ((String) data).getBytes(CHARSET);
-		}
-		else {
-			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
-		}
-	}
-
-	@Override
-	public Object deserialize(byte[] bytes) {
-		if (bytes == null) {
-			return null;
-		}
-		else {
-			return new String(bytes, CHARSET);
-		}
-	}
-}

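A minimal round-trip sketch for the serializer removed above (the ZooKeeper path string is illustrative):

	import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;

	public class ZkSerializerSketch {
		public static void main(String[] args) {
			ZooKeeperStringSerializer serializer = new ZooKeeperStringSerializer();

			// Strings round-trip through UTF-8 bytes ...
			byte[] bytes = serializer.serialize("consumers/my-group/offsets");
			String back = (String) serializer.deserialize(bytes);
			System.out.println(back);

			// ... null bytes deserialize to null, and non-String input to serialize()
			// throws IllegalArgumentException.
			System.out.println(serializer.deserialize(null));
		}
	}
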
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index f72117d..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.internals;
-
-import kafka.utils.ZKGroupTopicDirs;
-
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class ZookeeperOffsetHandler implements OffsetHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-	
-	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
-
-	private final String groupId;
-
-	private final CuratorFramework curatorClient;
-
-
-	public ZookeeperOffsetHandler(Properties props) {
-		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-		if (this.groupId == null) {
-			throw new IllegalArgumentException("Required property '"
-					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
-		}
-		
-		String zkConnect = props.getProperty("zookeeper.connect");
-		if (zkConnect == null) {
-			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
-		}
-
-		// we use Curator's default timeouts
-		int sessionTimeoutMs =  Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "60000"));
-		int connectionTimeoutMs = Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "15000"));
-		
-		// undocumented config options allowing users to configure the retry policy. (they are "flink." prefixed as they are not official Kafka configs)
-		int backoffBaseSleepTime = Integer.valueOf(props.getProperty("flink.zookeeper.base-sleep-time.ms", "100"));
-		int backoffMaxRetries =  Integer.valueOf(props.getProperty("flink.zookeeper.max-retries", "10"));
-		
-		RetryPolicy retryPolicy = new ExponentialBackoffRetry(backoffBaseSleepTime, backoffMaxRetries);
-		curatorClient = CuratorFrameworkFactory.newClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
-		curatorClient.start();
-	}
-
-
-	@Override
-	public void commit(Map<KafkaTopicPartition, Long> offsetsToCommit) throws Exception {
-		for (Map.Entry<KafkaTopicPartition, Long> entry : offsetsToCommit.entrySet()) {
-			KafkaTopicPartition tp = entry.getKey();
-			long offset = entry.getValue();
-			
-			if (offset >= 0) {
-				setOffsetInZooKeeper(curatorClient, groupId, tp.getTopic(), tp.getPartition(), offset);
-			}
-		}
-	}
-
-	@Override
-	public void seekFetcherToInitialOffsets(List<KafkaTopicPartitionLeader> partitions, Fetcher fetcher) throws Exception {
-		for (KafkaTopicPartitionLeader tp : partitions) {
-			long offset = getOffsetFromZooKeeper(curatorClient, groupId, tp.getTopicPartition().getTopic(), tp.getTopicPartition().getPartition());
-
-			if (offset != OFFSET_NOT_SET) {
-				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
-						tp.getTopicPartition().getPartition(), offset);
-
-				// the offset in ZooKeeper is the last read offset; seek() expects the next offset to read.
-				fetcher.seek(tp.getTopicPartition(), offset + 1);
-			}
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		curatorClient.close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Communication with Zookeeper
-	// ------------------------------------------------------------------------
-	
-	public static void setOffsetInZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition, long offset) throws Exception {
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
-		String path = topicDirs.consumerOffsetDir() + "/" + partition;
-		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-		byte[] data = Long.toString(offset).getBytes();
-		curatorClient.setData().forPath(path, data);
-	}
-
-	public static long getOffsetFromZooKeeper(CuratorFramework curatorClient, String groupId, String topic, int partition) throws Exception {
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
-		String path = topicDirs.consumerOffsetDir() + "/" + partition;
-		curatorClient.newNamespaceAwareEnsurePath(path).ensure(curatorClient.getZookeeperClient());
-		
-		byte[] data = curatorClient.getData().forPath(path);
-		
-		if (data == null) {
-			return OFFSET_NOT_SET;
-		} else {
-			String asString = new String(data);
-			if (asString.length() == 0) {
-				return OFFSET_NOT_SET;
-			} else {
-				try {
-					return Long.parseLong(asString);
-				} catch (NumberFormatException e) {
-					throw new Exception(String.format(
-						"The offset in ZooKeeper for group '%s', topic '%s', partition %d is a malformed string: %s",
-						groupId, topic, partition, asString));
-				}
-			}
-		}
-	}
-}

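A minimal sketch of the static ZooKeeper helpers removed above, assuming a ZooKeeper instance reachable at localhost:2181 and illustrative group/topic values:

	import org.apache.curator.framework.CuratorFramework;
	import org.apache.curator.framework.CuratorFrameworkFactory;
	import org.apache.curator.retry.ExponentialBackoffRetry;
	import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;

	public class ZkOffsetSketch {
		public static void main(String[] args) throws Exception {
			CuratorFramework client = CuratorFrameworkFactory.newClient(
					"localhost:2181", new ExponentialBackoffRetry(100, 10));
			client.start();
			try {
				// Offsets live under the standard Kafka consumer path for the group/topic/partition.
				ZookeeperOffsetHandler.setOffsetInZooKeeper(client, "my-group", "orders", 0, 42L);
				long offset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(client, "my-group", "orders", 0);
				System.out.println(offset); // 42
			} finally {
				client.close();
			}
		}
	}
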
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index 61735e9..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * 	# More Flink partitions than Kafka partitions
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2   --------------/
- * 			3   -------------/
- * 			4	------------/
- * </pre>
- * Some (or all) Kafka partitions contain the output of more than one Flink partition.
- *
- * 	# Fewer Flink partitions than Kafka partitions
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2	----------------&gt;	2
- * 										3
- * 										4
- * 										5
- * </pre>
- *
- *  In this case, not all Kafka partitions contain data.
- *  To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers).
- *
- *
- */
-public class FixedPartitioner extends KafkaPartitioner implements Serializable {
-	private static final long serialVersionUID = 1627268846962918126L;
-
-	int targetPartition = -1;
-
-	@Override
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		int p = 0;
-		for (int i = 0; i < parallelInstances; i++) {
-			if (i == parallelInstanceId) {
-				targetPartition = partitions[p];
-				return;
-			}
-			if (++p == partitions.length) {
-				p = 0;
-			}
-		}
-	}
-
-	@Override
-	public int partition(Object element, int numPartitions) {
-		if (targetPartition == -1) {
-			throw new RuntimeException("The partitioner has not been initialized properly");
-		}
-		return targetPartition;
-	}
-}

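A small sketch of the assignment the removed FixedPartitioner performs, mapping four illustrative parallel sinks onto three Kafka partitions:

	import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

	public class FixedPartitionerSketch {
		public static void main(String[] args) {
			int[] kafkaPartitions = {0, 1, 2};

			// Four parallel Flink sinks writing into three Kafka partitions:
			// sink 0 -> partition 0, sink 1 -> partition 1, sink 2 -> partition 2, sink 3 -> partition 0.
			for (int subtask = 0; subtask < 4; subtask++) {
				FixedPartitioner partitioner = new FixedPartitioner();
				partitioner.open(subtask, 4, kafkaPartitions);
				System.out.println("sink " + subtask + " -> partition "
						+ partitioner.partition("ignored", kafkaPartitions.length));
			}
		}
	}
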
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
deleted file mode 100644
index 55519f0..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-/**
- * Extended Kafka Partitioner.
- * It contains an open() method which is called on each parallel instance.
- * Partitioners have to be serializable!
- */
-public abstract class KafkaPartitioner implements Partitioner, Serializable {
-
-	private static final long serialVersionUID = -1974260817778593473L;
-
-	/**
-	 * Initializer for the Partitioner.
-	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
-	 * @param parallelInstances the total number of parallel instances
-	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
-	 */
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		// override this method if needed.
-	}
-}

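A sketch of a custom partitioner built on the removed base class above; the hash-based strategy is only an illustration, not part of this commit:

	import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

	// A custom partitioner only needs partition(); open() may be overridden to see
	// the subtask index and the available Kafka partition IDs.
	public class HashPartitionerSketch extends KafkaPartitioner {
		private static final long serialVersionUID = 1L;

		private int[] partitions;

		@Override
		public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
			this.partitions = partitions;
		}

		@Override
		public int partition(Object element, int numPartitions) {
			if (partitions != null && partitions.length > 0) {
				// Pick one of the partition IDs handed to open(), spread by element hash.
				return partitions[Math.abs(element.hashCode() % partitions.length)];
			}
			return Math.abs(element.hashCode() % numPartitions);
		}
	}
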
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index c4b026b..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
-import org.apache.kafka.common.Node;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-	private final Node fake = new Node(1337, "localhost", 1337);
-
-	@Test
-	public void testPartitionsEqualConsumers() {
-		try {
-			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
-
-			for (int i = 0; i < inPartitions.size(); i++) {
-				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(
-						inPartitions, inPartitions.size(), i);
-
-				assertNotNull(parts);
-				assertEquals(1, parts.size());
-				assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
-		for (KafkaTopicPartitionLeader ktp: inPartitions) {
-			if (ktp.getTopicPartition().getPartition() == partition) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	@Test
-	public void testMultiplePartitionsPerConsumers() {
-		try {
-			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
-			final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
-			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
-
-			for (int p : partitionIDs) {
-				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
-				partitions.add(part);
-				allPartitions.add(part);
-			}
-
-			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.size() / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
-
-			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(partitions, numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() >= minPartitionsPerConsumer);
-				assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
-				for (KafkaTopicPartitionLeader p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPartitionsFewerThanConsumers() {
-		try {
-			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
-			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
-
-			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
-			allPartitions.addAll(inPartitions);
-
-			final int numConsumers = 2 * inPartitions.size() + 3;
-
-			for (int i = 0; i < numConsumers; i++) {
-				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumer.assignPartitions(inPartitions, numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() <= 1);
-
-				for (KafkaTopicPartitionLeader p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testAssignEmptyPartitions() {
-		try {
-			List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
-			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumer.assignPartitions(ep, 4, 2);
-			assertNotNull(parts1);
-			assertTrue(parts1.isEmpty());
-
-			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumer.assignPartitions(ep, 1, 0);
-			assertNotNull(parts2);
-			assertTrue(parts2.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGrowingPartitionsRemainsStable() {
-		try {
-			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
-
-			for (int p : newPartitionIDs) {
-				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
-				newPartitions.add(part);
-			}
-
-			List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
-
-			final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
-			final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
-
-			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
-
-			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, numConsumers, 0);
-			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, numConsumers, 1);
-			List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, numConsumers, 2);
-
-			assertNotNull(parts1);
-			assertNotNull(parts2);
-			assertNotNull(parts3);
-
-			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (KafkaTopicPartitionLeader p : parts1) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-			for (KafkaTopicPartitionLeader p : parts2) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-			for (KafkaTopicPartitionLeader p : parts3) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allInitialPartitions.isEmpty());
-
-			// grow the set of partitions and distribute anew
-
-			List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, numConsumers, 0);
-			List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, numConsumers, 1);
-			List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, numConsumers, 2);
-
-			// new partitions must include all old partitions
-
-			assertTrue(parts1new.size() > parts1.size());
-			assertTrue(parts2new.size() > parts2.size());
-			assertTrue(parts3new.size() > parts3.size());
-
-			assertTrue(parts1new.containsAll(parts1));
-			assertTrue(parts2new.containsAll(parts2));
-			assertTrue(parts3new.containsAll(parts3));
-
-			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
-			for (KafkaTopicPartitionLeader p : parts1new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-			for (KafkaTopicPartitionLeader p : parts2new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-			for (KafkaTopicPartitionLeader p : parts3new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allNewPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-}
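
The behaviour these tests pin down (every partition assigned exactly once, per-consumer counts within one of each other, and assignments that stay put when the partition list grows at the end) can be met by giving list index i to the consumer with index i % numConsumers. A sketch of such an assignment, reusing the KafkaTopicPartitionLeader type and java.util collections from the deleted sources; this is an illustration, not necessarily the exact implementation that was removed:

	// Sketch: deterministic, stable round-robin assignment by list index.
	// Partition at index i goes to consumer i % numConsumers, so appending
	// new partitions never moves an existing assignment.
	public static List<KafkaTopicPartitionLeader> assignPartitions(
			List<KafkaTopicPartitionLeader> partitions, int numConsumers, int consumerIndex) {

		List<KafkaTopicPartitionLeader> assigned = new ArrayList<>();
		for (int i = 0; i < partitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(partitions.get(i));
			}
		}
		return assigned;
	}

With this scheme, growing the partition list only adds new indices at the end, so each consumer keeps its previous partitions and merely gains new ones, which is exactly what testGrowingPartitionsRemainsStable asserts.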

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
deleted file mode 100644
index efae922..0000000
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.commons.collections.map.LinkedMap;
-
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
-	@Test
-	public void testValidateZooKeeperConfig() {
-		try {
-			// empty
-			Properties emptyProperties = new Properties();
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no connect string (only group string)
-			Properties noConnect = new Properties();
-			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no group string (only connect string)
-			Properties noGroup = new Properties();
-			noGroup.put("zookeeper.connect", "localhost:47574");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSnapshot() {
-		try {
-			Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
-			Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
-			Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-
-			offsetsField.setAccessible(true);
-			runningField.setAccessible(true);
-			mapField.setAccessible(true);
-
-			FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
-			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-
-
-			HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>();
-			long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 };
-			int j = 0;
-			for (long i: offsets) {
-				KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++);
-				testOffsets.put(ktp, i);
-			}
-
-			LinkedMap map = new LinkedMap();
-
-			offsetsField.set(consumer, testOffsets);
-			runningField.set(consumer, true);
-			mapField.set(consumer, map);
-
-			assertTrue(map.isEmpty());
-
-			// make multiple checkpoints
-			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
-				HashMap<KafkaTopicPartition, Long> checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
-				assertEquals(testOffsets, checkpoint);
-
-				// change the offsets, make sure the snapshot did not change
-				HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone();
-
-				for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) {
-					testOffsets.put(e.getKey(), e.getValue() + 1);
-				}
-
-				assertEquals(checkpointCopy, checkpoint);
-
-				assertTrue(map.size() > 0);
-				assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	@Ignore("Kafka consumer internally makes an infinite loop")
-	public void testCreateSourceWithoutCluster() {
-		try {
-			Properties props = new Properties();
-			props.setProperty("zookeeper.connect", "localhost:56794");
-			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-			props.setProperty("group.id", "non-existent-group");
-
-			new FlinkKafkaConsumer<>(Collections.singletonList("no op topic"), new SimpleStringSchema(), props,
-					FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-					FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}
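
testValidateZooKeeperConfig above expects an IllegalArgumentException whenever either the ZooKeeper connect string or the consumer group id is missing from the properties. A minimal sketch of a check that satisfies those expectations (simplified; the removed method may have validated more, and the message texts here are made up):

	// Sketch: reject configurations that lack the two properties the
	// legacy ZooKeeper-based offset store needs.
	protected static void validateZooKeeperConfig(Properties props) {
		if (props.getProperty("zookeeper.connect") == null) {
			throw new IllegalArgumentException(
					"Required property 'zookeeper.connect' has not been set");
		}
		if (props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
			throw new IllegalArgumentException(
					"Required property '" + ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
		}
	}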

