flink-commits mailing list archives

From: se...@apache.org
Subject: [37/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.
Date: Thu, 27 Aug 2015 11:25:54 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer082.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer082.java
deleted file mode 100644
index 1540f8f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer082.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.2.x brokers.
- * The consumer will use the new Kafka consumer API (early Flink backport version),
- * and manually commit partition offsets to ZooKeeper.
- *
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
-
-	private static final long serialVersionUID = -8450689820627198228L;
-
-	/**
-	 * Creates a new Kafka 0.8.2.x streaming source consumer.
-	 * 
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
-	 * @param props
-	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.NEW_HIGH_LEVEL);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer083.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer083.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer083.java
deleted file mode 100644
index 6ed2930..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/FlinkKafkaConsumer083.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import java.util.Properties;
-
-/**
- * Creates a Kafka consumer compatible with reading from Kafka 0.8.3.x brokers.
- * The consumer will use the new Kafka consumer API (early Flink backport version),
- * and lets Kafka handle the offset committing internally.
- * 
- * @param <T> The type of elements produced by this consumer.
- */
-public class FlinkKafkaConsumer083<T> extends FlinkKafkaConsumer<T> {
-
-	private static final long serialVersionUID = 1126432820518992927L;
-
-	/**
-	 * Creates a new Kafka 0.8.3.x streaming source consumer.
-	 *
-	 * @param topic
-	 *           The name of the topic that should be consumed.
-	 * @param valueDeserializer
-	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects. 
-	 * @param props
-	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
-	 */
-	public FlinkKafkaConsumer083(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
-		super(topic, valueDeserializer, props, OffsetStore.KAFKA, FetcherType.NEW_HIGH_LEVEL);
-	}
-}

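For context, the two consumer classes deleted above are thin wrappers that pre-select the offset store and fetcher type of FlinkKafkaConsumer. Below is a minimal, hypothetical usage sketch for a Flink streaming job of that era; the topic name, broker and ZooKeeper addresses, and group id are placeholders, and SimpleStringSchema is assumed to be available as a DeserializationSchema<String>.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaConsumerExample {

	public static void main(String[] args) throws Exception {
		// Placeholder connection settings; adjust to the actual cluster.
		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181");
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "my-consumer-group");

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// FlinkKafkaConsumer082 commits partition offsets to ZooKeeper; FlinkKafkaConsumer083
		// uses the same call site but lets Kafka handle offset committing internally.
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer082<String>("my-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Kafka consumer example");
	}
}
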
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/KafkaSink.java
deleted file mode 100644
index 51129ed..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/KafkaSink.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import com.google.common.base.Preconditions;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * @param <IN>
- * 		Type of the sink input
- */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
-	private Producer<IN, byte[]> producer;
-	private Properties userDefinedProperties;
-	private String topicId;
-	private String brokerList;
-	private SerializationSchema<IN, byte[]> schema;
-	private SerializableKafkaPartitioner partitioner;
-	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 *			Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
-		this(brokerList, topicId, new Properties(), serializationSchema);
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic with custom Producer configuration.
-	 * If you use this constructor, the broker should be set with the "metadata.broker.list"
-	 * configuration.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param producerConfig
-	 * 		Configurations of the Kafka producer
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, Properties producerConfig, SerializationSchema<IN, byte[]> serializationSchema) {
-		String[] elements = brokerList.split(",");
-		for(String broker: elements) {
-			NetUtils.ensureCorrectHostnamePort(broker);
-		}
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-
-		this.brokerList = brokerList;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
-		this.partitionerClass = null;
-		this.userDefinedProperties = producerConfig;
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		User defined partitioner.
-	 */
-	public KafkaSink(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.partitioner = partitioner;
-	}
-
-	public KafkaSink(String brokerList,
-					String topicId,
-					SerializationSchema<IN, byte[]> serializationSchema,
-					Class<? extends SerializableKafkaPartitioner> partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		this.partitionerClass = partitioner;
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-
-		Properties properties = new Properties();
-
-		properties.put("metadata.broker.list", brokerList);
-		properties.put("request.required.acks", "-1");
-		properties.put("message.send.max.retries", "10");
-
-		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		// this will not be used as the key will not be serialized
-		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
-			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
-		}
-
-		if (partitioner != null) {
-			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
-			// java serialization will do the rest.
-			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
-		}
-		if (partitionerClass != null) {
-			properties.put("partitioner.class", partitionerClass);
-		}
-
-		ProducerConfig config = new ProducerConfig(properties);
-
-		try {
-			producer = new Producer<IN, byte[]>(config);
-		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives at the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) {
-		byte[] serialized = schema.serialize(next);
-
-		// Sending message without serializable key.
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
-	}
-
-	@Override
-	public void close() {
-		if (producer != null) {
-			producer.close();
-		}
-	}
-
-}

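For context, a minimal, hypothetical sketch of how the KafkaSink deleted above is wired into a job. The broker address and topic name are placeholders, and the StringToBytes schema is an assumed example implementation of the SerializationSchema<IN, byte[]> interface that the sink expects, not a class from this commit.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.KafkaSink;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class KafkaSinkExample {

	/** Turns each String record into the raw bytes that the sink hands to the Kafka producer. */
	public static class StringToBytes implements SerializationSchema<String, byte[]> {

		private static final long serialVersionUID = 1L;

		@Override
		public byte[] serialize(String element) {
			return element.getBytes();
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<String> stream = env.fromElements("a", "b", "c");

		// Broker list and topic are placeholders; see the KafkaSink constructors above
		// for the variants that take a Properties config or a custom partitioner.
		stream.addSink(new KafkaSink<String>("localhost:9092", "my-topic", new StringToBytes()));

		env.execute("Kafka sink example");
	}
}
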
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/PartitionerWrapper.java
deleted file mode 100644
index 3a83e18..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to send an object instance through a Properties map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is to specify a class name for the partitioner.
- *
- * Otherwise (if the user gave a (serializable) partitioner instance), we give Kafka Flink's PartitionerWrapper class.
- * This is set in the key-value (java.util.Properties) map.
- * In addition to that, we use Properties.put(Object, Object) to store the (serializable) partitioner instance.
- * This is a hack because put() is called on the underlying HashMap, bypassing the String-only contract of Properties.
- *
- * The PartitionerWrapper is then instantiated with those Properties and extracts the wrapped Partitioner instance from them.
- *
- * In other words, the (serializable) partitioner instance is passed into the Properties HashMap and retrieved from there again.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-	private Partitioner wrapped;
-	public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return wrapped.partition(value, numberOfPartitions);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/SerializableKafkaPartitioner.java
deleted file mode 100644
index aff0c3a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/SerializableKafkaPartitioner.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors;
-
-import kafka.producer.Partitioner;
-
-import java.io.Serializable;
-
-public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
-
-}

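For context, a hypothetical partitioner implementing the interface above; the hashing scheme is an arbitrary example. An instance of it could be passed to the partitioner-taking KafkaSink constructor shown earlier, e.g. new KafkaSink<String>(brokers, topic, schema, new HashPartitioner()), which ships the instance to the Kafka producer through the PartitionerWrapper.

import org.apache.flink.streaming.connectors.SerializableKafkaPartitioner;

/**
 * Example partitioner: routes each record to a partition based on the hash of its key object.
 * Because it is Serializable, an instance (rather than a class name) can be handed to the sink.
 */
public class HashPartitioner implements SerializableKafkaPartitioner {

	private static final long serialVersionUID = 1L;

	@Override
	public int partition(Object key, int numPartitions) {
		// Mask the sign bit instead of using Math.abs(), which stays negative for Integer.MIN_VALUE.
		return (key.hashCode() & 0x7fffffff) % numPartitions;
	}
}
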
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/Fetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/Fetcher.java
deleted file mode 100644
index 25a3fea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/Fetcher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A fetcher pulls data from Kafka, from a fixed set of partitions.
- * The fetcher supports "seeking" inside the partitions, i.e., moving to a different offset.
- */
-public interface Fetcher {
-
-	/**
-	 * Set which partitions the fetcher should pull from.
-	 * 
-	 * @param partitions The list of partitions for a topic that the fetcher will pull from.
-	 */
-	void setPartitionsToRead(List<TopicPartition> partitions);
-
-	/**
-	 * Closes the fetcher. This will stop any operation in the
-	 * {@link #run(SourceFunction.SourceContext, DeserializationSchema, long[])} method and eventually
-	 * close underlying connections and release all resources.
-	 */
-	void close() throws IOException;
-
-	/**
-	 * Starts fetching data from Kafka and emitting it into the stream.
-	 * 
-	 * <p>To provide exactly-once guarantees, the fetcher needs to emit a record and update
-	 * the last consumed offset in one atomic operation:</p>
-	 * <pre>{@code
-	 * 
-	 * while (running) {
-	 *     T next = ...
-	 *     long offset = ...
-	 *     int partition = ...
-	 *     synchronized (sourceContext.getCheckpointLock()) {
-	 *         sourceContext.collect(next);
-	 *         lastOffsets[partition] = offset;
-	 *     }
-	 * }
-	 * }</pre>
-	 * 
-	 * @param sourceContext The source context to emit elements to.
-	 * @param valueDeserializer The deserializer to decode the raw values with.
-	 * @param lastOffsets The array into which to store the offsets of the elements that are emitted.
-	 * 
-	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
-	 */
-	<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, 
-					long[] lastOffsets) throws Exception;
-	
-	/**
-	 * Set the next offset to read from for the given partition.
-	 * For example, if the offset for partition <i>i</i> is set to <i>n</i>, the Fetcher's next result
-	 * will be the message with <i>offset=n</i>.
-	 * 
-	 * @param topicPartition The partition for which to seek the offset.
-	 * @param offsetToRead The offset to seek to.
-	 */
-	void seek(TopicPartition topicPartition, long offsetToRead);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
deleted file mode 100644
index 9638b84..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/LegacyFetcher.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.util.StringUtils;
-import org.apache.flink.kafka_backport.common.Node;
-import org.apache.flink.kafka_backport.common.PartitionInfo;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * This fetcher uses Kafka's low-level API to pull data from a specific
- * set of partitions and offsets for a certain topic.
- * 
- * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p>
- */
-public class LegacyFetcher implements Fetcher {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
-
-	/** The topic from which this fetcher pulls data */
-	private final String topic;
-	
-	/** The properties that configure the Kafka connection */
-	private final Properties config;
-	
-	/** The task name, to give more readable names to the spawned threads */
-	private final String taskName;
-	
-	/** The first error that occurred in a connection thread */
-	private final AtomicReference<Throwable> error;
-
-	/** The partitions that the fetcher should read, with their starting offsets */
-	private Map<TopicPartition, Long> partitionsToRead;
-	
-	/** Reference to the thread that executes the run() method. */
-	private volatile Thread mainThread;
-	
-	/** Flag to shut the fetcher down */
-	private volatile boolean running = true;
-
-
-	public LegacyFetcher(String topic, Properties props) {
-		this(topic, props, "");
-	}
-	
-	public LegacyFetcher(String topic, Properties props, String taskName) {
-		this.config = checkNotNull(props, "The config properties cannot be null");
-		this.topic = checkNotNull(topic, "The topic cannot be null");
-		this.taskName = taskName;
-		this.error = new AtomicReference<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Fetcher methods
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void setPartitionsToRead(List<TopicPartition> partitions) {
-		partitionsToRead = new HashMap<TopicPartition, Long>(partitions.size());
-		for (TopicPartition tp: partitions) {
-			partitionsToRead.put(tp, FlinkKafkaConsumer.OFFSET_NOT_SET);
-		}
-	}
-
-	@Override
-	public void seek(TopicPartition topicPartition, long offsetToRead) {
-		if (partitionsToRead == null) {
-			throw new IllegalArgumentException("No partitions to read set");
-		}
-		if (!partitionsToRead.containsKey(topicPartition)) {
-			throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition
-					+ ") we are not going to read. Partitions to read " + partitionsToRead);
-		}
-		partitionsToRead.put(topicPartition, offsetToRead);
-	}
-	
-	@Override
-	public void close() {
-		// flag needs to be checked by the run() method that creates the spawned threads
-		this.running = false;
-		
-		// all other cleanup is made by the run method itself
-	}
-
-	@Override
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext, 
-						DeserializationSchema<T> valueDeserializer,
-						long[] lastOffsets) throws Exception {
-		
-		if (partitionsToRead == null || partitionsToRead.size() == 0) {
-			throw new IllegalArgumentException("No partitions set");
-		}
-		
-		// NOTE: This method needs to always release all resources it acquires
-		
-		this.mainThread = Thread.currentThread();
-
-		LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");
-		
-		// get lead broker for each partition
-		
-		// NOTE: The kafka client apparently locks itself in an infinite loop sometimes
-		// when it is interrupted, so we run it only in a separate thread.
-		// since it sometimes refuses to shut down, we resort to the admittedly harsh
-		// means of killing the thread after a timeout.
-		PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
-		infoFetcher.start();
-		
-		KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000);
-		watchDog.start();
-		
-		final List<PartitionInfo> allPartitionsInTopic = infoFetcher.getPartitions();
-		
-		// brokers to fetch partitions from.
-		int fetchPartitionsCount = 0;
-		Map<Node, List<FetchPartition>> fetchBrokers = new HashMap<Node, List<FetchPartition>>();
-		
-		for (PartitionInfo partitionInfo : allPartitionsInTopic) {
-			if (partitionInfo.leader() == null) {
-				throw new RuntimeException("Unable to consume partition " + partitionInfo.partition()
-						+ " from topic "+partitionInfo.topic()+" because it does not have a leader");
-			}
-			
-			for (Map.Entry<TopicPartition, Long> entry : partitionsToRead.entrySet()) {
-				final TopicPartition topicPartition = entry.getKey();
-				final long offset = entry.getValue();
-				
-				// check if that partition is for us
-				if (topicPartition.partition() == partitionInfo.partition()) {
-					List<FetchPartition> partitions = fetchBrokers.get(partitionInfo.leader());
-					if (partitions == null) {
-						partitions = new ArrayList<FetchPartition>();
-						fetchBrokers.put(partitionInfo.leader(), partitions);
-					}
-					
-					partitions.add(new FetchPartition(topicPartition.partition(), offset));
-					fetchPartitionsCount++;
-					
-				}
-				// else this partition is not for us
-			}
-		}
-		
-		if (partitionsToRead.size() != fetchPartitionsCount) {
-			throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only "
-					+ fetchPartitionsCount + " partition infos with lead brokers.");
-		}
-
-		// create SimpleConsumers for each broker
-		ArrayList<SimpleConsumerThread<?>> consumers = new ArrayList<>(fetchBrokers.size());
-		
-		for (Map.Entry<Node, List<FetchPartition>> brokerInfo : fetchBrokers.entrySet()) {
-			final Node broker = brokerInfo.getKey();
-			final List<FetchPartition> partitionsList = brokerInfo.getValue();
-			
-			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
-
-			SimpleConsumerThread<T> thread = new SimpleConsumerThread<T>(this, config, topic,
-					broker, partitions, sourceContext, valueDeserializer, lastOffsets);
-
-			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
-					taskName, broker.idString(), broker.host(), broker.port()));
-			thread.setDaemon(true);
-			consumers.add(thread);
-		}
-		
-		// last check whether we should abort.
-		if (!running) {
-			return;
-		}
-		
-		// start all consumer threads
-		for (SimpleConsumerThread<?> t : consumers) {
-			LOG.info("Starting thread {}", t.getName());
-			t.start();
-		}
-		
-		// wait until all consumer threads are done, or until we are aborted, or until
-		// an error occurred in one of the fetcher threads
-		try {
-			boolean someConsumersRunning = true;
-			while (running && error.get() == null && someConsumersRunning) {
-				try {
-					// wait for the consumer threads. if an error occurs, we are interrupted
-					for (SimpleConsumerThread<?> t : consumers) {
-						t.join();
-					}
-	
-					// safety net
-					someConsumersRunning = false;
-					for (SimpleConsumerThread<?> t : consumers) {
-						someConsumersRunning |= t.isAlive();
-					}
-				}
-				catch (InterruptedException e) {
-					// ignore. we should notice what happened in the next loop check
-				}
-			}
-			
-			// make sure any asynchronous error is noticed
-			Throwable error = this.error.get();
-			if (error != null) {
-				throw new Exception(error.getMessage(), error);
-			}
-		}
-		finally {
-			// make sure that in any case (completion, abort, error), all spawned threads are stopped
-			for (SimpleConsumerThread<?> t : consumers) {
-				if (t.isAlive()) {
-					t.cancel();
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Reports an error from a fetch thread. This will cause the main thread to see this error,
-	 * abort, and cancel all other fetch threads.
-	 * 
-	 * @param error The error to report.
-	 */
-	void onErrorInFetchThread(Throwable error) {
-		if (this.error.compareAndSet(null, error)) {
-			// we are the first to report an error
-			if (mainThread != null) {
-				mainThread.interrupt();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Representation of a partition to fetch.
-	 */
-	private static class FetchPartition {
-		
-		/** ID of the partition within the topic (0 indexed, as given by Kafka) */
-		int partition;
-		
-		/** Offset pointing at the next element to read from that partition. */
-		long nextOffsetToRead;
-
-		FetchPartition(int partition, long nextOffsetToRead) {
-			this.partition = partition;
-			this.nextOffsetToRead = nextOffsetToRead;
-		}
-		
-		@Override
-		public String toString() {
-			return "FetchPartition {partition=" + partition + ", offset=" + nextOffsetToRead + '}';
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Per broker fetcher
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Each broker needs its separate connection. This thread implements the connection to
-	 * one broker. The connection can fetch multiple partitions from the broker.
-	 * 
-	 * @param <T> The data type fetched.
-	 */
-	private static class SimpleConsumerThread<T> extends Thread {
-		
-		private final SourceFunction.SourceContext<T> sourceContext;
-		private final DeserializationSchema<T> valueDeserializer;
-		private final long[] offsetsState;
-		
-		private final FetchPartition[] partitions;
-		
-		private final Node broker;
-		private final String topic;
-		private final Properties config;
-
-		private final LegacyFetcher owner;
-
-		private SimpleConsumer consumer;
-		
-		private volatile boolean running = true;
-
-
-		// exceptions are thrown locally
-		public SimpleConsumerThread(LegacyFetcher owner,
-									Properties config, String topic,
-									Node broker,
-									FetchPartition[] partitions,
-									SourceFunction.SourceContext<T> sourceContext,
-									DeserializationSchema<T> valueDeserializer,
-									long[] offsetsState) {
-			this.owner = owner;
-			this.config = config;
-			this.topic = topic;
-			this.broker = broker;
-			this.partitions = partitions;
-			this.sourceContext = checkNotNull(sourceContext);
-			this.valueDeserializer = checkNotNull(valueDeserializer);
-			this.offsetsState = checkNotNull(offsetsState);
-		}
-
-		@Override
-		public void run() {
-			try {
-				// set up the config values
-				final String clientId = "flink-kafka-consumer-legacy-" + broker.idString();
-
-				// these are the actual configuration values of Kafka + their original default values.
-				
-				final int soTimeout = Integer.valueOf(config.getProperty("socket.timeout.ms", "30000"));
-				final int bufferSize = Integer.valueOf(config.getProperty("socket.receive.buffer.bytes", "65536"));
-				final int fetchSize = Integer.valueOf(config.getProperty("fetch.message.max.bytes", "1048576"));
-				final int maxWait = Integer.valueOf(config.getProperty("fetch.wait.max.ms", "100"));
-				final int minBytes = Integer.valueOf(config.getProperty("fetch.min.bytes", "1"));
-				
-				// create the Kafka consumer that we actually use for fetching
-				consumer = new SimpleConsumer(broker.host(), broker.port(), bufferSize, soTimeout, clientId);
-
-				// make sure that all partitions have some offsets to start with
-				// those partitions that do not have an offset from a checkpoint (or from ZooKeeper)
-				// get their start offset from the broker, according to 'auto.offset.reset'
-				
-				List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<FetchPartition>();
-
-				for (FetchPartition fp : partitions) {
-					if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
-						// retrieve the offset from the consumer
-						partitionsToGetOffsetsFor.add(fp);
-					}
-				}
-				if (partitionsToGetOffsetsFor.size() > 0) {
-					long timeType;
-					if (config.getProperty("auto.offset.reset", "latest").equals("latest")) {
-						timeType = OffsetRequest.LatestTime();
-					} else {
-						timeType = OffsetRequest.EarliestTime();
-					}
-					getLastOffset(consumer, topic, partitionsToGetOffsetsFor, timeType);
-					LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}",
-							topic, partitionsToGetOffsetsFor);
-				}
-				
-				// Now, the actual work starts :-)
-				
-				while (running) {
-					FetchRequestBuilder frb = new FetchRequestBuilder();
-					frb.clientId(clientId);
-					frb.maxWait(maxWait);
-					frb.minBytes(minBytes);
-					
-					for (FetchPartition fp : partitions) {
-						frb.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
-					}
-					kafka.api.FetchRequest fetchRequest = frb.build();
-					LOG.debug("Issuing fetch request {}", fetchRequest);
-
-					FetchResponse fetchResponse;
-					fetchResponse = consumer.fetch(fetchRequest);
-
-					if (fetchResponse.hasError()) {
-						String exception = "";
-						for (FetchPartition fp : partitions) {
-							short code;
-							if ((code = fetchResponse.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
-								exception += "\nException for partition " + fp.partition + ": " + 
-										StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-							}
-						}
-						throw new IOException("Error while fetching from broker: " + exception);
-					}
-
-					int messagesInFetch = 0;
-					for (FetchPartition fp : partitions) {
-						final ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, fp.partition);
-						final int partition = fp.partition;
-						
-						for (MessageAndOffset msg : messageSet) {
-							if (running) {
-								messagesInFetch++;
-								if (msg.offset() < fp.nextOffsetToRead) {
-									// we have seen this message already
-									LOG.info("Skipping message with offset " + msg.offset()
-											+ " because we have seen messages until " + fp.nextOffsetToRead
-											+ " from partition " + fp.partition + " already");
-									continue;
-								}
-								
-								ByteBuffer payload = msg.message().payload();
-								byte[] valueByte = new byte[payload.remaining()];
-								payload.get(valueByte);
-								
-								final T value = valueDeserializer.deserialize(valueByte);
-								final long offset = msg.offset();
-										
-								synchronized (sourceContext.getCheckpointLock()) {
-									offsetsState[partition] = offset;
-									sourceContext.collect(value);
-								}
-								
-								// advance offset for the next request
-								fp.nextOffsetToRead = offset + 1;
-							}
-							else {
-								// no longer running
-								return;
-							}
-						}
-					}
-					LOG.debug("This fetch contained {} messages", messagesInFetch);
-				}
-			}
-			catch (Throwable t) {
-				// report to the main thread
-				owner.onErrorInFetchThread(t);
-			}
-			finally {
-				// end of run loop. close connection to consumer
-				if (consumer != null) {
-					// closing the consumer should not fail the program
-					try {
-						consumer.close();
-					}
-					catch (Throwable t) {
-						LOG.error("Error while closing the Kafka simple consumer", t);
-					}
-				}
-			}
-		}
-
-		/**
-		 * Cancels this fetch thread. The thread will release all resources and terminate.
-		 */
-		public void cancel() {
-			this.running = false;
-			
-			// interrupt whatever the consumer is doing
-			if (consumer != null) {
-				consumer.close();
-			}
-			
-			this.interrupt();
-		}
-
-		/**
-		 * Request latest or earliest offsets (depending on 'whichTime') for a set of partitions, via a Kafka consumer.
-		 *
-		 * @param consumer The consumer connected to lead broker
-		 * @param topic The topic name
-		 * @param partitions The list of partitions we need offsets for
-		 * @param whichTime The type of time we are requesting. -1 and -2 are special constants (See OffsetRequest)
-		 */
-		private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
-
-			Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-			for (FetchPartition fp: partitions) {
-				TopicAndPartition topicAndPartition = new TopicAndPartition(topic, fp.partition);
-				requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
-			}
-
-			kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-			OffsetResponse response = consumer.getOffsetsBefore(request);
-
-			if (response.hasError()) {
-				String exception = "";
-				for (FetchPartition fp: partitions) {
-					short code;
-					if ( (code=response.errorCode(topic, fp.partition)) != ErrorMapping.NoError()) {
-						exception += "\nException for partition "+fp.partition+": "+ StringUtils.stringifyException(ErrorMapping.exceptionFor(code));
-					}
-				}
-				throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions
-						+ ". " + exception);
-			}
-
-			for (FetchPartition fp: partitions) {
-				// the resulting offset is the next offset we are going to read
-				// for not-yet-consumed partitions, it is 0.
-				fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
-			}
-		}
-	}
-	
-	private static class PartitionInfoFetcher extends Thread {
-
-		private final String topic;
-		private final Properties properties;
-		
-		private volatile List<PartitionInfo> result;
-		private volatile Throwable error;
-
-		
-		PartitionInfoFetcher(String topic, Properties properties) {
-			this.topic = topic;
-			this.properties = properties;
-		}
-
-		@Override
-		public void run() {
-			try {
-				result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-		}
-		
-		public List<PartitionInfo> getPartitions() throws Exception {
-			try {
-				this.join();
-			}
-			catch (InterruptedException e) {
-				throw new Exception("Partition fetching was cancelled before completion");
-			}
-			
-			if (error != null) {
-				throw new Exception("Failed to fetch partitions for topic " + topic, error);
-			}
-			if (result != null) {
-				return result;
-			}
-			throw new Exception("Partition fetching failed");
-		}
-	}
-
-	private static class KillerWatchDog extends Thread {
-		
-		private final Thread toKill;
-		private final long timeout;
-
-		private KillerWatchDog(Thread toKill, long timeout) {
-			super("KillerWatchDog");
-			setDaemon(true);
-			
-			this.toKill = toKill;
-			this.timeout = timeout;
-		}
-
-		@SuppressWarnings("deprecation")
-		@Override
-		public void run() {
-			final long deadline = System.currentTimeMillis() + timeout;
-			long now;
-			
-			while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
-				try {
-					toKill.join(deadline - now);
-				}
-				catch (InterruptedException e) {
-				// ignore here, our job is important!
-				}
-			}
-			
-			// this is harsh, but this watchdog is a last resort
-			if (toKill.isAlive()) {
-				toKill.stop();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
deleted file mode 100644
index db9424e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/NewConsumerApiFetcher.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.kafka_backport.clients.consumer.CommitType;
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecord;
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecords;
-import org.apache.flink.kafka_backport.clients.consumer.KafkaConsumer;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.serialization.ByteArrayDeserializer;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A fetcher that uses the new Kafka consumer API to fetch data for a specified set of partitions.
- */
-public class NewConsumerApiFetcher implements Fetcher, OffsetHandler {
-
-	private static final String POLL_TIMEOUT_PROPERTY = "flink.kafka.consumer.poll.timeout";
-	private static final long DEFAULT_POLL_TIMEOUT = 50;
-	
-	private static final ByteArrayDeserializer NO_OP_SERIALIZER = new ByteArrayDeserializer();
-
-	
-	private final KafkaConsumer<byte[], byte[]> fetcher;
-	
-	private final long pollTimeout;
-	
-	private volatile boolean running = true;
-
-	
-	public NewConsumerApiFetcher(Properties props) {
-		this.pollTimeout = props.contains(POLL_TIMEOUT_PROPERTY) ?
-				Long.valueOf(props.getProperty(POLL_TIMEOUT_PROPERTY)) :
-				DEFAULT_POLL_TIMEOUT;
-		
-		this.fetcher = new KafkaConsumer<byte[], byte[]>(props, null, NO_OP_SERIALIZER, NO_OP_SERIALIZER);
-	}
-
-	@Override
-	public void setPartitionsToRead(List<TopicPartition> partitions) {
-		synchronized (fetcher) {
-			if (fetcher.subscriptions().isEmpty()) {
-				fetcher.subscribe(partitions.toArray(new TopicPartition[partitions.size()]));
-			}
-			else {
-				throw new IllegalStateException("Fetcher has already subscribed to its set of partitions");
-			}
-		}
-	}
-
-	@Override
-	public void seek(TopicPartition topicPartition, long offsetToRead) {
-		synchronized (fetcher) {
-			fetcher.seek(topicPartition, offsetToRead);
-		}
-	}
-
-	@Override
-	public void close() {
-		running = false;
-		synchronized (fetcher) {
-			fetcher.close();
-		}
-	}
-
-	@Override
-	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
-						DeserializationSchema<T> valueDeserializer, long[] lastOffsets) {
-		while (running) {
-			// poll() always returns a new object.
-			ConsumerRecords<byte[], byte[]> consumed;
-			synchronized (fetcher) {
-				consumed = fetcher.poll(pollTimeout);
-			}
-
-			final Iterator<ConsumerRecord<byte[], byte[]>> records = consumed.iterator();
-			while (running && records.hasNext()) {
-				ConsumerRecord<byte[], byte[]> record = records.next();
-				T value = valueDeserializer.deserialize(record.value());
-				
-				// synchronize inside the loop to allow checkpoints in between batches
-				synchronized (sourceContext.getCheckpointLock()) {
-					sourceContext.collect(value);
-					lastOffsets[record.partition()] = record.offset();
-				}
-			}
-		}
-	}
-
-	@Override
-	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
-		synchronized (fetcher) {
-			fetcher.commit(offsetsToCommit, CommitType.SYNC);
-		}
-	}
-
-	@Override
-	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
-		// no need to do anything here.
-		// if Kafka manages the offsets, it has them automatically
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
deleted file mode 100644
index d7eb19d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/OffsetHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The offset handler is responsible for locating the initial partition offsets 
- * where the source should start reading, as well as committing offsets from completed
- * checkpoints.
- */
-public interface OffsetHandler {
-
-	/**
-	 * Commits the given offsets for the partitions. May commit the offsets to the Kafka broker,
-	 * or to ZooKeeper, based on its configured behavior.
-	 *
-	 * @param offsetsToCommit The offsets to commit, per partition.
-	 */
-	void commit(Map<TopicPartition, Long> offsetsToCommit) throws Exception;
-
-	/**
-	 * Positions the given fetcher at the initial read offsets from which the stream consumption
-	 * will start.
-	 * 
-	 * @param partitions The partitions for which to seek the fetcher to the initial offsets.
-	 * @param fetcher The fetcher that will pull data from Kafka and must be positioned.
-	 */
-	void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) throws Exception;
-
-	/**
-	 * Closes the offset handler, releasing all resources.
-	 * 
-	 * @throws IOException Thrown, if the closing fails.
-	 */
-	void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
deleted file mode 100644
index a6417a7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZooKeeperStringSerializer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-
-import java.nio.charset.Charset;
-
-/**
- * Simple ZooKeeper serializer for Strings.
- */
-public class ZooKeeperStringSerializer implements ZkSerializer {
-
-	private static final Charset CHARSET = Charset.forName("UTF-8");
-	
-	@Override
-	public byte[] serialize(Object data) {
-		if (data instanceof String) {
-			return ((String) data).getBytes(CHARSET);
-		}
-		else {
-			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
-		}
-	}
-
-	@Override
-	public Object deserialize(byte[] bytes) {
-		if (bytes == null) {
-			return null;
-		}
-		else {
-			return new String(bytes, CHARSET);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
deleted file mode 100644
index 9dd1192..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/streaming/connectors/internals/ZookeeperOffsetHandler.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.internals;
-
-import kafka.common.TopicAndPartition;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.streaming.connectors.FlinkKafkaConsumer;
-import org.apache.zookeeper.data.Stat;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.Option;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class ZookeeperOffsetHandler implements OffsetHandler {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
-	
-	private static final long OFFSET_NOT_SET = FlinkKafkaConsumer.OFFSET_NOT_SET;
-	
-	
-	private final ZkClient zkClient;
-	
-	private final String groupId;
-
-	
-	public ZookeeperOffsetHandler(Properties props) {
-		this.groupId = props.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-		
-		if (this.groupId == null) {
-			throw new IllegalArgumentException("Required property '"
-					+ ConsumerConfig.GROUP_ID_CONFIG + "' has not been set");
-		}
-		
-		String zkConnect = props.getProperty("zookeeper.connect");
-		if (zkConnect == null) {
-			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
-		}
-		
-		zkClient = new ZkClient(zkConnect,
-				Integer.valueOf(props.getProperty("zookeeper.session.timeout.ms", "6000")),
-				Integer.valueOf(props.getProperty("zookeeper.connection.timeout.ms", "6000")),
-				new ZooKeeperStringSerializer());
-	}
-
-
-	@Override
-	public void commit(Map<TopicPartition, Long> offsetsToCommit) {
-		for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
-			TopicPartition tp = entry.getKey();
-			long offset = entry.getValue();
-			
-			if (offset >= 0) {
-				setOffsetInZooKeeper(zkClient, groupId, tp.topic(), tp.partition(), offset);
-			}
-		}
-	}
-
-	@Override
-	public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
-		for (TopicPartition tp : partitions) {
-			long offset = getOffsetFromZooKeeper(zkClient, groupId, tp.topic(), tp.partition());
-
-			if (offset != OFFSET_NOT_SET) {
-				LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.",
-						tp.partition(), offset);
-
-				// the offset stored in ZooKeeper is the last offset that was read; seek() expects the next offset to read.
-				fetcher.seek(tp, offset + 1);
-			}
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		zkClient.close();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Communication with Zookeeper
-	// ------------------------------------------------------------------------
-	
-	public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
-	}
-
-	public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
-		TopicAndPartition tap = new TopicAndPartition(topic, partition);
-		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
-
-		scala.Tuple2<Option<String>, Stat> data = ZkUtils.readDataMaybeNull(zkClient,
-				topicDirs.consumerOffsetDir() + "/" + tap.partition());
-
-		if (data._1().isEmpty()) {
-			return OFFSET_NOT_SET;
-		} else {
-			return Long.valueOf(data._1().get());
-		}
-	}
-}
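
The ZKGroupTopicDirs and ZkUtils calls above resolve to Kafka 0.8's standard consumer-offset layout in ZooKeeper: one znode per partition under /consumers/<groupId>/offsets/<topic>/<partition>, holding the offset as a decimal string. A minimal sketch of reading such a node directly with ZkClient (the path construction and null handling here are illustrative, not the deleted class's code):

import org.I0Itec.zkclient.ZkClient;

public final class ZkOffsetReadSketch {

	/** Returns the committed offset for the partition, or null if none has been written yet. */
	public static Long readCommittedOffset(ZkClient zkClient, String groupId, String topic, int partition) {
		String path = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
		String value = zkClient.readData(path, true); // true: return null instead of throwing if the node is missing
		return value == null ? null : Long.parseLong(value);
	}
}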

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
deleted file mode 100644
index 218315f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka081ITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.junit.Test;
-
-import java.util.Properties;
-
-
-public class Kafka081ITCase extends KafkaConsumerTestBase {
-	
-	@Override
-	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
-		return new FlinkKafkaConsumer081<T>(topic, deserializationSchema, props);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testCheckpointing() {
-		runCheckpointingTest();
-	}
-
-	@Test
-	public void testOffsetInZookeeper() {
-		runOffsetInZookeeperValidationTest();
-	}
-	
-	@Test
-	public void testConcurrentProducerConsumerTopology() {
-		runSimpleConcurrentProducerConsumerTopology();
-	}
-
-	// --- canceling / failures ---
-	
-	@Test
-	public void testCancelingEmptyTopic() {
-		runCancelingOnEmptyInputTest();
-	}
-
-	@Test
-	public void testCancelingFullTopic() {
-		runCancelingOnFullInputTest();
-	}
-
-	@Test
-	public void testFailOnDeploy() {
-		runFailOnDeployTest();
-	}
-
-	// --- source to partition mappings and exactly once ---
-	
-	@Test
-	public void testOneToOneSources() {
-		runOneToOneExactlyOnceTest();
-	}
-
-	@Test
-	public void testOneSourceMultiplePartitions() {
-		runOneSourceMultiplePartitionsExactlyOnceTest();
-	}
-
-	@Test
-	public void testMultipleSourcesOnePartition() {
-		runMultipleSourcesOnePartitionExactlyOnceTest();
-	}
-
-	// --- broker failure ---
-
-	@Test
-	public void testBrokerFailure() {
-		runBrokerFailureTest();
-	}
-
-	// --- special executions ---
-	
-	@Test
-	public void testBigRecordJob() {
-		runBigRecordTestTopology();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
deleted file mode 100644
index 2f80fcb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/Kafka082ITCase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Properties;
-
-
-public class Kafka082ITCase extends KafkaConsumerTestBase {
-	
-	@Override
-	protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
-		return new FlinkKafkaConsumer082<T>(topic, deserializationSchema, props);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Suite of Tests
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testCheckpointing() {
-		runCheckpointingTest();
-	}
-
-	@Test
-	public void testOffsetInZookeeper() {
-		runOffsetInZookeeperValidationTest();
-	}
-
-	@Test
-	public void testConcurrentProducerConsumerTopology() {
-		runSimpleConcurrentProducerConsumerTopology();
-	}
-
-	// --- canceling / failures ---
-
-	@Test
-	public void testCancelingEmptyTopic() {
-		runCancelingOnEmptyInputTest();
-	}
-
-	@Test
-	public void testCancelingFullTopic() {
-		runCancelingOnFullInputTest();
-	}
-
-	@Test
-	public void testFailOnDeploy() {
-		runFailOnDeployTest();
-	}
-
-	// --- source to partition mappings and exactly once ---
-
-	@Test
-	public void testOneToOneSources() {
-		runOneToOneExactlyOnceTest();
-	}
-
-	@Test
-	public void testOneSourceMultiplePartitions() {
-		runOneSourceMultiplePartitionsExactlyOnceTest();
-	}
-
-	@Test
-	public void testMultipleSourcesOnePartition() {
-		runMultipleSourcesOnePartitionExactlyOnceTest();
-	}
-
-	// --- broker failure ---
-
-	@Test
-	public void testBrokerFailure() {
-		runBrokerFailureTest();
-	}
-
-	// --- special executions ---
-
-	@Test
-	@Ignore("this does not work with the new consumer")
-	public void testBigRecordJob() {
-		runBigRecordTestTopology();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
deleted file mode 100644
index 8248cee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerPartitionAssignmentTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that the partition assignment is deterministic and stable.
- */
-public class KafkaConsumerPartitionAssignmentTest {
-
-	@Test
-	public void testPartitionsEqualConsumers() {
-		try {
-			int[] partitions = {4, 52, 17, 1};
-			
-			for (int i = 0; i < partitions.length; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", partitions.length, i);
-				
-				assertNotNull(parts);
-				assertEquals(1, parts.size());
-				assertTrue(contains(partitions, parts.get(0).partition()));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiplePartitionsPerConsumers() {
-		try {
-			final int[] partitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-
-			final Set<Integer> allPartitions = new HashSet<>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
-			
-			final int numConsumers = 3;
-			final int minPartitionsPerConsumer = partitions.length / numConsumers;
-			final int maxPartitionsPerConsumer = partitions.length / numConsumers + 1;
-			
-			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() >= minPartitionsPerConsumer);
-				assertTrue(parts.size() <= maxPartitionsPerConsumer);
-
-				for (TopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPartitionsFewerThanConsumers() {
-		try {
-			final int[] partitions = {4, 52, 17, 1};
-
-			final Set<Integer> allPartitions = new HashSet<>();
-			for (int i : partitions) {
-				allPartitions.add(i);
-			}
-
-			final int numConsumers = 2 * partitions.length + 3;
-			
-			for (int i = 0; i < numConsumers; i++) {
-				List<TopicPartition> parts = FlinkKafkaConsumer.assignPartitions(
-						partitions, "test-topic", numConsumers, i);
-
-				assertNotNull(parts);
-				assertTrue(parts.size() <= 1);
-				
-				for (TopicPartition p : parts) {
-					// check that the element was actually contained
-					assertTrue(allPartitions.remove(p.partition()));
-				}
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testAssignEmptyPartitions() {
-		try {
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 4, 2);
-			assertNotNull(parts1);
-			assertTrue(parts1.isEmpty());
-
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(new int[0], "test-topic", 1, 0);
-			assertNotNull(parts2);
-			assertTrue(parts2.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testGrowingPartitionsRemainsStable() {
-		try {
-			final int[] newPartitions = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
-			final int[] initialPartitions = Arrays.copyOfRange(newPartitions, 0, 7);
-
-			final Set<Integer> allNewPartitions = new HashSet<>();
-			final Set<Integer> allInitialPartitions = new HashSet<>();
-			for (int i : newPartitions) {
-				allNewPartitions.add(i);
-			}
-			for (int i : initialPartitions) {
-				allInitialPartitions.add(i);
-			}
-
-			final int numConsumers = 3;
-			final int minInitialPartitionsPerConsumer = initialPartitions.length / numConsumers;
-			final int maxInitialPartitionsPerConsumer = initialPartitions.length / numConsumers + 1;
-			final int minNewPartitionsPerConsumer = newPartitions.length / numConsumers;
-			final int maxNewPartitionsPerConsumer = newPartitions.length / numConsumers + 1;
-			
-			List<TopicPartition> parts1 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3 = FlinkKafkaConsumer.assignPartitions(
-					initialPartitions, "test-topic", numConsumers, 2);
-
-			assertNotNull(parts1);
-			assertNotNull(parts2);
-			assertNotNull(parts3);
-			
-			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
-			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
-
-			for (TopicPartition p : parts1) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts2) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts3) {
-				// check that the element was actually contained
-				assertTrue(allInitialPartitions.remove(p.partition()));
-			}
-			
-			// all partitions must have been assigned
-			assertTrue(allInitialPartitions.isEmpty());
-			
-			// grow the set of partitions and distribute anew
-			
-			List<TopicPartition> parts1new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 0);
-			List<TopicPartition> parts2new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 1);
-			List<TopicPartition> parts3new = FlinkKafkaConsumer.assignPartitions(
-					newPartitions, "test-topic", numConsumers, 2);
-
-			// new partitions must include all old partitions
-			
-			assertTrue(parts1new.size() > parts1.size());
-			assertTrue(parts2new.size() > parts2.size());
-			assertTrue(parts3new.size() > parts3.size());
-			
-			assertTrue(parts1new.containsAll(parts1));
-			assertTrue(parts2new.containsAll(parts2));
-			assertTrue(parts3new.containsAll(parts3));
-
-			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
-			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
-
-			for (TopicPartition p : parts1new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts2new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-			for (TopicPartition p : parts3new) {
-				// check that the element was actually contained
-				assertTrue(allNewPartitions.remove(p.partition()));
-			}
-
-			// all partitions must have been assigned
-			assertTrue(allNewPartitions.isEmpty());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	private static boolean contains(int[] array, int value) {
-		for (int i : array) {
-			if (i == value) {
-				return true;
-			}
-		}
-		return false;
-	}
-}
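
The invariants exercised above (a deterministic result, either n/k or n/k + 1 partitions per consumer, and stability when new partitions are appended to the array) are all satisfied by a simple index-modulo assignment. The following is a sketch of such an assignment, for illustration only, and not necessarily the actual FlinkKafkaConsumer.assignPartitions implementation:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.kafka_backport.common.TopicPartition;

public final class PartitionAssignmentSketch {

	public static List<TopicPartition> assignPartitions(int[] partitions, String topic, int numConsumers, int consumerIndex) {
		List<TopicPartition> assigned = new ArrayList<>();
		for (int i = 0; i < partitions.length; i++) {
			// every numConsumers-th array position belongs to this consumer, so the mapping
			// is deterministic and positions assigned earlier never move to another consumer
			if (i % numConsumers == consumerIndex) {
				assigned.add(new TopicPartition(topic, partitions[i]));
			}
		}
		return assigned;
	}
}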

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
deleted file mode 100644
index 4949714..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/KafkaConsumerTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.commons.collections.map.LinkedMap;
-import org.apache.flink.kafka_backport.clients.consumer.ConsumerConfig;
-
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class KafkaConsumerTest {
-
-	@Test
-	public void testValidateZooKeeperConfig() {
-		try {
-			// empty
-			Properties emptyProperties = new Properties();
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(emptyProperties);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no connect string (only group string)
-			Properties noConnect = new Properties();
-			noConnect.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-group");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noConnect);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-
-			// no group string (only connect string)
-			Properties noGroup = new Properties();
-			noGroup.put("zookeeper.connect", "localhost:47574");
-			try {
-				FlinkKafkaConsumer.validateZooKeeperConfig(noGroup);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSnapshot() {
-		try {
-			Field offsetsField = FlinkKafkaConsumer.class.getDeclaredField("lastOffsets");
-			Field runningField = FlinkKafkaConsumer.class.getDeclaredField("running");
-			Field mapField = FlinkKafkaConsumer.class.getDeclaredField("pendingCheckpoints");
-			
-			offsetsField.setAccessible(true);
-			runningField.setAccessible(true);
-			mapField.setAccessible(true);
-
-			FlinkKafkaConsumer<?> consumer = mock(FlinkKafkaConsumer.class);
-			when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod();
-			
-			long[] testOffsets = new long[] { 43, 6146, 133, 16, 162, 616 };
-			LinkedMap map = new LinkedMap();
-			
-			offsetsField.set(consumer, testOffsets);
-			runningField.set(consumer, true);
-			mapField.set(consumer, map);
-			
-			assertTrue(map.isEmpty());
-
-			// make multiple checkpoints
-			for (long checkpointId = 10L; checkpointId <= 2000L; checkpointId += 9L) {
-				long[] checkpoint = consumer.snapshotState(checkpointId, 47 * checkpointId);
-				assertArrayEquals(testOffsets, checkpoint);
-				
-				// change the offsets, make sure the snapshot did not change
-				long[] checkpointCopy = Arrays.copyOf(checkpoint, checkpoint.length);
-				
-				for (int i = 0; i < testOffsets.length; i++) {
-					testOffsets[i] += 1L;
-				}
-				
-				assertArrayEquals(checkpointCopy, checkpoint);
-				
-				assertTrue(map.size() > 0);
-				assertTrue(map.size() <= FlinkKafkaConsumer.MAX_NUM_PENDING_CHECKPOINTS);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	@Ignore("Kafka consumer internally makes an infinite loop")
-	public void testCreateSourceWithoutCluster() {
-		try {
-			Properties props = new Properties();
-			props.setProperty("zookeeper.connect", "localhost:56794");
-			props.setProperty("bootstrap.servers", "localhost:11111, localhost:22222");
-			props.setProperty("group.id", "non-existent-group");
-			
-			new FlinkKafkaConsumer<String>("no op topic", new JavaDefaultStringSchema(), props,
-					FlinkKafkaConsumer.OffsetStore.FLINK_ZOOKEEPER,
-					FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}
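
testValidateZooKeeperConfig above only pins down the contract that both "zookeeper.connect" and the consumer group id must be present. A validation consistent with that contract could look like the following sketch (illustrative only, not the actual FlinkKafkaConsumer.validateZooKeeperConfig body):

import java.util.Properties;

public final class ZooKeeperConfigValidationSketch {

	public static void validateZooKeeperConfig(Properties props) {
		if (props.getProperty("zookeeper.connect") == null) {
			throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
		}
		if (props.getProperty("group.id") == null) {
			throw new IllegalArgumentException("Required property 'group.id' has not been set");
		}
	}
}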

