flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-5728] [kafka] Let FlinkKafkaProducers flush on checkpoints by default
Date Fri, 16 Feb 2018 08:39:54 GMT
Repository: flink
Updated Branches:
  refs/heads/master 52751159c -> 93176e5e8


[FLINK-5728] [kafka] Let FlinkKafkaProducers flush on checkpoints by default


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93176e5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93176e5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93176e5e

Branch: refs/heads/master
Commit: 93176e5e889d0d51c4d06145a261ed89b06a8381
Parents: 5275115
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Thu Feb 15 16:12:22 2018 +0800
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Feb 16 09:37:19 2018 +0100

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                                   | 6 +++---
 .../streaming/connectors/kafka/FlinkKafkaProducerBase.java     | 2 +-
 .../streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java | 3 ++-
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93176e5e/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 8e38146..f28195c 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -542,14 +542,14 @@ methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
  instead of catching and rethrowing them. This essentially accounts the record
  to have succeeded, even if it was never written to the target Kafka topic. This
  must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `true`.
  With this enabled, Flink's checkpoints will wait for any
  on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
  succeeding the checkpoint. This ensures that all records before the checkpoint have
  been written to Kafka. This must be enabled for at-least-once.
  
-In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
-0.9 and 0.10, `setLogFailureOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
+In conclusion, the Kafka producer by default has at-least-once guarantees for versions
+0.9 and 0.10, with `setLogFailureOnly` set to `false` and `setFlushOnCheckpoint` set
 to `true`.
 
 **Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,

http://git-wip-us.apache.org/repos/asf/flink/blob/93176e5e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index a4437d4..7ec7578 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -112,7 +112,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
im
 	/**
 	 * If true, the producer will wait until all outstanding records have been send to the broker.
 	 */
-	protected boolean flushOnCheckpoint;
+	protected boolean flushOnCheckpoint = true;
 
 	// -------------------------------- Runtime fields ------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93176e5e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index ad118ae..708b3be 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -96,7 +97,7 @@ public class FlinkKafkaProducerBaseTest {
 	public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
 		FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
 
-		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
+		RuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
 		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
 		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
 


Mime
View raw message