Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 72612200D5C for ; Fri, 15 Dec 2017 13:56:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 70F83160C14; Fri, 15 Dec 2017 12:56:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 98B6C160C06 for ; Fri, 15 Dec 2017 13:56:06 +0100 (CET) Received: (qmail 67797 invoked by uid 500); 15 Dec 2017 12:56:05 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 67788 invoked by uid 99); 15 Dec 2017 12:56:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Dec 2017 12:56:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3DF25DFD7B; Fri, 15 Dec 2017 12:56:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: damianguy@apache.org To: commits@kafka.apache.org Message-Id: <39e5488ba8b64438a3cbe53d6df205af@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-6086: Provide for custom error handling when Kafka Streams fails to produce Date: Fri, 15 Dec 2017 12:56:03 +0000 (UTC) archived-at: Fri, 15 Dec 2017 12:56:08 -0000 Repository: kafka Updated Branches: refs/heads/trunk 68712dcde -> 69777260e KAFKA-6086: Provide for custom error handling when Kafka Streams fails to produce This PR creates and implements the `ProductionExceptionHandler` as described in [KIP-210](https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce). I've additionally provided a default implementation preserving the existing behavior. I fixed various compile errors in the tests that resulted from my changing of method signatures, and added tests to cover the new behavior. Author: Matt Farmer Author: Matt Farmer Reviewers: Matthias J. Sax , Bill Bejeck , Damian Guy Closes #4165 from farmdawgnation/msf/kafka-6086 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69777260 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69777260 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69777260 Branch: refs/heads/trunk Commit: 69777260e05ab12ee8480c23cd2e6acc6e218a12 Parents: 68712dc Author: Matt Farmer Authored: Fri Dec 15 12:53:17 2017 +0000 Committer: Damian Guy Committed: Fri Dec 15 12:53:17 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kafka/streams/StreamsConfig.java | 28 +++-- .../DefaultProductionExceptionHandler.java | 37 +++++++ .../errors/ProductionExceptionHandler.java | 59 +++++++++++ .../internals/RecordCollectorImpl.java | 86 ++++++++++++---- .../streams/processor/internals/StreamTask.java | 10 +- ...lwaysContinueProductionExceptionHandler.java | 37 +++++++ .../processor/internals/ProcessorNodeTest.java | 5 +- .../internals/RecordCollectorTest.java | 103 +++++++++++++++++-- .../processor/internals/RecordQueueTest.java | 3 +- .../processor/internals/SinkNodeTest.java | 5 +- .../processor/internals/StreamTaskTest.java | 4 +- .../streams/state/KeyValueStoreTestDriver.java | 3 +- .../state/internals/RocksDBWindowStoreTest.java | 23 +++-- .../state/internals/StoreChangeLoggerTest.java | 3 +- .../apache/kafka/test/KStreamTestDriver.java | 5 +- 15 files changed, 350 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d78fc0d..ecc8409 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -31,6 +31,8 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.errors.ProductionExceptionHandler; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; @@ -73,18 +75,18 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; * * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties); * } - * + * * Kafka Streams requires at least the following properties to be set: *
    *
  • {@link #APPLICATION_ID_CONFIG "application.id"}
  • *
  • {@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}
  • *
- * + * * By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses): *
    *
  • {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} (false) - Streams client will always disable/turn off auto committing
  • *
