Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8EE13172A1 for ; Wed, 11 Mar 2015 13:41:22 +0000 (UTC) Received: (qmail 18643 invoked by uid 500); 11 Mar 2015 13:41:22 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 18624 invoked by uid 500); 11 Mar 2015 13:41:22 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 18615 invoked by uid 99); 11 Mar 2015 13:41:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Mar 2015 13:41:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0B77BE0C0F; Wed, 11 Mar 2015 13:41:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sfeldman@apache.org To: commits@usergrid.apache.org Message-Id: <326ffe5475c440c4a7ec169f9aed8bdc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-usergrid git commit: add deadletter queue Date: Wed, 11 Mar 2015 13:41:21 +0000 (UTC) Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-473 [created] c021d5d6a add deadletter queue Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c021d5d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c021d5d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c021d5d6 Branch: refs/heads/USERGRID-473 Commit: c021d5d6a4c73e8dcb7d40c8d432c13a55b4b068 Parents: 5c7a5f8 Author: Shawn Feldman Authored: Tue Mar 10 17:35:35 2015 -0600 Committer: Shawn Feldman Committed: Tue Mar 10 17:35:35 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/queue/QueueFig.java | 3 ++ .../queue/impl/SQSQueueManagerImpl.java | 35 +++++++++++++++----- 2 files changed, 30 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c021d5d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index 197b791..b6c6ad7 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -16,4 +16,7 @@ public interface QueueFig extends GuicyFig { @Default("usergrid") public String getPrefix(); + @Key( "usergrid.queue.max.receive.count" ) + @Default("5") + public Integer getMaxReceiveCount(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c021d5d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index f202fda..e859437 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -36,12 +36,11 @@ import com.google.inject.assistedinject.Assisted; import org.apache.commons.lang.StringUtils; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.queue.*; +import org.apache.usergrid.persistence.queue.Queue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -74,15 +73,30 @@ public class SQSQueueManagerImpl implements QueueManager { CreateQueueRequest createQueueRequest = new CreateQueueRequest() .withQueueName(name); CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest); - String url = result.getQueueUrl(); - queue = new Queue(url); - LOG.info("Created queue with url {}", url); + String queueUrl = result.getQueueUrl(); + setDeadLetterQueue(queueLoader.client,queueLoader.config(), queueUrl, name+"_dead_letter"); + queue = new Queue(queueUrl); + LOG.info("Created queue with url {}", queueUrl); } return queue; } } ); + private static void setDeadLetterQueue(AmazonSQSClient client, QueueFig fig, String deadLetterName, String queueUrl) { + CreateQueueRequest deadLetterQueueRequest = new CreateQueueRequest() + .withQueueName(deadLetterName); + CreateQueueResult deadLetterResult = client.createQueue(deadLetterQueueRequest); + String deadLetterUrl = deadLetterResult.getQueueUrl(); + String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\", \"deadLetterTargetArn\":\"%s\"}", fig.getMaxReceiveCount(), deadLetterUrl); + SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest(); + Map attributes = new HashMap<>(); + attributes.put("RedrivePolicy", redrivePolicy); + queueAttributes.setAttributes(attributes); + queueAttributes.setQueueUrl(queueUrl); + client.setQueueAttributes(queueAttributes); + } + @Inject public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){ this.fig = fig; @@ -111,7 +125,7 @@ public class SQSQueueManagerImpl implements QueueManager { public Queue getQueue() { try { - Queue queue = urlMap.get(new SqsLoader(getName(),sqs)); + Queue queue = urlMap.get(new SqsLoader(getName(),sqs,fig)); return queue; } catch (ExecutionException ee) { throw new RuntimeException(ee); @@ -127,6 +141,7 @@ public class SQSQueueManagerImpl implements QueueManager { waitTime = waitTime/1000; String url = getQueue().getUrl(); LOG.info("Getting {} messages from {}", limit, url); + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url); receiveMessageRequest.setMaxNumberOfMessages(limit); receiveMessageRequest.setVisibilityTimeout(transactionTimeout); @@ -230,10 +245,12 @@ public class SQSQueueManagerImpl implements QueueManager { public class SqsLoader { private final String key; private final AmazonSQSClient client; + private final QueueFig fig; - public SqsLoader(String key, AmazonSQSClient client) { + public SqsLoader(String key, AmazonSQSClient client,QueueFig fig) { this.key = key; this.client = client; + this.fig = fig; } public AmazonSQSClient getClient() { @@ -265,5 +282,7 @@ public class SQSQueueManagerImpl implements QueueManager { return getKey(); } + public QueueFig config(){return fig;} + } }