Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C0FA51043E for ; Mon, 6 Jan 2014 09:18:20 +0000 (UTC) Received: (qmail 54479 invoked by uid 500); 6 Jan 2014 09:18:18 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 54343 invoked by uid 500); 6 Jan 2014 09:18:16 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 54319 invoked by uid 99); 6 Jan 2014 09:18:14 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jan 2014 09:18:14 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1348C44CFB; Mon, 6 Jan 2014 09:18:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ningjiang@apache.org To: commits@camel.apache.org Date: Mon, 06 Jan 2014 09:18:14 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: CAMEL-7105 Added ability to auto reconnect for sqs queues with thanks to Adrian Updated Branches: refs/heads/master e60335d48 -> 6f7450080 CAMEL-7105 Added ability to auto reconnect for sqs queues with thanks to Adrian Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/27ba5718 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/27ba5718 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/27ba5718 Branch: refs/heads/master Commit: 27ba57189c857d5d6d1885999be6db177514e04d Parents: e60335d Author: Willem Jiang Authored: Mon Jan 6 17:11:04 2014 +0800 Committer: Willem Jiang Committed: Mon Jan 6 17:12:11 2014 +0800 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsConsumer.java | 28 +++++++++++++++++++- .../camel/component/aws/sqs/SqsEndpoint.java | 4 +-- 2 files changed, 29 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/27ba5718/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java index 1163743..7daa8a8 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java @@ -29,6 +29,8 @@ import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.MessageNotInflightException; +import com.amazonaws.services.sqs.model.QueueDeletedRecentlyException; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; import com.amazonaws.services.sqs.model.ReceiptHandleIsInvalidException; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; @@ -46,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * A Consumer of messages from the Amazon Web Service Simple Queue Service * AWS SQS @@ -74,7 +77,14 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { LOG.trace("Receiving messages with request [{}]...", request); - ReceiveMessageResult messageResult = getClient().receiveMessage(request); + ReceiveMessageResult messageResult = null; + try { + messageResult = getClient().receiveMessage(request); + } catch (QueueDoesNotExistException e) { + LOG.info("Queue does not exist....recreating now..."); + reConnectToQueue(); + messageResult = getClient().receiveMessage(request); + } if (LOG.isTraceEnabled()) { LOG.trace("Received {} messages", messageResult.getMessages().size()); @@ -83,6 +93,22 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer { Queue exchanges = createExchanges(messageResult.getMessages()); return processBatch(CastUtils.cast(exchanges)); } + + public void reConnectToQueue() { + try { + getEndpoint().createQueue(getClient()); + } catch (QueueDeletedRecentlyException qdr) { + LOG.debug("Queue recently deleted, will retry in 30 seconds."); + try { + Thread.sleep(30000); + getEndpoint().createQueue(getClient()); + } catch (Exception e) { + LOG.error("failed to retry queue connection.", e); + } + } catch (Exception e) { + LOG.error("Could not connect to queue in amazon.", e); + } + } protected Queue createExchanges(List messages) { if (LOG.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/camel/blob/27ba5718/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index ca3ff0a..aa01c72 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -110,7 +110,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint { } } - private void createQueue(AmazonSQS client) { + protected void createQueue(AmazonSQS client) { LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName()); // creates a new queue, or returns the URL of an existing one @@ -220,7 +220,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint { protected String getQueueUrl() { return queueUrl; } - + public int getMaxMessagesPerPoll() { return maxMessagesPerPoll; }