kafka-commits mailing list archives

From: damian...@apache.org
Subject: kafka git commit: KAFKA-6086: Provide for custom error handling when Kafka Streams fails to produce
Date: Fri, 15 Dec 2017 12:56:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 68712dcde -> 69777260e


KAFKA-6086: Provide for custom error handling when Kafka Streams fails to produce

This PR creates and implements the `ProductionExceptionHandler` as described in [KIP-210](https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce).

I've additionally provided a default implementation that preserves the existing behavior. I fixed the compile errors in the tests that resulted from the changed method signatures, and added tests to cover the new behavior.
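
As a rough usage sketch (not part of this commit), an application opts in by pointing the new `default.production.exception.handler` property at a handler class. The class name below is hypothetical, and the constant for the key is private in `StreamsConfig`, so the literal property name is used:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ProductionHandlerConfigSketch {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // New with this change; falls back to DefaultProductionExceptionHandler
        // (fail, as before) when unset. The handler class named here is hypothetical.
        props.put("default.production.exception.handler",
                  "com.example.MyProductionExceptionHandler");
        return props;
    }
}
```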

Author: Matt Farmer <mfarmer@rsglab.com>
Author: Matt Farmer <matt@frmr.me>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #4165 from farmdawgnation/msf/kafka-6086


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69777260
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69777260
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69777260

Branch: refs/heads/trunk
Commit: 69777260e05ab12ee8480c23cd2e6acc6e218a12
Parents: 68712dc
Author: Matt Farmer <mfarmer@rsglab.com>
Authored: Fri Dec 15 12:53:17 2017 +0000
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Dec 15 12:53:17 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |  28 +++--
 .../DefaultProductionExceptionHandler.java      |  37 +++++++
 .../errors/ProductionExceptionHandler.java      |  59 +++++++++++
 .../internals/RecordCollectorImpl.java          |  86 ++++++++++++----
 .../streams/processor/internals/StreamTask.java |  10 +-
 ...lwaysContinueProductionExceptionHandler.java |  37 +++++++
 .../processor/internals/ProcessorNodeTest.java  |   5 +-
 .../internals/RecordCollectorTest.java          | 103 +++++++++++++++++--
 .../processor/internals/RecordQueueTest.java    |   3 +-
 .../processor/internals/SinkNodeTest.java       |   5 +-
 .../processor/internals/StreamTaskTest.java     |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   3 +-
 .../state/internals/RocksDBWindowStoreTest.java |  23 +++--
 .../state/internals/StoreChangeLoggerTest.java  |   3 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   5 +-
 15 files changed, 350 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d78fc0d..ecc8409 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -31,6 +31,8 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
@@ -73,18 +75,18 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  *
  * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
  * }</pre>
- * 
+ *
  * Kafka Streams requires at least the following properties to be set:
  * <ul>
  *  <li>{@link #APPLICATION_ID_CONFIG "application.id"}</li>
  *  <li>{@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}</li>
  * </ul>
- * 
+ *
  * By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
  * <ul>
  *   <li>{@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} (false) - Streams client will always disable/turn off auto committing</li>
  * </ul>
- * 
+ *
  * If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE "exactly_once"}, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
  * <ul>
  *   <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only</li>
@@ -184,6 +186,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
     private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
 
+    /**
+     * {@code default.production.exception.handler}
+     */
+    private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
+    private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
 
     /** {@code default key.serde} */
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
@@ -361,6 +368,11 @@ public class StreamsConfig extends AbstractConfig {
                     Serdes.ByteArraySerde.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_KEY_SERDE_CLASS_DOC)
+            .define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                    Type.CLASS,
+                    DefaultProductionExceptionHandler.class.getName(),
+                    Importance.MEDIUM,
+                    DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
             .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
                     FailOnInvalidTimestamp.class.getName(),
@@ -668,7 +680,7 @@ public class StreamsConfig extends AbstractConfig {
 
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
-        
+
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
         consumerProps.putAll(clientProvidedProps);
 
@@ -679,7 +691,7 @@ public class StreamsConfig extends AbstractConfig {
 
         return consumerProps;
     }
-                 
+
     private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Object> clientProvidedProps, final String[] nonConfigurableConfigs) {
         // Streams does not allow users to configure certain consumer/producer configurations, for example,
         // enable.auto.commit. In cases where user tries to override such non-configurable
@@ -715,7 +727,7 @@ public class StreamsConfig extends AbstractConfig {
 
         }
     }
-    
+
     /**
      * Get the configs to the {@link KafkaConsumer consumer}.
      * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions
@@ -894,6 +906,10 @@ public class StreamsConfig extends AbstractConfig {
         return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
     }
 
+    public ProductionExceptionHandler defaultProductionExceptionHandler() {
+        return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
+    }
+
     /**
      * Override any client properties in the original configs with overrides
      *

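For illustration, a minimal sketch (not from this commit) exercising the `defaultProductionExceptionHandler()` accessor added above; with the property left unset it instantiates `DefaultProductionExceptionHandler`:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class DefaultHandlerLookupSketch {
    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-lookup-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsConfig config = new StreamsConfig(props);
        // The accessor uses getConfiguredInstance() to instantiate the configured
        // class; here that is the default, DefaultProductionExceptionHandler.
        final ProductionExceptionHandler handler = config.defaultProductionExceptionHandler();
        System.out.println(handler.getClass().getName());
    }
}
```
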
http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
new file mode 100644
index 0000000..4fdb1a3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception
+ * happens while attempting to produce result records.
+ */
+public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
+    @Override
+    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+                                                     final Exception exception) {
+        return ProductionExceptionHandlerResponse.FAIL;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        // ignore
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
new file mode 100644
index 0000000..a24f9d2
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Configurable;
+
+/**
+ * Interface that specifies how an exception encountered when attempting to produce a result to
+ * Kafka should be handled.
+ */
+public interface ProductionExceptionHandler extends Configurable {
+    /**
+     * Inspect a record that we attempted to produce and the exception that resulted
+     * from attempting to produce it, then determine whether or not to continue processing.
+     *
+     * @param record The record that failed to produce
+     * @param exception The exception that occurred during production
+     */
+    ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+                                              final Exception exception);
+
+    enum ProductionExceptionHandlerResponse {
+        /* continue processing */
+        CONTINUE(0, "CONTINUE"),
+        /* fail processing */
+        FAIL(1, "FAIL");
+
+        /**
+         * an English description of the API; this is for debugging and can change
+         */
+        public final String name;
+
+        /**
+         * the permanent and immutable id of an API--this can't change ever
+         */
+        public final int id;
+
+        ProductionExceptionHandlerResponse(final int id,
+                                           final String name) {
+            this.id = id;
+            this.name = name;
+        }
+    }
+}

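As a worked example of the interface above, here is a sketch (not part of this commit) of a handler that continues past records rejected as too large but fails on every other production error; the choice of `RecordTooLargeException` is illustrative:

```java
package com.example;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

/**
 * Hypothetical handler: skip records the broker rejects as too large,
 * fail fast on everything else.
 */
public class MyProductionExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
```

Such a class would then be registered through the `default.production.exception.handler` property shown earlier.
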
http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0cc2699..afdadf2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -23,11 +23,20 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
@@ -41,18 +50,25 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
+    private final ProductionExceptionHandler productionExceptionHandler;
 
     private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
         "No more records will be sent and no more offsets will be recorded for this task.";
     private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
     private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
+    private final static String HANDLER_CONTINUED_MESSAGE = "Error sending records (key {} value {} timestamp {}) to topic {} due to {}; " +
+        "The exception handler chose to CONTINUE processing in spite of this error.";
     private volatile KafkaException sendException;
 
-    public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) {
+    public RecordCollectorImpl(final Producer<byte[], byte[]> producer,
+                               final String streamTaskId,
+                               final LogContext logContext,
+                               final ProductionExceptionHandler productionExceptionHandler) {
         this.producer = producer;
         this.offsets = new HashMap<>();
         this.logPrefix = String.format("task [%s] ", streamTaskId);
         this.log = logContext.logger(getClass());
+        this.productionExceptionHandler = productionExceptionHandler;
     }
 
     @Override
@@ -78,6 +94,46 @@ public class RecordCollectorImpl implements RecordCollector {
         send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
     }
 
+    private boolean productionExceptionIsFatal(final Exception exception) {
+        boolean securityException = exception instanceof AuthenticationException ||
+            exception instanceof AuthorizationException ||
+            exception instanceof SecurityDisabledException;
+
+        boolean communicationException = exception instanceof InvalidTopicException ||
+            exception instanceof UnknownServerException ||
+            exception instanceof SerializationException ||
+            exception instanceof OffsetMetadataTooLarge ||
+            exception instanceof IllegalStateException;
+
+        return securityException || communicationException;
+    }
+
+    private <K, V> void recordSendError(
+        final K key,
+        final V value,
+        final Long timestamp,
+        final String topic,
+        final Exception exception
+    ) {
+        String errorLogMessage = LOG_MESSAGE;
+        String errorMessage = EXCEPTION_MESSAGE;
+        if (exception instanceof RetriableException) {
+            errorLogMessage += PARAMETER_HINT;
+            errorMessage += PARAMETER_HINT;
+        }
+        log.error(errorLogMessage, key, value, timestamp, topic, exception);
+        sendException = new StreamsException(
+            String.format(errorMessage,
+                          logPrefix,
+                          "an error caught",
+                          key,
+                          value,
+                          timestamp,
+                          topic,
+                          exception.getMessage()),
+            exception);
+    }
+
     @Override
     public <K, V> void  send(final String topic,
                              final K key,
@@ -118,23 +174,13 @@ public class RecordCollectorImpl implements RecordCollector {
                                                   topic,
                                                   exception.getMessage()));
                             } else {
-                                String errorLogMessage = LOG_MESSAGE;
-                                String errorMessage = EXCEPTION_MESSAGE;
-                                if (exception instanceof RetriableException) {
-                                    errorLogMessage += PARAMETER_HINT;
-                                    errorMessage += PARAMETER_HINT;
+                                if (productionExceptionIsFatal(exception)) {
+                                    recordSendError(key, value, timestamp, topic, exception);
+                                } else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
+                                    recordSendError(key, value, timestamp, topic, exception);
+                                } else {
+                                    log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception);
                                 }
-                                log.error(errorLogMessage, key, value, timestamp, topic, exception);
-                                sendException = new StreamsException(
-                                    String.format(errorMessage,
-                                                  logPrefix,
-                                                  "an error caught",
-                                                  key,
-                                                  value,
-                                                  timestamp,
-                                                  topic,
-                                                  exception.getMessage()),
-                                    exception);
                             }
                         }
                     }
@@ -146,7 +192,7 @@ public class RecordCollectorImpl implements RecordCollector {
                 "its internal buffer fills up. " +
                 "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
             throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
-        } catch (final Exception fatalException) {
+        } catch (final Exception uncaughtException) {
             throw new StreamsException(
                 String.format(EXCEPTION_MESSAGE,
                               logPrefix,
@@ -155,8 +201,8 @@ public class RecordCollectorImpl implements RecordCollector {
                               value,
                               timestamp,
                               topic,
-                              fatalException.getMessage()),
-                fatalException);
+                              uncaughtException.getMessage()),
+                uncaughtException);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f2fa448..7063e74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.Cancellable;
@@ -118,7 +119,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         this.producer = producer;
         this.metrics = new TaskMetrics(metrics);
 
-        recordCollector = createRecordCollector(logContext);
+        final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
+
+        recordCollector = createRecordCollector(logContext, productionExceptionHandler);
         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
@@ -645,7 +648,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     // visible for testing only
-    RecordCollector createRecordCollector(final LogContext logContext) {
-        return new RecordCollectorImpl(producer, id.toString(), logContext);
+    RecordCollector createRecordCollector(final LogContext logContext,
+                                          final ProductionExceptionHandler productionExceptionHandler) {
+        return new RecordCollectorImpl(producer, id.toString(), logContext, productionExceptionHandler);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java
new file mode 100644
index 0000000..111874d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * Production exception handler that always instructs streams to continue when an exception
+ * happens while attempting to produce result records.
+ */
+public class AlwaysContinueProductionExceptionHandler implements ProductionExceptionHandler {
+    @Override
+    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+                                                     final Exception exception) {
+        return ProductionExceptionHandlerResponse.CONTINUE;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        // ignore
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index fd9d070..90ef771 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -109,7 +110,7 @@ public class ProcessorNodeTest {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
 
         final Metrics metrics = new Metrics();
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test ")), metrics);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
         final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
         node.init(context);
 
@@ -144,4 +145,4 @@ public class ProcessorNodeTest {
         context.close();
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 16400d5..39fe7ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.junit.Test;
@@ -71,7 +73,7 @@ public class RecordCollectorTest {
 
         final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
-                "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "));
+                "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "), new DefaultProductionExceptionHandler());
 
         collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
         collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
@@ -103,7 +105,7 @@ public class RecordCollectorTest {
 
         final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
-                "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "));
+                "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "), new DefaultProductionExceptionHandler());
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
@@ -135,14 +137,15 @@ public class RecordCollectorTest {
                     }
                 },
                 "test",
-                logContext);
+                logContext,
+                new DefaultProductionExceptionHandler());
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
+    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
@@ -152,7 +155,8 @@ public class RecordCollectorTest {
                     }
                 },
                 "test",
-                logContext);
+                logContext,
+                new DefaultProductionExceptionHandler());
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
@@ -163,7 +167,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
+    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
@@ -173,7 +177,27 @@ public class RecordCollectorTest {
                     }
                 },
                 "test",
-                logContext);
+                logContext,
+                new AlwaysContinueProductionExceptionHandler());
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test",
+                logContext,
+                new DefaultProductionExceptionHandler());
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
@@ -184,7 +208,26 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
+    public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test",
+                logContext,
+                new AlwaysContinueProductionExceptionHandler());
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+
+        collector.flush();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionHandler() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
@@ -194,7 +237,8 @@ public class RecordCollectorTest {
                     }
                 },
                 "test",
-                logContext);
+                logContext,
+                new DefaultProductionExceptionHandler());
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
@@ -204,8 +248,44 @@ public class RecordCollectorTest {
     }
 
     @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExceptionHandler() {
+        final RecordCollector collector = new RecordCollectorImpl(
+                new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                    @Override
+                    public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                        callback.onCompletion(null, new Exception());
+                        return null;
+                    }
+                },
+                "test",
+                logContext,
+                new AlwaysContinueProductionExceptionHandler());
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+
+        collector.close();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                @Override
+                public List<PartitionInfo> partitionsFor(final String topic) {
+                    return Collections.EMPTY_LIST;
+                }
+
+            },
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler());
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+    }
+
+    @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowIfTopicIsUnknown() {
+    public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                 @Override
@@ -215,7 +295,8 @@ public class RecordCollectorTest {
 
             },
             "test",
-                logContext);
+            logContext,
+            new AlwaysContinueProductionExceptionHandler());
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index d7697cb..2fa1b59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -54,7 +55,7 @@ public class RecordQueueTest {
     private final String[] topics = {"topic"};
 
     final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
-            new RecordCollectorImpl(null, null,  new LogContext("record-queue-test ")));
+            new RecordCollectorImpl(null, null,  new LogContext("record-queue-test "), new DefaultProductionExceptionHandler()));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
         new TopicPartition(topics[0], 1),

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index ef99b8a..6792740 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
@@ -37,7 +38,7 @@ public class SinkNodeTest {
     private final Serializer anySerializer = Serdes.Bytes().serializer();
     private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
     private final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
-        new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test ")));
+        new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "), new DefaultProductionExceptionHandler()));
     private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
 
     @Before
@@ -114,4 +115,4 @@ public class SinkNodeTest {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4aee8f5..a3ff328 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -513,7 +514,8 @@ public class StreamTaskTest {
             changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
-            RecordCollector createRecordCollector(final LogContext logContext) {
+            RecordCollector createRecordCollector(final LogContext logContext,
+                                                  final ProductionExceptionHandler exHandler) {
                 return new NoOpRecordCollector() {
                     @Override
                     public void flush() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index bf433da..a342585 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -185,7 +186,7 @@ public class KeyValueStoreTestDriver<K, V> {
         final ByteArraySerializer rawSerializer = new ByteArraySerializer();
         final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver ")) {
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver "), new DefaultProductionExceptionHandler()) {
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 3b7e2c4..39c8f03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -76,7 +77,7 @@ public class RocksDBWindowStoreTest {
     private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
 
     private final Producer<byte[], byte[]> producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
-    private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask ")) {
+    private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask "), new DefaultProductionExceptionHandler()) {
         @Override
         public <K1, V1> void send(final String topic,
                                   K1 key,
@@ -190,51 +191,51 @@ public class RocksDBWindowStoreTest {
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
         assertNull(entriesByKey.get(6));
     }
-    
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldGetAll() throws IOException {
         windowStore = createWindowStore(context, false, true);
         long startTime = segmentSize - 4L;
-        
+
         putFirstBatch(windowStore, startTime, context);
-        
+
         final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
         final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
         final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
         final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
         final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
-        
+
         assertEquals(
                 Utils.mkList(zero, one, two, four, five),
                 StreamsTestUtils.toList(windowStore.all())
         );
     }
-    
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldFetchAllInTimeRange() throws IOException {
         windowStore = createWindowStore(context, false, true);
         long startTime = segmentSize - 4L;
-        
+
         putFirstBatch(windowStore, startTime, context);
-        
+
         final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
         final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
         final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
         final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
         final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
-        
+
         assertEquals(
                 Utils.mkList(one, two, four),
                 StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 4))
         );
-        
+
         assertEquals(
                 Utils.mkList(zero, one, two),
                 StreamsTestUtils.toList(windowStore.fetchAll(startTime + 0, startTime + 3))
         );
-        
+
         assertEquals(
                 Utils.mkList(one, two, four, five),
                 StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 5))

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 8749e92..32b56bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -39,7 +40,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-            new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest ")) {
+            new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) {
                 @Override
                 public <K1, V1> void send(final String topic,
                                           final K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 7058f2f..3a9ed75 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -135,7 +136,7 @@ public class KStreamTestDriver extends ExternalResource {
         }
         initTopology(topology, topology.stateStores());
     }
-    
+
     @Override
     protected void after() {
         if (topology != null) {
@@ -275,7 +276,7 @@ public class KStreamTestDriver extends ExternalResource {
 
     private class MockRecordCollector extends RecordCollectorImpl {
         MockRecordCollector() {
-            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "));
+            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler());
         }
 
         @Override

