From: davsclaus@apache.org
To: commits@camel.apache.org
Reply-To: dev@camel.apache.org
Date: Tue, 16 May 2017 07:21:14 -0000
Message-Id: <07bd394bd19941849b6ccaff738f2d3f@git.apache.org>
Subject: [1/4] camel git commit: CAMEL-11215: Add breakOnError to camel-kafka so the consumer stops on first exception and allow the same message to be polled again for retry.
archived-at: Tue, 16 May 2017 07:21:17 -0000

Repository: camel
Updated Branches:
  refs/heads/CAMEL-11215 [created] 9945e25d1


CAMEL-11215: Add breakOnError to camel-kafka so the consumer stops on first exception and allow the same message to be polled again for retry.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1504c6e9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1504c6e9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1504c6e9

Branch: refs/heads/CAMEL-11215
Commit: 1504c6e9625d304fef0a568d5216b71cb04c17eb
Parents: 000e09a
Author: Claus Ibsen
Authored: Mon May 15 13:37:41 2017 +0200
Committer: Claus Ibsen
Committed: Mon May 15 13:37:41 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 +-
 .../component/kafka/KafkaConfiguration.java     | 18 ++++++
 .../camel/component/kafka/KafkaConsumer.java    | 61 ++++++++++++++++----
 .../springboot/KafkaComponentConfiguration.java | 20 +++++++
 4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1504c6e9/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 5637673..bdbebce 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -66,7 +66,7 @@ with the following path and query parameters:
 | **topic** | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. |  | String
 |=======================================================================
 
-#### Query Parameters (83 parameters):
+#### Query Parameters (84 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -77,6 +77,7 @@ with the following path and query parameters:
 | **autoCommitIntervalMs** (consumer) | The frequency in ms that the consumer offsets are committed to zookeeper. | 5000 | Integer
 | **autoCommitOnStop** (consumer) | Whether to perform an explicit auto commit when the consumer stops to ensure the broker has a commit from the last consumed message. This requires the option autoCommitEnable is turned on. The possible values are: sync async or none. And sync is the default value. | sync | String
 | **autoOffsetReset** (consumer) | What to do when there is no initial offset in ZooKeeper or if an offset is out of range: smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset fail: throw exception to the consumer | latest | String
+| **breakOnFirstError** (consumer) | This options controls what happens when a consumer is processing an exchange and it fails. If the option is false then the consumer continues to the next message and processes it. If the option is true then the consumer breaks out and will seek back to offset of the message that caused a failure and then re-attempt to process this message. However this can lead to endless processing of the same message if its bound to fail every time eg a poison message. Therefore its recommended to deal with that for example by using Camel's error handler. | true | boolean
 | **bridgeErrorHandler** (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean
 | **checkCrcs** (consumer) | Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead so it may be disabled in cases seeking extreme performance. | true | Boolean
 | **consumerRequestTimeoutMs** (consumer) | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. | 40000 | Integer

http://git-wip-us.apache.org/repos/asf/camel/blob/1504c6e9/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index bb4acbc..9c288c7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -115,6 +115,8 @@ public class KafkaConfiguration implements Cloneable {
     private Boolean autoCommitEnable = true;
     @UriParam(label = "consumer", defaultValue = "sync", enums = "sync,async,none")
     private String autoCommitOnStop = "sync";
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean breakOnFirstError = true;
     @UriParam(label = "consumer")
     private StateRepository<String, String> offsetRepository;
 
@@ -684,6 +686,22 @@ public class KafkaConfiguration implements Cloneable {
         this.autoCommitOnStop = autoCommitOnStop;
     }
 
+    public boolean isBreakOnFirstError() {
+        return breakOnFirstError;
+    }
+
+    /**
+     * This options controls what happens when a consumer is processing an exchange and it fails.
+     * If the option is false then the consumer continues to the next message and processes it.
+     * If the option is true then the consumer breaks out, and will seek back to offset of the
+     * message that caused a failure, and then re-attempt to process this message. However this can lead
+     * to endless processing of the same message if its bound to fail every time, eg a poison message.
+     * Therefore its recommended to deal with that for example by using Camel's error handler.
+     */
+    public void setBreakOnFirstError(boolean breakOnFirstError) {
+        this.breakOnFirstError = breakOnFirstError;
+    }
+
     public String getBrokers() {
         return brokers;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/1504c6e9/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 321aebb..7aa6234 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -186,15 +186,23 @@ public class KafkaConsumer extends DefaultConsumer {
                         }
                     }
                     while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
+
+                        // flag to break out processing on the first exception
+                        boolean breakOnErrorHit = false;
+                        log.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
                         ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
+
                         for (TopicPartition partition : allRecords.partitions()) {
                             Iterator<ConsumerRecord<Object, Object>> recordIterator = allRecords.records(partition).iterator();
-                            if (recordIterator.hasNext()) {
-                                ConsumerRecord<Object, Object> record = null;
-                                while (recordIterator.hasNext()) {
+                            if (!breakOnErrorHit && recordIterator.hasNext()) {
+                                ConsumerRecord<Object, Object> record;
+
+                                long partitionLastOffset = -1;
+
+                                while (!breakOnErrorHit && recordIterator.hasNext()) {
                                     record = recordIterator.next();
                                     if (LOG.isTraceEnabled()) {
-                                        LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(),
+                                        LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(),
                                                   record.value());
                                     }
                                     Exchange exchange = endpoint.createKafkaExchange(record);
@@ -204,15 +212,34 @@ public class KafkaConsumer extends DefaultConsumer {
                                     try {
                                         processor.process(exchange);
                                     } catch (Exception e) {
-                                        getExceptionHandler().handleException("Error during processing", exchange, e);
+                                        exchange.setException(e);
+                                    }
+
+                                    if (exchange.getException() != null) {
+                                        // processing failed due to an unhandled exception, what should we do
+                                        if (endpoint.getConfiguration().isBreakOnFirstError()) {
+                                            // commit last good offset before we try again
+                                            commitOffset(offsetRepository, partition, partitionLastOffset);
+                                            // we are failing but store last good offset
+                                            log.warn("Error during processing {} from topic: {}. Will seek consumer to last good offset: {} and poll again.", exchange, topicName, partitionLastOffset);
+                                            if (partitionLastOffset != -1) {
+                                                consumer.seek(partition, partitionLastOffset);
+                                            }
+                                            // continue to next partition
+                                            breakOnErrorHit = true;
+                                        } else {
+                                            // will handle/log the exception and then continue to next
+                                            getExceptionHandler().handleException("Error during processing", exchange, exchange.getException());
+                                        }
+                                    } else {
+                                        // record was success so remember its offset
+                                        partitionLastOffset = record.offset();
                                     }
                                 }
-                                long partitionLastOffset = record.offset();
-                                if (offsetRepository != null) {
-                                    offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
-                                // if autocommit is false
-                                } else if (endpoint.getConfiguration().isAutoCommitEnable() != null && !endpoint.getConfiguration().isAutoCommitEnable()) {
-                                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
+
+                                if (!breakOnErrorHit) {
+                                    // all records processed from partition so commit them
+                                    commitOffset(offsetRepository, partition, partitionLastOffset);
                                 }
                             }
                         }
@@ -243,6 +270,18 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
+        private void commitOffset(StateRepository<String, String> offsetRepository, TopicPartition partition, long partitionLastOffset) {
+            if (partitionLastOffset != -1) {
+                if (offsetRepository != null) {
+                    offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
+                    // if autocommit is false
+                } else if (endpoint.getConfiguration().isAutoCommitEnable() != null && !endpoint.getConfiguration().isAutoCommitEnable()) {
+                    LOG.debug("Auto commitSync {} from topic {} with offset: {}", threadId, topicName, partitionLastOffset);
+                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1)));
+                }
+            }
+        }
+
         private void shutdown() {
             // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop
             consumer.wakeup();

http://git-wip-us.apache.org/repos/asf/camel/blob/1504c6e9/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index f5dfddd..087d0d0 100644
--- a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -203,6 +203,18 @@ public class KafkaComponentConfiguration {
      */
     private String autoCommitOnStop = "sync";
     /**
+     * This options controls what happens when a consumer is processing an
+     * exchange and it fails. If the option is false then the
+     * consumer continues to the next message and processes it. If the
+     * option is true then the consumer breaks out, and will seek
+     * back to offset of the message that caused a failure, and then
+     * re-attempt to process this message. However this can lead to endless
+     * processing of the same message if its bound to fail every time, eg a
+     * poison message. Therefore its recommended to deal with that for
+     * example by using Camel's error handler.
+     */
+    private Boolean breakOnFirstError = true;
+    /**
      * URL of the Kafka brokers to use. The format is
      * host1:port1,host2:port2, and the list can be a subset of brokers or a
      * VIP pointing to a subset of brokers.
@@ -767,6 +779,14 @@ public class KafkaComponentConfiguration {
         this.autoCommitOnStop = autoCommitOnStop;
     }
 
+    public Boolean getBreakOnFirstError() {
+        return breakOnFirstError;
+    }
+
+    public void setBreakOnFirstError(Boolean breakOnFirstError) {
+        this.breakOnFirstError = breakOnFirstError;
+    }
+
     public String getBrokers() {
         return brokers;
     }
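
For readers who want to reason about the offset bookkeeping in the KafkaConsumer hunks without a broker, the following is a minimal, self-contained sketch of the per-partition loop. It is not code from this commit: the class `BreakOnFirstErrorSketch`, its `Result` type and `processBatch` method are hypothetical names, records are reduced to bare offsets, and a predicate stands in for the Camel processor throwing. One deliberate assumption differs from the diff: the sketch seeks to the failed record's own offset, which is what the option description states, whereas the diff passes `partitionLastOffset` (the last successful offset) to `consumer.seek`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration only; none of these names exist in camel-kafka.
final class BreakOnFirstErrorSketch {

    /** Outcome of processing one polled batch from a single partition. */
    static final class Result {
        final long lastGoodOffset;  // -1 when no record in the batch succeeded
        final long commitOffset;    // lastGoodOffset + 1, or -1 when nothing is committed
        final long seekOffset;      // offset to re-position the consumer at, or -1 for no seek
        final List<Long> skipped;   // failed offsets that were logged and skipped

        Result(long lastGoodOffset, long commitOffset, long seekOffset, List<Long> skipped) {
            this.lastGoodOffset = lastGoodOffset;
            this.commitOffset = commitOffset;
            this.seekOffset = seekOffset;
            this.skipped = skipped;
        }
    }

    /**
     * Walks the batch in offset order. 'fails' simulates the processor
     * throwing for a given offset.
     */
    static Result processBatch(List<Long> offsets, Predicate<Long> fails, boolean breakOnFirstError) {
        long lastGood = -1;
        long seek = -1;
        List<Long> skipped = new ArrayList<>();

        for (long offset : offsets) {
            if (fails.test(offset)) {
                if (breakOnFirstError) {
                    // Assumption: the next poll should start at the failed record itself.
                    seek = offset;
                    break;
                }
                // Option is false: handle/log the failure and continue with the next record.
                skipped.add(offset);
            } else {
                // Record succeeded, so remember its offset.
                lastGood = offset;
            }
        }

        // Kafka commits the offset of the next record to read, hence the + 1.
        long commit = (lastGood == -1) ? -1 : lastGood + 1;
        return new Result(lastGood, commit, seek, skipped);
    }
}
```

Calling `processBatch` with offsets 10 to 13, a failure at 12 and the option enabled yields a commit offset of 12 and a seek to 12, so the failing record is the first one returned by the next poll. With the option disabled, offset 12 is skipped and the commit offset becomes 14.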