Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9486D18B70 for ; Fri, 18 Dec 2015 17:13:45 +0000 (UTC) Received: (qmail 80641 invoked by uid 500); 18 Dec 2015 17:13:45 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 80384 invoked by uid 500); 18 Dec 2015 17:13:45 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 80316 invoked by uid 99); 18 Dec 2015 17:13:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Dec 2015 17:13:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13396E07D9; Fri, 18 Dec 2015 17:13:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Fri, 18 Dec 2015 17:13:47 -0000 Message-Id: <926bdc674e9c4a06ab431c91b1855d0d@git.apache.org> In-Reply-To: <31ab1163bd274ec5b889ebf0dbd47131@git.apache.org> References: <31ab1163bd274ec5b889ebf0dbd47131@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] camel git commit: Revert "CAMEL-9055: camel-aws - SQS should not allow handover the delete task" CAMEL-9405: Amazon SQS message deletion behaviour change on exception Revert "CAMEL-9055: camel-aws - SQS should not allow handover the delete task" CAMEL-9405: Amazon SQS message deletion behaviour change on exception This reverts commit 47c64ec9c6b9609a71113ad82b15d2c66463c4cd. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2382fd5e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2382fd5e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2382fd5e Branch: refs/heads/camel-2.16.x Commit: 2382fd5edd2147a8627ca9cf3ca0f7e4f39b49e3 Parents: 3f0af0a Author: Claus Ibsen Authored: Fri Dec 18 18:07:50 2015 +0100 Committer: Claus Ibsen Committed: Fri Dec 18 18:13:08 2015 +0100 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsConsumer.java | 40 ++++++++------------ .../camel/component/aws/sqs/SqsProducer.java | 2 +- 2 files changed, 17 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2382fd5e/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index d3e0a25..0a8d024 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -34,13 +34,13 @@ import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; + import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.Processor; import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; -import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -141,20 +141,17 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { // schedule task to extend visibility if enabled Integer visibilityTimeout = getConfiguration().getVisibilityTimeout(); - if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout / 2) > 0) { - int delay = visibilityTimeout / 2; - int period = visibilityTimeout; + if (this.scheduledExecutor != null && visibilityTimeout != null && (visibilityTimeout.intValue() / 2) > 0) { + int delay = visibilityTimeout.intValue() / 2; + int period = visibilityTimeout.intValue(); int repeatSeconds = new Double(visibilityTimeout.doubleValue() * 1.5).intValue(); if (LOG.isDebugEnabled()) { LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds), to extend exchangeId: {}", new Object[]{delay, period, repeatSeconds, exchange.getExchangeId()}); } - final ScheduledFuture scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate( new TimeoutExtender(exchange, repeatSeconds), delay, period, TimeUnit.SECONDS); - - // as the AWS client is not thread-safe we cannot handover the task - exchange.addOnCompletion(new SynchronizationAdapter() { + exchange.addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { cancelExtender(exchange); @@ -165,11 +162,6 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { cancelExtender(exchange); } - @Override - public boolean allowHandover() { - return false; - } - private void cancelExtender(Exchange exchange) { // cancel task as we are done LOG.trace("Processing done so cancelling TimeoutExtender task for exchangeId: {}", exchange.getExchangeId()); @@ -178,30 +170,24 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { }); } + // add on completion to handle after work when the exchange is done - // as the AWS client is not thread-safe we cannot handover the task - exchange.addOnCompletion(new SynchronizationAdapter() { - @Override + exchange.addOnCompletion(new Synchronization() { public void onComplete(Exchange exchange) { processCommit(exchange); } - @Override public void onFailure(Exchange exchange) { processRollback(exchange); } @Override - public boolean allowHandover() { - return false; - } - - @Override public String toString() { return "SqsConsumerOnCompletion"; } }); + LOG.trace("Processing exchange [{}]...", exchange); getAsyncProcessor().process(exchange, new AsyncCallback() { @Override @@ -221,6 +207,7 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { */ protected void processCommit(Exchange exchange) { try { + if (shouldDelete(exchange)) { String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class); DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle); @@ -237,7 +224,10 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { } private boolean shouldDelete(Exchange exchange) { - return getConfiguration().isDeleteAfterRead() && (getConfiguration().isDeleteIfFiltered() || passedThroughFilter(exchange)); + return getConfiguration().isDeleteAfterRead() + && (getConfiguration().isDeleteIfFiltered() + || (!getConfiguration().isDeleteIfFiltered() + && passedThroughFilter(exchange))); } private boolean passedThroughFilter(Exchange exchange) { @@ -252,7 +242,9 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { protected void processRollback(Exchange exchange) { Exception cause = exchange.getException(); if (cause != null) { - getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause); + LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause); + } else { + LOG.warn("Exchange failed, so rolling back message status: {}", exchange); } } http://git-wip-us.apache.org/repos/asf/camel/blob/2382fd5e/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java index cd16707..69b1eb3 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java @@ -66,7 +66,7 @@ public class SqsProducer extends DefaultProducer { private void addDelay(SendMessageRequest request, Exchange exchange) { Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class); - Integer delayValue; + Integer delayValue = Integer.valueOf(0); if (headerValue == null) { LOG.trace("Using the config delay"); delayValue = getEndpoint().getConfiguration().getDelaySeconds();