flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints
Date Mon, 04 Jul 2016 09:55:18 GMT
Repository: flink
Updated Branches:
  refs/heads/master ffaf10d22 -> 7206b0ed2


[FLINK-4027] Flush FlinkKafkaProducer on checkpoints

This closes #2108

This closes #2058 because it's an invalid pull request.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7206b0ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7206b0ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7206b0ed

Branch: refs/heads/master
Commit: 7206b0ed2adb10c94e1ffd3dbe851250b44edcf4
Parents: ffaf10d
Author: Robert Metzger <rmetzger@apache.org>
Authored: Wed Jun 15 17:50:38 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Jul 4 11:55:09 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer08.java  |  18 ++
 .../kafka/KafkaTestEnvironmentImpl.java         |   4 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |   7 +
 .../kafka/KafkaTestEnvironmentImpl.java         |   4 +-
 .../kafka/FlinkKafkaProducerBase.java           | 120 ++++++++--
 .../kafka/AtLeastOnceProducerTest.java          | 218 +++++++++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java |   3 +-
 7 files changed, 358 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 4975f9a..e509d2f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -125,4 +125,22 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>
 {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	@Override
+	protected void flush() {
+		// The Kafka 0.8 producer doesn't support flushing, so we wait here until every
+		// pending record has been acknowledged by a send callback (which decrements
+		// pendingRecords and calls notifyAll() on pendingRecordsLock when it reaches zero).
+		//noinspection SynchronizeOnNonFinalField
+		synchronized (pendingRecordsLock) {
+			while (pendingRecords > 0) {
+				try {
+					pendingRecordsLock.wait();
+				} catch (InterruptedException e) {
+					// this can be interrupted when the Task has been cancelled.
+					// Restore the interrupt flag so the cancellation stays visible to the caller,
+					// then throw to ensure that this checkpoint doesn't get confirmed.
+					Thread.currentThread().interrupt();
+					throw new RuntimeException("Flushing got interrupted while checkpointing", e);
+				}
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 77d41ac..75ca9ed 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -97,7 +97,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T>
serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		return new FlinkKafkaProducer08<T>(topic, serSchema, props, partitioner);
+		// enable flushing so every checkpoint waits for the records still buffered in the producer
+		FlinkKafkaProducer08<T> kafkaSink = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
+		kafkaSink.setFlushOnCheckpoint(true);
+		return kafkaSink;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 6f7f687..eb3440a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -127,4 +127,11 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN>
{
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
+
+	@Override
+	protected void flush() {
+		// the producer is created in open(); without it there is nothing to flush
+		if (producer == null) {
+			return;
+		}
+		producer.flush();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index b5df6e0..0dbe865 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -92,7 +92,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	@Override
 	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T>
serSchema, Properties props, KafkaPartitioner<T> partitioner) {
-		return new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		// enable flushing so every checkpoint waits for the records still buffered in the producer
+		FlinkKafkaProducer09<T> kafkaSink = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		kafkaSink.setFlushOnCheckpoint(true);
+		return kafkaSink;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 6005dff..a9d4917 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -20,7 +20,9 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -28,6 +30,7 @@ import org.apache.flink.util.NetUtils;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -39,6 +42,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -50,11 +54,14 @@ import static java.util.Objects.requireNonNull;
 /**
  * Flink Sink to produce data into a Kafka topic.
  *
- * Please note that this producer does not have any reliability guarantees.
+ * Please note that this producer provides at-least-once reliability guarantees when
+ * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
+ * Otherwise, the producer doesn't provide any reliability guarantees.
  *
  * @param <IN> Type of the messages to write into Kafka.
  */
-public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
+@SuppressWarnings("SynchronizeOnNonFinalField")
+public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
implements Checkpointed<Serializable> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
 
@@ -101,6 +108,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 	 * Flag indicating whether to accept failures (and log them), or to fail on failures
 	 */
 	protected boolean logFailuresOnly;
+
+	/**
+	 * If true, the producer will wait on each checkpoint until all outstanding records have been
+	 * sent to the broker and acknowledged. open() resets this to false when checkpointing is
+	 * not enabled.
+	 */
+	private boolean flushOnCheckpoint = false;
 	
 	// -------------------------------- Runtime fields ------------------------------------------
 
@@ -113,6 +125,16 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 	/** Errors encountered in the async producer are stored here */
 	protected transient volatile Exception asyncException;
 
+	/**
+	 * Number of records handed to the Kafka producer whose send callback has not fired yet.
+	 * Only maintained when flushing on checkpoints is enabled; always accessed while holding
+	 * {@link #pendingRecordsLock}.
+	 */
+	protected long pendingRecords = 0;
+
+	/**
+	 * Lock guarding {@link #pendingRecords}; also used to wait for and signal the count reaching zero.
+	 * Only created in open() when flushing on checkpoints is enabled, otherwise it stays null.
+	 */
+	protected transient Object pendingRecordsLock;
+
 
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
@@ -150,7 +172,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 
 		// create a local KafkaProducer to get the list of partitions.
 		// this will also ensure locally that all required ProducerConfig values are set.
-		try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig))
{
+		try (Producer<Void, IN> getPartitionsProd = getKafkaProducer(this.producerConfig))
{
 			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(defaultTopicId);
 
 			this.partitions = new int[partitionsList.size()];
@@ -178,6 +200,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 		this.logFailuresOnly = logFailuresOnly;
 	}
 
+	/**
+	 * Controls whether the sink flushes on checkpoints. When enabled, every checkpoint blocks until
+	 * all records buffered in the Kafka producer have been acknowledged, so those records are part
+	 * of the checkpoint. Ignored (reset in open()) if checkpointing is not enabled.
+	 *
+	 * @param flush {@code true} to flush on every checkpoint, {@code false} to never flush
+	 */
+	public void setFlushOnCheckpoint(boolean flush) {
+		flushOnCheckpoint = flush;
+	}
+
+	/**
+	 * Creates the Kafka producer used by this sink. It is used both for the partition lookup
+	 * and for sending records after open(). Subclasses (e.g. tests) may override this
+	 * to inject a mock producer.
+	 *
+	 * @param props the Kafka producer configuration
+	 * @return a new Kafka producer created from {@code props}
+	 */
+	protected <K,V> KafkaProducer<K,V> getKafkaProducer(Properties props) {
+		return new KafkaProducer<>(props);
+	}
+
 	// ----------------------------------- Utilities --------------------------
 	
 	/**
@@ -185,10 +225,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 	 */
 	@Override
 	public void open(Configuration configuration) {
-		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
+		producer = getKafkaProducer(this.producerConfig);
 
 		RuntimeContext ctx = getRuntimeContext();
-		if(partitioner != null) {
+		if (partitioner != null) {
 			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
 		}
 
@@ -196,32 +236,40 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 				ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), defaultTopicId);
 
 		// register Kafka metrics to Flink accumulators
-		if(!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+		if (!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
 			Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
 
-			if(metrics == null) {
+			if (metrics == null) {
 				// MapR's Kafka implementation returns null here.
 				LOG.info("Producer implementation does not support metrics");
 			} else {
-				for(Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
+				for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
 					String name = producerId + "-producer-" + metric.getKey().name();
 					DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor(metric.getValue());
 					// best effort: we only add the accumulator if available.
-					if(kafkaAccumulator != null) {
+					if (kafkaAccumulator != null) {
 						getRuntimeContext().addAccumulator(name, kafkaAccumulator);
 					}
 				}
 			}
 		}
 
+		if (flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled())
{
+			LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling
flushing.");
+			flushOnCheckpoint = false;
+		}
+		if (flushOnCheckpoint) {
+			pendingRecordsLock = new Object();
+		}
+
 		if (logFailuresOnly) {
 			callback = new Callback() {
-				
 				@Override
 				public void onCompletion(RecordMetadata metadata, Exception e) {
 					if (e != null) {
 						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
 					}
+					acknowledgeMessage();
 				}
 			};
 		}
@@ -232,6 +280,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 					if (exception != null && asyncException == null) {
 						asyncException = exception;
 					}
+					acknowledgeMessage();
 				}
 			};
 		}
@@ -251,17 +300,21 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 		byte[] serializedKey = schema.serializeKey(next);
 		byte[] serializedValue = schema.serializeValue(next);
 		String targetTopic = schema.getTargetTopic(next);
-		if(targetTopic == null) {
+		if (targetTopic == null) {
 			targetTopic = defaultTopicId;
 		}
 
 		ProducerRecord<byte[], byte[]> record;
-		if(partitioner == null) {
+		if (partitioner == null) {
 			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
 		} else {
 			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey,
serializedValue, partitions.length), serializedKey, serializedValue);
 		}
-
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				pendingRecords++;
+			}
+		}
 		producer.send(record, callback);
 	}
 
@@ -276,6 +329,47 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
 {
 		checkErroneous();
 	}
 
+	// ------------------- Logic for handling checkpoint flushing -------------------------- //
+
+	/**
+	 * Called from the send callbacks once Kafka has completed a record, successfully or not.
+	 * Decrements the pending-record count and wakes up waiters once it reaches zero.
+	 * Does nothing unless flushing on checkpoints is enabled.
+	 */
+	private void acknowledgeMessage() {
+		if (flushOnCheckpoint) {
+			synchronized (pendingRecordsLock) {
+				if (--pendingRecords == 0) {
+					pendingRecordsLock.notifyAll();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Flushes all records buffered in the Kafka producer. Called from snapshotState() when flushing
+	 * on checkpoints is enabled; when it returns, no records may be pending anymore.
+	 */
+	protected abstract void flush();
+
+	/**
+	 * Snapshots the (empty) state of this sink. If flushing on checkpoints is enabled, this blocks
+	 * until every pending record has been acknowledged. A recorded asynchronous send failure fails
+	 * the snapshot, so a checkpoint is never confirmed while covering a lost record.
+	 *
+	 * @return always {@code null}; this sink has no state
+	 * @throws RuntimeException if an asynchronous send failure has been recorded
+	 * @throws IllegalStateException if records are still pending after flushing
+	 */
+	@Override
+	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+		// a previously sent record may already have failed
+		failOnAsyncError();
+		if (flushOnCheckpoint) {
+			// flushing is activated: We need to wait until pendingRecords is 0
+			flush();
+			synchronized (pendingRecordsLock) {
+				if (pendingRecords != 0) {
+					throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecords);
+				}
+				// pending records count is 0. We can now confirm the checkpoint
+			}
+			// records completed during the flush may have failed as well
+			failOnAsyncError();
+		}
+		// return empty state
+		return null;
+	}
+
+	/**
+	 * Throws if a send callback has stored an asynchronous error in {@link #asyncException}.
+	 */
+	private void failOnAsyncError() {
+		Exception e = asyncException;
+		if (e != null) {
+			throw new RuntimeException("Failed to send data to Kafka: " + e.getMessage(), e);
+		}
+	}
+
+	@Override
+	public void restoreState(Serializable state) {
+		// snapshotState() always returns null, so there is never any state to restore
+	}
+
 
 	// ----------------------------------- Utilities --------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
new file mode 100644
index 0000000..b02593c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test ensuring that the producer is not dropping buffered records
+ */
+@SuppressWarnings("unchecked")
+public class AtLeastOnceProducerTest {
+
+	// we set a timeout because the test will not finish if the logic is broken
+	@Test(timeout=5000)
+	public void testAtLeastOnceProducer() throws Throwable {
+		runTest(true);
+	}
+
+	// This test ensures that the actual test fails if the flushing is disabled
+	@Test(expected = AssertionError.class, timeout=5000)
+	public void ensureTestFails() throws Throwable {
+		runTest(false);
+	}
+
+	private void runTest(boolean flushOnCheckpoint) throws Throwable {
+		Properties props = new Properties();
+		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic",
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
+				snapshottingFinished);
+		producer.setFlushOnCheckpoint(flushOnCheckpoint);
+		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
+
+		producer.open(new Configuration());
+
+		for (int i = 0; i < 100; i++) {
+			producer.invoke("msg-" + i);
+		}
+		// start a thread confirming all pending records
+		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
+		final Thread threadA = Thread.currentThread();
+
+		Runnable confirmer = new Runnable() {
+			@Override
+			public void run() {
+				try {
+					MockProducer mp = producer.getProducerInstance();
+					List<Callback> pending = mp.getPending();
+
+					// we need to find out if the snapshot() method blocks forever
+					// this is not possible. If snapshot() is running, it will
+					// start removing elements from the pending list.
+					synchronized (threadA) {
+						threadA.wait(500L);
+					}
+					// we now check that no records have been confirmed yet
+					Assert.assertEquals(100, pending.size());
+					Assert.assertFalse("Snapshot method returned before all records were confirmed",
+							snapshottingFinished.get());
+
+					// now confirm all checkpoints
+					for (Callback c: pending) {
+						c.onCompletion(null, null);
+					}
+					pending.clear();
+				} catch(Throwable t) {
+					runnableError.f0 = t;
+				}
+			}
+		};
+		Thread threadB = new Thread(confirmer);
+		threadB.start();
+		// this should block:
+		producer.snapshotState(0, 0);
+		synchronized (threadA) {
+			threadA.notifyAll(); // just in case, to let the test fail faster
+		}
+		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
+		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
+		while (deadline.hasTimeLeft() && threadB.isAlive()) {
+			threadB.join(500);
+		}
+		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test
is prone to fail", threadB.isAlive());
+		if (runnableError.f0 != null) {
+			throw runnableError.f0;
+		}
+
+		producer.close();
+	}
+
+
+	private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T>
{
+		private MockProducer prod;
+		private AtomicBoolean snapshottingFinished;
+
+		public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig, AtomicBoolean snapshottingFinished) {
+			super(defaultTopicId, serializationSchema, producerConfig, null);
+			this.snapshottingFinished = snapshottingFinished;
+		}
+
+		@Override
+		protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
+			this.prod = new MockProducer();
+			return this.prod;
+		}
+
+		@Override
+		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+			// call the actual snapshot state
+			Serializable ret = super.snapshotState(checkpointId, checkpointTimestamp);
+			// notify test that snapshotting has been done
+			snapshottingFinished.set(true);
+			return ret;
+		}
+
+		@Override
+		protected void flush() {
+			this.prod.flush();
+		}
+
+		public MockProducer getProducerInstance() {
+			return this.prod;
+		}
+	}
+
+	private static class MockProducer<K, V> extends KafkaProducer<K, V> {
+		List<Callback> pendingCallbacks = new ArrayList<>();
+
+		private static Properties getFakeProperties() {
+			Properties p = new Properties();
+			p.setProperty("bootstrap.servers", "localhost:12345");
+			p.setProperty("key.serializer", ByteArraySerializer.class.getName());
+			p.setProperty("value.serializer", ByteArraySerializer.class.getName());
+			return p;
+		}
+		public MockProducer() {
+			super(getFakeProperties());
+		}
+
+		@Override
+		public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
+			throw new UnsupportedOperationException("Unexpected");
+		}
+
+		@Override
+		public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
{
+			pendingCallbacks.add(callback);
+			return null;
+		}
+
+		@Override
+		public List<PartitionInfo> partitionsFor(String topic) {
+			List<PartitionInfo> list = new ArrayList<>();
+			list.add(new PartitionInfo(topic, 0, null, null, null));
+			return list;
+		}
+
+		@Override
+		public Map<MetricName, ? extends Metric> metrics() {
+			return null;
+		}
+
+
+		public List<Callback> getPending() {
+			return this.pendingCallbacks;
+		}
+
+		public void flush() {
+			while (pendingCallbacks.size() > 0) {
+				try {
+					Thread.sleep(10);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Unable to flush producer, task was interrupted");
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7206b0ed/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 220f061..d49b48b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -268,7 +268,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		});
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
-		stream.addSink(kafkaServer.getProducer(topic, new KeyedSerializationSchemaWrapper<>(sinkSchema),
producerProperties, null));
+		FlinkKafkaProducerBase<Tuple2<Long, String>> prod = kafkaServer.getProducer(topic,
new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
+		stream.addSink(prod);
 
 		// ----------- add consumer dataflow ----------
 


Mime
View raw message