flink-commits mailing list archives

From aljos...@apache.org
Subject [8/9] flink git commit: [FLINK-6988][kafka] Add flink-connector-kafka-0.11 with exactly-once semantic
Date Mon, 09 Oct 2017 16:59:14 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
new file mode 100644
index 0000000..6d259fa
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * IT cases for Kafka 0.11.
+ */
+public class Kafka011ITCase extends KafkaConsumerTestBase {
+
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Suite of Tests
+	// ------------------------------------------------------------------------
+
+	@Test(timeout = 60000)
+	public void testFailOnNoBroker() throws Exception {
+		runFailOnNoBrokerTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testConcurrentProducerConsumerTopology() throws Exception {
+		runSimpleConcurrentProducerConsumerTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
+	// --- canceling / failures ---
+
+	@Test(timeout = 60000)
+	public void testCancelingEmptyTopic() throws Exception {
+		runCancelingOnEmptyInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testCancelingFullTopic() throws Exception {
+		runCancelingOnFullInputTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testFailOnDeploy() throws Exception {
+		runFailOnDeployTest();
+	}
+
+	// --- source to partition mappings and exactly once ---
+
+	@Test(timeout = 60000)
+	public void testOneToOneSources() throws Exception {
+		runOneToOneExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testOneSourceMultiplePartitions() throws Exception {
+		runOneSourceMultiplePartitionsExactlyOnceTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleSourcesOnePartition() throws Exception {
+		runMultipleSourcesOnePartitionExactlyOnceTest();
+	}
+
+	// --- broker failure ---
+
+	@Test(timeout = 60000)
+	public void testBrokerFailure() throws Exception {
+		runBrokerFailureTest();
+	}
+
+	// --- special executions ---
+
+	@Test(timeout = 60000)
+	public void testBigRecordJob() throws Exception {
+		runBigRecordTestTopology();
+	}
+
+	@Test(timeout = 60000)
+	public void testMultipleTopics() throws Exception {
+		runProduceConsumeMultipleTopics();
+	}
+
+	@Test(timeout = 60000)
+	public void testAllDeletes() throws Exception {
+		runAllDeletesTest();
+	}
+
+	@Test(timeout = 60000)
+	public void testMetricsAndEndOfStream() throws Exception {
+		runEndOfStreamTest();
+	}
+
+	// --- startup mode ---
+
+	@Test(timeout = 60000)
+	public void testStartFromEarliestOffsets() throws Exception {
+		runStartFromEarliestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromLatestOffsets() throws Exception {
+		runStartFromLatestOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromGroupOffsets() throws Exception {
+		runStartFromGroupOffsets();
+	}
+
+	@Test(timeout = 60000)
+	public void testStartFromSpecificOffsets() throws Exception {
+		runStartFromSpecificOffsets();
+	}
+
+	// --- offset committing ---
+
+	@Test(timeout = 60000)
+	public void testCommitOffsetsToKafka() throws Exception {
+		runCommitOffsetsToKafka();
+	}
+
+	@Test(timeout = 60000)
+	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+		runAutoOffsetRetrievalAndCommitToKafka();
+	}
+
+	/**
+	 * Kafka 0.11-specific test, ensuring timestamps are properly written to and read from Kafka.
+	 */
+	@Test(timeout = 60000)
+	public void testTimestamps() throws Exception {
+
+		final String topic = "tstopic";
+		createTestTopic(topic, 3, 1);
+
+		// ---------- Produce an event time stream into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Long> streamWithTimestamps = env.addSource(new SourceFunction<Long>() {
+			private static final long serialVersionUID = -2255115836471289626L;
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Long> ctx) throws Exception {
+				long i = 0;
+				while (running) {
+					ctx.collectWithTimestamp(i, i * 2);
+					if (i++ == 1110L) {
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
+		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+		FlinkKafkaProducer011<Long> prod = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
+			private static final long serialVersionUID = -6730989584364230617L;
+
+			@Override
+			public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return (int) (next % 3);
+			}
+		}));
+		prod.setWriteTimestampToKafka(true);
+
+		streamWithTimestamps.addSink(prod).setParallelism(3);
+
+		env.execute("Produce some");
+
+		// ---------- Consume stream from Kafka -------------------
+
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		FlinkKafkaConsumer011<Long> kafkaSource = new FlinkKafkaConsumer011<>(topic, new LimitedLongDeserializer(), standardProps);
+		kafkaSource.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
+			private static final long serialVersionUID = -4834111173247835189L;
+
+			@Nullable
+			@Override
+			public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+				if (lastElement % 11 == 0) {
+					return new Watermark(lastElement);
+				}
+				return null;
+			}
+
+			@Override
+			public long extractTimestamp(Long element, long previousElementTimestamp) {
+				return previousElementTimestamp;
+			}
+		});
+
+		DataStream<Long> stream = env.addSource(kafkaSource);
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		stream.transform("timestamp validating operator", objectTypeInfo, new TimestampValidatingOperator()).setParallelism(1);
+
+		env.execute("Consume again");
+
+		deleteTestTopic(topic);
+	}
+
+	private static class TimestampValidatingOperator extends StreamSink<Long> {
+
+		private static final long serialVersionUID = 1353168781235526806L;
+
+		public TimestampValidatingOperator() {
+			super(new SinkFunction<Long>() {
+				private static final long serialVersionUID = -6676565693361786524L;
+
+				@Override
+				public void invoke(Long value) throws Exception {
+					throw new RuntimeException("Unexpected");
+				}
+			});
+		}
+
+		long elCount = 0;
+		long wmCount = 0;
+		long lastWM = Long.MIN_VALUE;
+
+		@Override
+		public void processElement(StreamRecord<Long> element) throws Exception {
+			elCount++;
+			if (element.getValue() * 2 != element.getTimestamp()) {
+				throw new RuntimeException("Invalid timestamp: " + element);
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			wmCount++;
+
+			if (lastWM <= mark.getTimestamp()) {
+				lastWM = mark.getTimestamp();
+			} else {
+				throw new RuntimeException("Received watermark lower than the last one");
+			}
+
+			if (mark.getTimestamp() % 11 != 0 && mark.getTimestamp() != Long.MAX_VALUE) {
+				throw new RuntimeException("Invalid watermark: " + mark.getTimestamp());
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (elCount != 1110L) {
+				throw new RuntimeException("Wrong final element count " + elCount);
+			}
+
+			if (wmCount <= 2) {
+				throw new RuntimeException("Almost no watermarks have been sent " + wmCount);
+			}
+		}
+	}
+
+	private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
+
+		private static final long serialVersionUID = 6966177118923713521L;
+		private final TypeInformation<Long> ti;
+		private final TypeSerializer<Long> ser;
+		long cnt = 0;
+
+		public LimitedLongDeserializer() {
+			this.ti = TypeInfoParser.parse("Long");
+			this.ser = ti.createSerializer(new ExecutionConfig());
+		}
+
+		@Override
+		public TypeInformation<Long> getProducedType() {
+			return ti;
+		}
+
+		@Override
+		public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			cnt++;
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Long e = ser.deserialize(in);
+			return e;
+		}
+
+		@Override
+		public boolean isEndOfStream(Long nextElement) {
+			return cnt > 1110L;
+		}
+	}
+
+}
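
The testTimestamps case above is the Kafka 0.11-specific part of this suite: the producer can store the Flink record timestamps in the Kafka 0.11 message format, and the consumer hands them back to Flink as event time. As a rough sketch of the same wiring outside the test harness (class name, topic, bootstrap address and the small inline source are placeholders; only constructors and setters that appear in this diff are assumed):

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class WriteTimestampsToKafkaSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

		// bounded source whose elements carry event-time timestamps, like the source in the test
		DataStream<Long> stream = env.addSource(new SourceFunction<Long>() {
			@Override
			public void run(SourceContext<Long> ctx) {
				for (long i = 0; i < 100; i++) {
					ctx.collectWithTimestamp(i, i * 2);
				}
			}

			@Override
			public void cancel() {
			}
		});

		TypeInformationSerializationSchema<Long> schema =
			new TypeInformationSerializationSchema<>(BasicTypeInfo.LONG_TYPE_INFO, env.getConfig());
		Optional<FlinkKafkaPartitioner<Long>> partitioner = Optional.of(new FlinkFixedPartitioner<Long>());

		FlinkKafkaProducer011<Long> producer = new FlinkKafkaProducer011<>(
			"tstopic", new KeyedSerializationSchemaWrapper<>(schema), props, partitioner);
		// store the Flink record timestamps in the Kafka 0.11 messages
		producer.setWriteTimestampToKafka(true);

		stream.addSink(producer);
		env.execute("Write timestamps to Kafka (sketch)");
	}
}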

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
new file mode 100644
index 0000000..c2e256c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka011JsonTableSource}.
+ */
+public class Kafka011JsonTableSourceTest extends KafkaTableSourceTestBase {
+
+	@Override
+	protected KafkaTableSource createTableSource(String topic, Properties properties, TypeInformation<Row> typeInfo) {
+		return new Kafka011JsonTableSource(topic, properties, typeInfo);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
+		return (Class) JsonRowDeserializationSchema.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
+		return (Class) FlinkKafkaConsumer011.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
new file mode 100644
index 0000000..ad63662
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerAtLeastOnceITCase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerAtLeastOnceITCase extends KafkaProducerTestBase {
+
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE);
+	}
+
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// disable test for at least once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// disable test for at least once semantic
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
new file mode 100644
index 0000000..1167238
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.junit.BeforeClass;
+
+/**
+ * IT cases for the {@link FlinkKafkaProducer011}.
+ */
+@SuppressWarnings("serial")
+public class Kafka011ProducerExactlyOnceITCase extends KafkaProducerTestBase {
+	@BeforeClass
+	public static void prepare() throws ClassNotFoundException {
+		KafkaProducerTestBase.prepare();
+		((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+	}
+
+	@Override
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		// TODO: fix this test
+		// currently, in roughly 50% of the runs, the KafkaProducer livelocks itself on the commitTransaction call.
+		// Somehow Kafka 0.11 doesn't play along with the NetworkFailureProxy. This can either mean a bug in Kafka
+		// (it doesn't cope well with some weird network failures), or that the NetworkFailureProxy is a broken design
+		// and this test should be reimplemented in a completely different way...
+	}
+
+	@Override
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		// TODO: fix this test
+		// currently, in roughly 50% of the runs, the KafkaProducer livelocks itself on the commitTransaction call.
+		// Somehow Kafka 0.11 doesn't play along with the NetworkFailureProxy. This can either mean a bug in Kafka
+		// (it doesn't cope well with some weird network failures), or that the NetworkFailureProxy is a broken design
+		// and this test should be reimplemented in a completely different way...
+	}
+}
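
The two producer IT cases differ only in the Semantic they install on the test environment before the shared KafkaProducerTestBase suite runs. In a regular job the semantic is chosen through the producer constructor instead; a minimal sketch, assuming a placeholder topic and properties and using only the constructor arguments that appear in KafkaTestEnvironmentImpl further down in this commit. Downstream Kafka consumers need isolation.level=read_committed to filter out records from aborted transactions, which is also what the new assertExactlyOnceForTopic helper sets.

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class ExactlyOnceProducerSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// transactions are committed when checkpoints complete, so checkpointing must be enabled
		env.enableCheckpointing(500);

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

		DataStream<Integer> stream = env.fromElements(1, 2, 3);

		TypeInformationSerializationSchema<Integer> schema =
			new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
		KeyedSerializationSchemaWrapper<Integer> serSchema = new KeyedSerializationSchemaWrapper<>(schema);
		Optional<FlinkKafkaPartitioner<Integer>> partitioner = Optional.of(new FlinkFixedPartitioner<Integer>());

		FlinkKafkaProducer011<Integer> producer = new FlinkKafkaProducer011<>(
			"exactly-once-topic", // placeholder topic
			serSchema,
			props,
			partitioner,
			FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

		stream.addSink(producer);
		env.execute("Exactly-once Kafka 0.11 producer (sketch)");
	}
}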

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
new file mode 100644
index 0000000..e81148b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.networking.NetworkFailuresProxy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.util.NetUtils;
+
+import kafka.admin.AdminUtils;
+import kafka.common.KafkaException;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.list.UnmodifiableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+
+import scala.collection.mutable.ArraySeq;
+
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * An implementation of the KafkaServerProvider for Kafka 0.11.
+ */
+public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private File tmpZkDir;
+	private File tmpKafkaParent;
+	private List<File> tmpKafkaDirs;
+	private List<KafkaServer> brokers;
+	private TestingServer zookeeper;
+	private String zookeeperConnectionString;
+	private String brokerConnectionString = "";
+	private Properties standardProps;
+	private FlinkKafkaProducer011.Semantic producerSemantic = FlinkKafkaProducer011.Semantic.EXACTLY_ONCE;
+	// the default is 6 seconds, which seems to be too small for Travis, so use 30 seconds
+	private int zkTimeout = 30000;
+	private Config config;
+
+	public String getBrokerConnectionString() {
+		return brokerConnectionString;
+	}
+
+	public void setProducerSemantic(FlinkKafkaProducer011.Semantic producerSemantic) {
+		this.producerSemantic = producerSemantic;
+	}
+
+	@Override
+	public Properties getStandardProperties() {
+		return standardProps;
+	}
+
+	@Override
+	public Properties getSecureProperties() {
+		Properties prop = new Properties();
+		if (config.isSecureMode()) {
+			prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+			prop.put("security.protocol", "SASL_PLAINTEXT");
+			prop.put("sasl.kerberos.service.name", "kafka");
+
+			// add special timeouts for Travis
+			prop.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+			prop.setProperty("metadata.fetch.timeout.ms", "120000");
+		}
+		return prop;
+	}
+
+	@Override
+	public String getVersion() {
+		return "0.11";
+	}
+
+	@Override
+	public List<KafkaServer> getBrokers() {
+		return brokers;
+	}
+
+	@Override
+	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
+		return new FlinkKafkaConsumer011<>(topics, readSchema, props);
+	}
+
+	@Override
+	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
+		List<ConsumerRecord<K, V>> result = new ArrayList<>();
+
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
+
+				// wait for new records with timeout and break the loop if we didn't get any
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
+
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
+			}
+			consumer.commitSync();
+		}
+
+		return UnmodifiableList.decorate(result);
+	}
+
+	@Override
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+		return new StreamSink<>(new FlinkKafkaProducer011<>(
+			topic,
+			serSchema,
+			props,
+			Optional.ofNullable(partitioner),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+		return stream.addSink(new FlinkKafkaProducer011<>(
+			topic,
+			serSchema,
+			props,
+			Optional.ofNullable(partitioner),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
+	public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
+		FlinkKafkaProducer011<T> prod = new FlinkKafkaProducer011<>(
+			topic, serSchema, props, Optional.of(new FlinkFixedPartitioner<>()), producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+		prod.setWriteTimestampToKafka(true);
+
+		return stream.addSink(prod);
+	}
+
+	@Override
+	public KafkaOffsetHandler createOffsetHandler() {
+		return new KafkaOffsetHandlerImpl();
+	}
+
+	@Override
+	public void restartBroker(int leaderId) throws Exception {
+		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
+	}
+
+	@Override
+	public int getLeaderToShutDown(String topic) throws Exception {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			MetadataResponse.PartitionMetadata firstPart = null;
+			do {
+				if (firstPart != null) {
+					LOG.info("Unable to find leader. error code {}", firstPart.error().code());
+					// not the first try. Sleep a bit
+					Thread.sleep(150);
+				}
+
+				List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
+				firstPart = partitionMetadata.get(0);
+			}
+			while (firstPart.error().code() != 0);
+
+			return firstPart.leader().id();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	@Override
+	public int getBrokerId(KafkaServer server) {
+		return server.config().brokerId();
+	}
+
+	@Override
+	public boolean isSecureRunSupported() {
+		return true;
+	}
+
+	@Override
+	public void prepare(Config config) {
+		// increase the timeout since on Travis the ZK connection takes a long time for secure connections
+		if (config.isSecureMode()) {
+			// run only one Kafka server to avoid multiple ZK connections from many instances (Travis timeouts)
+			config.setKafkaServersNumber(1);
+			zkTimeout = zkTimeout * 15;
+		}
+		this.config = config;
+
+		File tempDir = new File(System.getProperty("java.io.tmpdir"));
+		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
+
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
+
+		tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+			File tmpDir = new File(tmpKafkaParent, "server-" + i);
+			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
+			tmpKafkaDirs.add(tmpDir);
+		}
+
+		zookeeper = null;
+		brokers = null;
+
+		try {
+			zookeeper = new TestingServer(-1, tmpZkDir);
+			zookeeperConnectionString = zookeeper.getConnectString();
+			LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
+
+			LOG.info("Starting KafkaServer");
+			brokers = new ArrayList<>(config.getKafkaServersNumber());
+
+			ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
+			for (int i = 0; i < config.getKafkaServersNumber(); i++) {
+				KafkaServer kafkaServer = getKafkaServer(i, tmpKafkaDirs.get(i));
+				brokers.add(kafkaServer);
+				brokerConnectionString += hostAndPortToUrlString(KAFKA_HOST, kafkaServer.socketServer().boundPort(listenerName));
+				brokerConnectionString +=  ",";
+			}
+
+			LOG.info("ZK and KafkaServer started.");
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			fail("Test setup failed: " + t.getMessage());
+		}
+
+		standardProps = new Properties();
+		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
+		standardProps.setProperty("group.id", "flink-tests");
+		standardProps.setProperty("enable.auto.commit", "false");
+		standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(zkTimeout));
+		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning. (earliest is kafka 0.11 value)
+		standardProps.setProperty("max.partition.fetch.bytes", "256"); // make a lot of fetches (MESSAGES MUST BE SMALLER!)
+	}
+
+	@Override
+	public void shutdown() {
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
+		}
+		brokers.clear();
+
+		if (zookeeper != null) {
+			try {
+				zookeeper.stop();
+			}
+			catch (Exception e) {
+				LOG.warn("ZK.stop() failed", e);
+			}
+			zookeeper = null;
+		}
+
+		// clean up the temp spaces
+
+		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpKafkaParent);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+		if (tmpZkDir != null && tmpZkDir.exists()) {
+			try {
+				FileUtils.deleteDirectory(tmpZkDir);
+			}
+			catch (Exception e) {
+				// ignore
+			}
+		}
+	}
+
+	public ZkUtils getZkUtils() {
+		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+		return ZkUtils.apply(creator, false);
+	}
+
+	@Override
+	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
+		// create topic with one client
+		LOG.info("Creating topic {}", topic);
+
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, kafka.admin.RackAwareMode.Enforced$.MODULE$);
+		} finally {
+			zkUtils.close();
+		}
+
+		// validate that the topic has been created
+		final long deadline = System.nanoTime() + 30_000_000_000L;
+		do {
+			try {
+				if (config.isSecureMode()) {
+					// increase the wait time since ZK timeouts occur frequently on Travis
+					int wait = zkTimeout / 100;
+					LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic);
+					Thread.sleep(wait);
+				} else {
+					Thread.sleep(100);
+				}
+			} catch (InterruptedException e) {
+				// restore interrupted state
+			}
+			// we could use AdminUtils.topicExists(zkUtils, topic) here, but its results are
+			// not always correct.
+
+			// create a new ZK utils connection
+			ZkUtils checkZKConn = getZkUtils();
+			if (AdminUtils.topicExists(checkZKConn, topic)) {
+				checkZKConn.close();
+				return;
+			}
+			checkZKConn.close();
+		}
+		while (System.nanoTime() < deadline);
+		fail("Test topic could not be created");
+	}
+
+	@Override
+	public void deleteTestTopic(String topic) {
+		ZkUtils zkUtils = getZkUtils();
+		try {
+			LOG.info("Deleting topic {}", topic);
+
+			ZkClient zk = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
+				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
+
+			AdminUtils.deleteTopic(zkUtils, topic);
+
+			zk.close();
+		} finally {
+			zkUtils.close();
+		}
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed).
+	 */
+	protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
+		Properties kafkaProperties = new Properties();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", KAFKA_HOST);
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		kafkaProperties.put("message.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
+		kafkaProperties.put("transaction.max.timeout.ms", Integer.toString(1000 * 60 * 60 * 2)); // 2hours
+
+		// for CI stability, increase zookeeper session timeout
+		kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
+		kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout);
+		if (config.getKafkaServerProperties() != null) {
+			kafkaProperties.putAll(config.getKafkaServerProperties());
+		}
+
+		final int numTries = 5;
+
+		for (int i = 1; i <= numTries; i++) {
+			int kafkaPort = NetUtils.getAvailablePort();
+			kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+			if (config.isHideKafkaBehindProxy()) {
+				NetworkFailuresProxy proxy = createProxy(KAFKA_HOST, kafkaPort);
+				kafkaProperties.put("advertised.port", proxy.getLocalPort());
+			}
+
+			// to support a secure Kafka cluster
+			if (config.isSecureMode()) {
+				LOG.info("Adding Kafka secure configurations");
+				kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+				kafkaProperties.putAll(getSecureProperties());
+			}
+
+			KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+			try {
+				scala.Option<String> stringNone = scala.Option.apply(null);
+				KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, new ArraySeq<KafkaMetricsReporter>(0));
+				server.startup();
+				return server;
+			}
+			catch (KafkaException e) {
+				if (e.getCause() instanceof BindException) {
+					// port conflict, retry...
+					LOG.info("Port conflict when starting Kafka Broker. Retrying...");
+				}
+				else {
+					throw e;
+				}
+			}
+		}
+
+		throw new Exception("Could not start Kafka after " + numTries + " retries due to port conflicts.");
+	}
+
+	private class KafkaOffsetHandlerImpl implements KafkaOffsetHandler {
+
+		private final KafkaConsumer<byte[], byte[]> offsetClient;
+
+		public KafkaOffsetHandlerImpl() {
+			Properties props = new Properties();
+			props.putAll(standardProps);
+			props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+			props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+			offsetClient = new KafkaConsumer<>(props);
+		}
+
+		@Override
+		public Long getCommittedOffset(String topicName, int partition) {
+			OffsetAndMetadata committed = offsetClient.committed(new TopicPartition(topicName, partition));
+			return (committed != null) ? committed.offset() : null;
+		}
+
+		@Override
+		public void setCommittedOffset(String topicName, int partition, long offset) {
+			Map<TopicPartition, OffsetAndMetadata> partitionAndOffset = new HashMap<>();
+			partitionAndOffset.put(new TopicPartition(topicName, partition), new OffsetAndMetadata(offset));
+			offsetClient.commitSync(partitionAndOffset);
+		}
+
+		@Override
+		public void close() {
+			offsetClient.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
index 681fe02..c3c9c07 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ProducerITCase.java
@@ -23,6 +23,15 @@ package org.apache.flink.streaming.connectors.kafka;
  */
 @SuppressWarnings("serial")
 public class Kafka08ProducerITCase extends KafkaProducerTestBase {
+	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka08 does not support exactly once semantic
+	}
 
 	@Override
 	public void testOneToOneAtLeastOnceRegularSink() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
index 847f818..b34132f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ProducerITCase.java
@@ -24,6 +24,16 @@ package org.apache.flink.streaming.connectors.kafka;
 @SuppressWarnings("serial")
 public class Kafka09ProducerITCase extends KafkaProducerTestBase {
 	@Override
+	public void testExactlyOnceRegularSink() throws Exception {
+		// Kafka09 does not support exactly once semantic
+	}
+
+	@Override
+	public void testExactlyOnceCustomOperator() throws Exception {
+		// Kafka09 does not support exactly once semantic
+	}
+
+	@Override
 	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
 		// Disable this test since FlinkKafka09Producer doesn't support custom operator mode
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index fda6832..e9a0331 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -174,7 +174,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			stream.print();
 			see.execute("No broker test");
 		} catch (JobExecutionException jee) {
-			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10")) {
+			if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10") || kafkaServer.getVersion().equals("0.11")) {
 				assertTrue(jee.getCause() instanceof TimeoutException);
 
 				TimeoutException te = (TimeoutException) jee.getCause();

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 35607dd..e1ba074 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -38,26 +38,25 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
+import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Test;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
@@ -295,38 +294,79 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * We manually handle the timeout instead of using JUnit's timeout to return failure instead of timeout error.
-	 * After timeout we assume that there are missing records and there is a bug, not that the test has run out of time.
+	 * Tests the exactly-once semantics for simple writes into Kafka, using the regular sink.
 	 */
-	private void assertAtLeastOnceForTopic(
-			Properties properties,
-			String topic,
-			int partition,
-			Set<Integer> expectedElements,
-			long timeoutMillis) throws Exception {
-
-		long startMillis = System.currentTimeMillis();
-		Set<Integer> actualElements = new HashSet<>();
-
-		// until we timeout...
-		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-
-			// query kafka for new records ...
-			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
-
-			for (ConsumerRecord<Integer, Integer> record : records) {
-				actualElements.add(record.value());
-			}
+	@Test
+	public void testExactlyOnceRegularSink() throws Exception {
+		testExactlyOnce(true);
+	}
+
+	/**
+	 * Tests the exactly-once semantics for simple writes into Kafka, using a custom sink operator.
+	 */
+	@Test
+	public void testExactlyOnceCustomOperator() throws Exception {
+		testExactlyOnce(false);
+	}
+
+	/**
+	 * This test sets up the KafkaProducer so that it automatically flushes the data and
+	 * fails the application to check that records flushed since the last checkpoint are not duplicated.
+	 */
+	protected void testExactlyOnce(boolean regularSink) throws Exception {
+		final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+		final int partition = 0;
+		final int numElements = 1000;
+		final int failAfterElements = 333;
+
+		createTestTopic(topic, 1, 1);
+
+		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+		env.getConfig().disableSysoutLogging();
+
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
 
-			// succeed if we got all expectedElements
-			if (actualElements.containsAll(expectedElements)) {
-				return;
+		// process exactly failAfterElements elements and then fail the application
+		List<Integer> expectedElements = getIntegersSequence(numElements);
+
+		DataStream<Integer> inputStream = env
+			.addSource(new IntegerSource(numElements))
+			.map(new FailingIdentityMapper<Integer>(failAfterElements));
+
+		FlinkKafkaPartitioner<Integer> partitioner = new FlinkKafkaPartitioner<Integer>() {
+			@Override
+			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return partition;
 			}
+		};
+		if (regularSink) {
+			StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner);
+			inputStream.addSink(kafkaSink.getUserFunction());
+		}
+		else {
+			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner);
 		}
 
-		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+		FailingIdentityMapper.failedBefore = false;
+		TestUtils.tryExecute(env, "Exactly once test");
+
+		// assert that before failure we successfully snapshot/flushed all expected elements
+		assertExactlyOnceForTopic(
+			properties,
+			topic,
+			partition,
+			expectedElements,
+			30000L);
+
+		deleteTestTopic(topic);
 	}
 
 	private List<Integer> getIntegersSequence(int size) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index f8792e5..fcdb59b 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -39,11 +40,18 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import static org.junit.Assert.fail;
+
 /**
  * The base for the Kafka tests. It brings up:
  * <ul>
@@ -209,4 +217,80 @@ public abstract class KafkaTestBase extends TestLogger {
 		kafkaServer.deleteTestTopic(topic);
 	}
 
+	/**
+	 * We handle the timeout manually instead of using JUnit's timeout so that a timeout reports a failure rather than a timeout error.
+	 * After the timeout we assume that records are missing and there is a bug, not that the test has run out of time.
+	 */
+	protected void assertAtLeastOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			Set<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		Set<Integer> actualElements = new HashSet<>();
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+			properties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.containsAll(expectedElements)) {
+				return;
+			}
+		}
+
+		fail(String.format("Expected to contain all of: <%s>, but was: <%s>", expectedElements, actualElements));
+	}
+
+	/**
+	 * We handle the timeout manually instead of using JUnit's timeout so that a timeout reports a failure rather than a timeout error.
+	 * After the timeout we assume that records are missing and there is a bug, not that the test has run out of time.
+	 */
+	protected void assertExactlyOnceForTopic(
+			Properties properties,
+			String topic,
+			int partition,
+			List<Integer> expectedElements,
+			long timeoutMillis) throws Exception {
+
+		long startMillis = System.currentTimeMillis();
+		List<Integer> actualElements = new ArrayList<>();
+
+		Properties consumerProperties = new Properties();
+		consumerProperties.putAll(properties);
+		consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
+		consumerProperties.put("isolation.level", "read_committed");
+
+		// until we timeout...
+		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
+			// query kafka for new records ...
+			Collection<ConsumerRecord<Integer, Integer>> records = kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);
+
+			for (ConsumerRecord<Integer, Integer> record : records) {
+				actualElements.add(record.value());
+			}
+
+			// succeed if we got all expectedElements
+			if (actualElements.equals(expectedElements)) {
+				return;
+			}
+			// fail early if we already have too many elements
+			if (actualElements.size() > expectedElements.size()) {
+				break;
+			}
+		}
+
+		fail(String.format("Expected number of elements: <%s>, but was: <%s>", expectedElements.size(), actualElements.size()));
+	}
 }
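
The new assertExactlyOnceForTopic helper reads the topic with isolation.level=read_committed, so records from aborted producer transactions are skipped and the output can be compared against the expected sequence element by element. A standalone sketch of the same kind of check with a plain KafkaConsumer (class name, topic, partition and bootstrap address are placeholders; only consumer calls already used in getAllRecordsFromTopic above are assumed):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReadCommittedCountSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092"); // placeholder address
		props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
		// read the partition from the beginning
		props.put("auto.offset.reset", "earliest");
		// only return records from committed transactions
		props.put("isolation.level", "read_committed");

		int count = 0;
		try (KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(props)) {
			consumer.assign(Collections.singletonList(new TopicPartition("exactly-once-topic", 0)));

			// drain the partition; stop once a poll returns no new records
			while (true) {
				int before = count;
				for (ConsumerRecord<Integer, Integer> record : consumer.poll(1000)) {
					count++;
				}
				if (count == before) {
					break;
				}
			}
		}
		System.out.println("committed records read: " + count);
	}
}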

http://git-wip-us.apache.org/repos/asf/flink/blob/2f651e9a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
new file mode 100644
index 0000000..ef50766
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/IntegerSource.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A Flink source that serves integers, but completes only after a checkpoint has been completed once it has
+ * served all of the elements.
+ */
+public class IntegerSource
+	extends RichParallelSourceFunction<Integer>
+	implements ListCheckpointed<Integer>, CheckpointListener {
+
+	/**
+	 * Blocker when the generator needs to wait for the checkpoint to happen.
+	 * Eager initialization means it must be serializable (pick any serializable type).
+	 */
+	private final Object blocker = new SerializableObject();
+
+	/**
+	 * The total number of events to generate.
+	 */
+	private final int numEventsTotal;
+
+	/**
+	 * The current position in the sequence of numbers.
+	 */
+	private int currentPosition = -1;
+
+	private long lastCheckpointTriggered;
+
+	private long lastCheckpointConfirmed;
+
+	private boolean restored;
+
+	private volatile boolean running = true;
+
+	public IntegerSource(int numEventsTotal) {
+		this.numEventsTotal = numEventsTotal;
+	}
+
+	@Override
+	public void run(SourceContext<Integer> ctx) throws Exception {
+
+		// each source subtask emits only the numbers where (num % parallelism == subtask_index)
+		final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+		int current = this.currentPosition >= 0 ? this.currentPosition : getRuntimeContext().getIndexOfThisSubtask();
+
+		while (this.running && current < this.numEventsTotal) {
+			// emit the next element
+			synchronized (ctx.getCheckpointLock()) {
+				ctx.collect(current);
+				current += stepSize;
+				this.currentPosition = current;
+			}
+			// give some time to trigger checkpoint while we are not holding the lock (to prevent starvation)
+			if (!restored && current % 10 == 0) {
+				Thread.sleep(1);
+			}
+		}
+
+		// after we are done, we need to wait for two more checkpoints to complete
+		// before finishing the program - that is to be on the safe side that
+		// the sink also got the "commit" notification for all relevant checkpoints
+		// and committed the data
+		final long lastCheckpoint;
+		synchronized (ctx.getCheckpointLock()) {
+			lastCheckpoint = this.lastCheckpointTriggered;
+		}
+
+		synchronized (this.blocker) {
+			while (this.lastCheckpointConfirmed <= lastCheckpoint + 1) {
+				this.blocker.wait();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.running = false;
+	}
+
+	@Override
+	public List<Integer> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		this.lastCheckpointTriggered = checkpointId;
+
+		return Collections.singletonList(this.currentPosition);
+	}
+
+	@Override
+	public void restoreState(List<Integer> state) throws Exception {
+		this.currentPosition = state.get(0);
+
+		// at least one checkpoint must have happened so far
+		this.lastCheckpointTriggered = 1L;
+		this.lastCheckpointConfirmed = 1L;
+		this.restored = true;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		synchronized (blocker) {
+			this.lastCheckpointConfirmed = checkpointId;
+			blocker.notifyAll();
+		}
+	}
+}
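
IntegerSource deliberately keeps running after emitting its last element until further checkpoints have been confirmed, which is what allows testExactlyOnce above to assert on a fully committed output after the induced failure. A sketch of how the source is wired into such a pipeline, mirroring testExactlyOnce in KafkaProducerTestBase (class name, element counts, the failure point and the print sink are placeholders):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;

public class IntegerSourcePipelineSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// the source blocks until checkpoints are confirmed, so checkpointing must be enabled
		env.enableCheckpointing(500);
		env.setParallelism(1);
		// allow one restart so the job can recover from the induced failure
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));

		DataStream<Integer> input = env
			.addSource(new IntegerSource(1000))            // emits 0..999, then waits for checkpoints
			.map(new FailingIdentityMapper<Integer>(333)); // fails the job once after 333 elements

		// in the real test a (transactional) Kafka sink is attached here
		input.print();

		FailingIdentityMapper.failedBefore = false;
		env.execute("IntegerSource pipeline (sketch)");
	}
}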