- * + * * If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE "exactly_once"}, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses): *
    *
  • {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only
  • @@ -184,6 +186,11 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface."; + /** + * {@code default.production.exception.handler} + */ + private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; + private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProductionExceptionHandler interface."; /** {@code default key.serde} */ public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde"; @@ -361,6 +368,11 @@ public class StreamsConfig extends AbstractConfig { Serdes.ByteArraySerde.class.getName(), Importance.MEDIUM, DEFAULT_KEY_SERDE_CLASS_DOC) + .define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + DefaultProductionExceptionHandler.class.getName(), + Importance.MEDIUM, + DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, FailOnInvalidTimestamp.class.getName(), @@ -668,7 +680,7 @@ public class StreamsConfig extends AbstractConfig { checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS); checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); - + final Map consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); consumerProps.putAll(clientProvidedProps); @@ -679,7 +691,7 @@ public class StreamsConfig extends AbstractConfig { return consumerProps; } - + private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map clientProvidedProps, final String[] nonConfigurableConfigs) { // Streams does not allow users to configure certain consumer/producer configurations, for example, // enable.auto.commit. In cases where user tries to override such non-configurable @@ -715,7 +727,7 @@ public class StreamsConfig extends AbstractConfig { } } - + /** * Get the configs to the {@link KafkaConsumer consumer}. * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their non-prefixed versions @@ -894,6 +906,10 @@ public class StreamsConfig extends AbstractConfig { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + public ProductionExceptionHandler defaultProductionExceptionHandler() { + return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); + } + /** * Override any client properties in the original configs with overrides * http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java new file mode 100644 index 0000000..4fdb1a3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerRecord; + +/** + * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception + * happens while attempting to produce result records. + */ +public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { + @Override + public ProductionExceptionHandlerResponse handle(final ProducerRecord record, + final Exception exception) { + return ProductionExceptionHandlerResponse.FAIL; + } + + @Override + public void configure(final Map configs) { + // ignore + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java new file mode 100644 index 0000000..a24f9d2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Configurable; + +/** + * Interface that specifies how an exception when attempting to produce a result to + * Kafka should be handled. + */ +public interface ProductionExceptionHandler extends Configurable { + /** + * Inspect a record that we attempted to produce, and the exception that resulted + * from attempting to produce it and determine whether or not to continue processing. + * + * @param record The record that failed to produce + * @param exception The exception that occurred during production + */ + ProductionExceptionHandlerResponse handle(final ProducerRecord record, + final Exception exception); + + enum ProductionExceptionHandlerResponse { + /* continue processing */ + CONTINUE(0, "CONTINUE"), + /* fail processing */ + FAIL(1, "FAIL"); + + /** + * an english description of the api--this is for debugging and can change + */ + public final String name; + + /** + * the permanent and immutable id of an API--this can't change ever + */ + public final int id; + + ProductionExceptionHandlerResponse(final int id, + final String name) { + this.id = id; + this.name = name; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 0cc2699..afdadf2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -23,11 +23,20 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.ProductionExceptionHandler; +import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.slf4j.Logger; @@ -41,18 +50,25 @@ public class RecordCollectorImpl implements RecordCollector { private final Producer producer; private final Map offsets; private final String logPrefix; + private final ProductionExceptionHandler productionExceptionHandler; private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " + "No more records will be sent and no more offsets will be recorded for this task."; private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s"; private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error."; + private final static String HANDLER_CONTINUED_MESSAGE = "Error sending records (key {} value {} timestamp {}) to topic {} due to {}; " + + "The exception handler chose to CONTINUE processing in spite of this error."; private volatile KafkaException sendException; - public RecordCollectorImpl(final Producer producer, final String streamTaskId, final LogContext logContext) { + public RecordCollectorImpl(final Producer producer, + final String streamTaskId, + final LogContext logContext, + final ProductionExceptionHandler productionExceptionHandler) { this.producer = producer; this.offsets = new HashMap<>(); this.logPrefix = String.format("task [%s] ", streamTaskId); this.log = logContext.logger(getClass()); + this.productionExceptionHandler = productionExceptionHandler; } @Override @@ -78,6 +94,46 @@ public class RecordCollectorImpl implements RecordCollector { send(topic, key, value, partition, timestamp, keySerializer, valueSerializer); } + private boolean productionExceptionIsFatal(final Exception exception) { + boolean securityException = exception instanceof AuthenticationException || + exception instanceof AuthorizationException || + exception instanceof SecurityDisabledException; + + boolean communicationException = exception instanceof InvalidTopicException || + exception instanceof UnknownServerException || + exception instanceof SerializationException || + exception instanceof OffsetMetadataTooLarge || + exception instanceof IllegalStateException; + + return securityException || communicationException; + } + + private void recordSendError( + final K key, + final V value, + final Long timestamp, + final String topic, + final Exception exception + ) { + String errorLogMessage = LOG_MESSAGE; + String errorMessage = EXCEPTION_MESSAGE; + if (exception instanceof RetriableException) { + errorLogMessage += PARAMETER_HINT; + errorMessage += PARAMETER_HINT; + } + log.error(errorLogMessage, key, value, timestamp, topic, exception); + sendException = new StreamsException( + String.format(errorMessage, + logPrefix, + "an error caught", + key, + value, + timestamp, + topic, + exception.getMessage()), + exception); + } + @Override public void send(final String topic, final K key, @@ -118,23 +174,13 @@ public class RecordCollectorImpl implements RecordCollector { topic, exception.getMessage())); } else { - String errorLogMessage = LOG_MESSAGE; - String errorMessage = EXCEPTION_MESSAGE; - if (exception instanceof RetriableException) { - errorLogMessage += PARAMETER_HINT; - errorMessage += PARAMETER_HINT; + if (productionExceptionIsFatal(exception)) { + recordSendError(key, value, timestamp, topic, exception); + } else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { + recordSendError(key, value, timestamp, topic, exception); + } else { + log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception); } - log.error(errorLogMessage, key, value, timestamp, topic, exception); - sendException = new StreamsException( - String.format(errorMessage, - logPrefix, - "an error caught", - key, - value, - timestamp, - topic, - exception.getMessage()), - exception); } } } @@ -146,7 +192,7 @@ public class RecordCollectorImpl implements RecordCollector { "its internal buffer fills up. " + "You can increase producer parameter `max.block.ms` to increase this timeout.", topic); throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic)); - } catch (final Exception fatalException) { + } catch (final Exception uncaughtException) { throw new StreamsException( String.format(EXCEPTION_MESSAGE, logPrefix, @@ -155,8 +201,8 @@ public class RecordCollectorImpl implements RecordCollector { value, timestamp, topic, - fatalException.getMessage()), - fatalException); + uncaughtException.getMessage()), + uncaughtException); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index f2fa448..7063e74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.Cancellable; @@ -118,7 +119,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator this.producer = producer; this.metrics = new TaskMetrics(metrics); - recordCollector = createRecordCollector(logContext); + final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler(); + + recordCollector = createRecordCollector(logContext, productionExceptionHandler); streamTimePunctuationQueue = new PunctuationQueue(); systemTimePunctuationQueue = new PunctuationQueue(); maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); @@ -645,7 +648,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } // visible for testing only - RecordCollector createRecordCollector(final LogContext logContext) { - return new RecordCollectorImpl(producer, id.toString(), logContext); + RecordCollector createRecordCollector(final LogContext logContext, + final ProductionExceptionHandler productionExceptionHandler) { + return new RecordCollectorImpl(producer, id.toString(), logContext, productionExceptionHandler); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java new file mode 100644 index 0000000..111874d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerRecord; + +/** + * Production exception handler that always instructs streams to continue when an exception + * happens while attempting to produce result records. + */ +public class AlwaysContinueProductionExceptionHandler implements ProductionExceptionHandler { + @Override + public ProductionExceptionHandlerResponse handle(final ProducerRecord record, + final Exception exception) { + return ProductionExceptionHandlerResponse.CONTINUE; + } + + @Override + public void configure(final Map configs) { + // ignore + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index fd9d070..90ef771 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -109,7 +110,7 @@ public class ProcessorNodeTest { final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); final Metrics metrics = new Metrics(); - final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null, new LogContext("processnode-test ")), metrics); + final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics); final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet()); node.init(context); @@ -144,4 +145,4 @@ public class ProcessorNodeTest { context.close(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 16400d5..39fe7ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.junit.Test; @@ -71,7 +73,7 @@ public class RecordCollectorTest { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), - "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition ")); + "RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "), new DefaultProductionExceptionHandler()); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer); @@ -103,7 +105,7 @@ public class RecordCollectorTest { final RecordCollectorImpl collector = new RecordCollectorImpl( new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer), - "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner ")); + "RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "), new DefaultProductionExceptionHandler()); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner); @@ -135,14 +137,15 @@ public class RecordCollectorTest { } }, "test", - logContext); + logContext, + new DefaultProductionExceptionHandler()); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); } @SuppressWarnings("unchecked") @Test - public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() { + public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override @@ -152,7 +155,8 @@ public class RecordCollectorTest { } }, "test", - logContext); + logContext, + new DefaultProductionExceptionHandler()); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); try { @@ -163,7 +167,7 @@ public class RecordCollectorTest { @SuppressWarnings("unchecked") @Test - public void shouldThrowStreamsExceptionOnFlushIfASendFailed() { + public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override @@ -173,7 +177,27 @@ public class RecordCollectorTest { } }, "test", - logContext); + logContext, + new AlwaysContinueProductionExceptionHandler()); + collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + + collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() { + final RecordCollector collector = new RecordCollectorImpl( + new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, new Exception()); + return null; + } + }, + "test", + logContext, + new DefaultProductionExceptionHandler()); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); try { @@ -184,7 +208,26 @@ public class RecordCollectorTest { @SuppressWarnings("unchecked") @Test - public void shouldThrowStreamsExceptionOnCloseIfASendFailed() { + public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() { + final RecordCollector collector = new RecordCollectorImpl( + new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, new Exception()); + return null; + } + }, + "test", + logContext, + new AlwaysContinueProductionExceptionHandler()); + collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + + collector.flush(); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionHandler() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override @@ -194,7 +237,8 @@ public class RecordCollectorTest { } }, "test", - logContext); + logContext, + new DefaultProductionExceptionHandler()); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); try { @@ -204,8 +248,44 @@ public class RecordCollectorTest { } @SuppressWarnings("unchecked") + @Test + public void shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExceptionHandler() { + final RecordCollector collector = new RecordCollectorImpl( + new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public synchronized Future send(final ProducerRecord record, final Callback callback) { + callback.onCompletion(null, new Exception()); + return null; + } + }, + "test", + logContext, + new AlwaysContinueProductionExceptionHandler()); + collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + + collector.close(); + } + + @SuppressWarnings("unchecked") + @Test(expected = StreamsException.class) + public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() { + final RecordCollector collector = new RecordCollectorImpl( + new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { + @Override + public List partitionsFor(final String topic) { + return Collections.EMPTY_LIST; + } + + }, + "test", + logContext, + new DefaultProductionExceptionHandler()); + collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); + } + + @SuppressWarnings("unchecked") @Test(expected = StreamsException.class) - public void shouldThrowIfTopicIsUnknown() { + public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() { final RecordCollector collector = new RecordCollectorImpl( new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) { @Override @@ -215,7 +295,8 @@ public class RecordCollectorTest { }, "test", - logContext); + logContext, + new AlwaysContinueProductionExceptionHandler()); collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index d7697cb..2fa1b59 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; @@ -54,7 +55,7 @@ public class RecordQueueTest { private final String[] topics = {"topic"}; final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), - new RecordCollectorImpl(null, null, new LogContext("record-queue-test "))); + new RecordCollectorImpl(null, null, new LogContext("record-queue-test "), new DefaultProductionExceptionHandler())); private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer); private final RecordQueue queue = new RecordQueue( new TopicPartition(topics[0], 1), http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index ef99b8a..6792740 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -37,7 +38,7 @@ public class SinkNodeTest { private final Serializer anySerializer = Serdes.Bytes().serializer(); private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class); private final MockProcessorContext context = new MockProcessorContext(anyStateSerde, - new RecordCollectorImpl(new MockProducer(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "))); + new RecordCollectorImpl(new MockProducer(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "), new DefaultProductionExceptionHandler())); private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null); @Before @@ -114,4 +115,4 @@ public class SinkNodeTest { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 4aee8f5..a3ff328 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.ProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -513,7 +514,8 @@ public class StreamTaskTest { changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) { @Override - RecordCollector createRecordCollector(final LogContext logContext) { + RecordCollector createRecordCollector(final LogContext logContext, + final ProductionExceptionHandler exHandler) { return new NoOpRecordCollector() { @Override public void flush() { http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index bf433da..a342585 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -185,7 +186,7 @@ public class KeyValueStoreTestDriver { final ByteArraySerializer rawSerializer = new ByteArraySerializer(); final Producer producer = new MockProducer<>(true, rawSerializer, rawSerializer); - final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver ")) { + final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver "), new DefaultProductionExceptionHandler()) { @Override public void send(final String topic, final K1 key, http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 3b7e2c4..39c8f03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -76,7 +77,7 @@ public class RocksDBWindowStoreTest { private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); private final Producer producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer()); - private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask ")) { + private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask "), new DefaultProductionExceptionHandler()) { @Override public void send(final String topic, K1 key, @@ -190,51 +191,51 @@ public class RocksDBWindowStoreTest { assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); assertNull(entriesByKey.get(6)); } - + @SuppressWarnings("unchecked") @Test public void shouldGetAll() throws IOException { windowStore = createWindowStore(context, false, true); long startTime = segmentSize - 4L; - + putFirstBatch(windowStore, startTime, context); - + final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); final KeyValue, String> one = windowedPair(1, "one", startTime + 1); final KeyValue, String> two = windowedPair(2, "two", startTime + 2); final KeyValue, String> four = windowedPair(4, "four", startTime + 4); final KeyValue, String> five = windowedPair(5, "five", startTime + 5); - + assertEquals( Utils.mkList(zero, one, two, four, five), StreamsTestUtils.toList(windowStore.all()) ); } - + @SuppressWarnings("unchecked") @Test public void shouldFetchAllInTimeRange() throws IOException { windowStore = createWindowStore(context, false, true); long startTime = segmentSize - 4L; - + putFirstBatch(windowStore, startTime, context); - + final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); final KeyValue, String> one = windowedPair(1, "one", startTime + 1); final KeyValue, String> two = windowedPair(2, "two", startTime + 2); final KeyValue, String> four = windowedPair(4, "four", startTime + 4); final KeyValue, String> five = windowedPair(5, "five", startTime + 5); - + assertEquals( Utils.mkList(one, two, four), StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 4)) ); - + assertEquals( Utils.mkList(zero, one, two), StreamsTestUtils.toList(windowStore.fetchAll(startTime + 0, startTime + 3)) ); - + assertEquals( Utils.mkList(one, two, four, five), StreamsTestUtils.toList(windowStore.fetchAll(startTime + 1, startTime + 5)) http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 8749e92..32b56bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.state.StateSerdes; @@ -39,7 +40,7 @@ public class StoreChangeLoggerTest { private final Map logged = new HashMap<>(); private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), - new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest ")) { + new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) { @Override public void send(final String topic, final K1 key, http://git-wip-us.apache.org/repos/asf/kafka/blob/69777260/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 7058f2f..3a9ed75 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -135,7 +136,7 @@ public class KStreamTestDriver extends ExternalResource { } initTopology(topology, topology.stateStores()); } - + @Override protected void after() { if (topology != null) { @@ -275,7 +276,7 @@ public class KStreamTestDriver extends ExternalResource { private class MockRecordCollector extends RecordCollectorImpl { MockRecordCollector() { - super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver ")); + super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler()); } @Override