kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2778: Use zero loss settings by default for Connect source producers.
Date Mon, 09 Nov 2015 18:37:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 e627558a5 -> 4069011ee


KAFKA-2778: Use zero loss settings by default for Connect source producers.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira

Closes #459 from ewencp/kafka-2778-connect-source-zero-loss-settings

(cherry picked from commit 13ba57dcfc45e60de281cc55125e7446322308ba)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4069011e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4069011e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4069011e

Branch: refs/heads/0.9.0
Commit: 4069011ee4dc0f3500190bb93a3b79180cd34eda
Parents: e627558
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Mon Nov 9 10:36:57 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Nov 9 10:37:14 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/connect/runtime/Worker.java   |  9 +++++++++
 .../apache/kafka/connect/runtime/WorkerSourceTask.java  | 12 ++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4069011e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 2e359d6..359a79c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -92,6 +92,15 @@ public class Worker {
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG),
","));
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+
+        // These settings are designed to ensure there is no data loss. They *may* be overridden
via configs passed to the
+        // worker, but this may compromise the delivery guarantees of Kafka Connect.
+        producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
+        producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
+
         producerProps.putAll(config.unusedConfigs());
 
         producer = new KafkaProducer<>(producerProps);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4069011e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 141e430..6cf1dd7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -122,7 +122,7 @@ class WorkerSourceTask implements WorkerTask {
      * @param records
      */
     private synchronized void sendRecords(List<SourceRecord> records) {
-        for (SourceRecord record : records) {
+        for (final SourceRecord record : records) {
             byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(),
record.key());
             byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(),
record.value());
             final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(),
record.kafkaPartition(), key, value);
@@ -138,7 +138,15 @@ class WorkerSourceTask implements WorkerTask {
                         @Override
                         public void onCompletion(RecordMetadata recordMetadata, Exception
e) {
                             if (e != null) {
-                                log.error("Failed to send record: ", e);
+                                // Given the default settings for zero data loss, this should
basically never happen --
+                                // between "infinite" retries, indefinite blocking on full
buffers, and "infinite" request
+                                // timeouts, callbacks with exceptions should never be invoked
in practice. If the
+                                // user overrode these settings, the best we can do is notify
them of the failure via
+                                // logging.
+                                log.error("{} failed to send record to {}: {}", id, record.topic(),
e);
+                                log.debug("Failed record: topic {}, Kafka partition {}, key
{}, value {}, source offset {}, source partition {}",
+                                        record.topic(), record.kafkaPartition(), record.key(),
record.value(),
+                                        record.sourceOffset(), record.sourcePartition());
                             } else {
                                 log.trace("Wrote record successfully: topic {} partition
{} offset {}",
                                         recordMetadata.topic(), recordMetadata.partition(),


Mime
View raw message