kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-5006: Improve thrown exception error logs
Date Wed, 02 Aug 2017 21:34:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk db306ec36 -> 630e9c567


KAFKA-5006: Improve thrown exception error logs

1. Log an ERROR only for the first exception encountered in the producer callback.

2. Wrap the first thrown exception's message into a new exception, and rethrow it
whenever `checkForException` is called (see the sketch below).
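
A minimal sketch of the resulting pattern, distilled from the diff below (the
`FirstErrorCollector` class and its method names are illustrative, not the
committed code): the producer callback runs on the producer's I/O thread, so the
first failure is remembered in a volatile field and rethrown later on the
calling thread.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.KafkaException;

    // Illustrative collector; only the Kafka client types are real API.
    class FirstErrorCollector {
        private final Producer<byte[], byte[]> producer;
        // volatile: written by the producer I/O thread, read by the caller's thread
        private volatile KafkaException sendException;

        FirstErrorCollector(final Producer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        void send(final ProducerRecord<byte[], byte[]> record) {
            checkForException();  // surface an earlier async failure before sending more
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                    // remember only the first failure; later callbacks are no-ops
                    if (exception != null && sendException == null) {
                        sendException = new KafkaException(
                                "Abort sending since an error was caught on a previous record: "
                                        + exception.getMessage(), exception);
                    }
                }
            });
        }

        void checkForException() {
            if (sendException != null) {
                throw sendException;  // KafkaException is unchecked, so callers see it directly
            }
        }
    }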

As a result, a `store.put` call will throw a `KafkaException` whose error message
is more intuitive, as in the caller-side example below.
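
For example (a hedged, caller-side sketch; the `updateStore` method and the
`store`, `key`, and `value` names are assumed, as would come from a Streams
processor):

    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical caller: a put() that flushes through the record collector
    // can now surface the wrapped exception with the failed record's details.
    static void updateStore(final KeyValueStore<String, String> store,
                            final String key, final String value) {
        try {
            store.put(key, value);
        } catch (final KafkaException e) {
            // e.getMessage() names the record's key, value, timestamp, and topic
            System.err.println("Aborting processing: " + e.getMessage());
            throw e;
        }
    }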

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Xavier Léauté <xavier@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Damian Guy <damian.guy@gmail.com>

Closes #3534 from guozhangwang/K5006-exception-record-collector


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/630e9c56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/630e9c56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/630e9c56

Branch: refs/heads/trunk
Commit: 630e9c5679efa0916a7bfca849dfc8850737681c
Parents: db306ec
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Aug 2 14:34:20 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 2 14:34:20 2017 -0700

----------------------------------------------------------------------
 .../internals/RecordCollectorImpl.java          | 27 ++++++++++----------
 1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/630e9c56/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d49cf58..79e3350 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -44,8 +45,8 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
-    private volatile Exception sendException;
 
+    private volatile KafkaException sendException;
 
     public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) {
         this.producer = producer;
@@ -106,11 +107,15 @@ public class RecordCollectorImpl implements RecordCollector {
                             offsets.put(tp, metadata.offset());
                         } else {
                             if (sendException == null) {
-                                sendException = exception;
-                                if (sendException instanceof ProducerFencedException) {
-                                    log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and it will be closed as it is a zombie.", logPrefix, topic, exception);
+                                log.error("{} Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
+                                                "No more records will be sent and no more offsets will be recorded for this task.",
+                                        logPrefix, key, value, timestamp, topic, exception);
+                                if (exception instanceof ProducerFencedException) {
+                                    sendException = new ProducerFencedException(String.format("%s Abort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
+                                            logPrefix, key, value, timestamp, topic, exception.getMessage()));
                                 } else {
-                                    log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
+                                    sendException = new StreamsException(String.format("%s Abort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
+                                            logPrefix, key, value, timestamp, topic, exception), exception);
                                 }
                             }
                         }
@@ -119,21 +124,17 @@ public class RecordCollectorImpl implements RecordCollector {
                 return;
             } catch (final TimeoutException e) {
                 if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
+                    throw new StreamsException(String.format("%s Failed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
                 }
-                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
+                log.warn("{} Timeout exception caught when sending record to topic {}; retrying with {} attempt", logPrefix, topic, attempt);
                 Utils.sleep(SEND_RETRY_BACKOFF);
             }
-
         }
     }
 
-    private void checkForException() {
+    private void checkForException()  {
         if (sendException != null) {
-            if (sendException instanceof ProducerFencedException) {
-                throw (ProducerFencedException) sendException;
-            }
-            throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
+            throw sendException;
         }
     }
 

