usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [4/4] git commit: queues!!!
Date Fri, 03 Oct 2014 19:10:26 GMT
queues!!!


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2570c800
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2570c800
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2570c800

Branch: refs/heads/sqs_queues
Commit: 2570c800f8c61f606be102a8b9cdc050419a27c0
Parents: 92cac3d
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Fri Oct 3 13:09:49 2014 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Fri Oct 3 13:09:49 2014 -0600

----------------------------------------------------------------------
 .../util/UsergridAwsCredentialsProvider.java    | 57 ++++++++++++++++++++
 .../usergrid/persistence/queue/QueueFig.java    |  2 +-
 .../persistence/queue/QueueManager.java         |  2 +
 .../queue/impl/QueueManagerImpl.java            | 49 +++++++++++++++--
 .../persistence/queue/QueueManagerTest.java     | 24 +++++++--
 5 files changed, 125 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2570c800/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java
new file mode 100644
index 0000000..6792bde
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/util/UsergridAwsCredentialsProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.core.util;
+
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+
+public class UsergridAwsCredentialsProvider implements AWSCredentialsProvider {
+
+    private AWSCredentials creds;
+
+    public  UsergridAwsCredentialsProvider(){
+        init();
+    }
+
+    private void init() {
+        creds = new AWSCredentials() {
+            @Override
+            public String getAWSAccessKeyId() {
+                return System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
+            }
+
+            @Override
+            public String getAWSSecretKey() {
+                return System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
+            }
+        };
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+        return creds;
+    }
+
+
+    @Override
+    public void refresh() {
+        init();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2570c800/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 479fb97..fd71f9e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -9,7 +9,7 @@ import org.safehaus.guicyfig.Key;
 public interface QueueFig extends GuicyFig {
 
     @Key( "queue.region" )
-    @Default("US_EAST_1")
+    @Default("us-east-1")
     public String getRegion();
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2570c800/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index e29310b..992b533 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -28,4 +28,6 @@ public interface QueueManager {
     List<QueueMessage> getMessages(int limit,int timeout);
     void commitMessage( QueueMessage queueMessage);
     void commitMessages( List<QueueMessage> queueMessages);
+    void sendMessages(List<String> bodies);
+    void sendMessage(String body);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2570c800/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
index 6547f9f..26d127f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerImpl.java
@@ -1,12 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
 package org.apache.usergrid.persistence.queue.impl;
 
+import com.amazonaws.SDKGlobalConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.*;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.core.util.UsergridAwsCredentialsProvider;
 import org.apache.usergrid.persistence.queue.*;
 
 import java.util.ArrayList;
@@ -22,8 +44,8 @@ public class QueueManagerImpl implements QueueManager {
     public QueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
         this.fig = fig;
         this.scope = scope;
-        EnvironmentVariableCredentialsProvider credsProvider = new EnvironmentVariableCredentialsProvider();
-        this.sqs = new AmazonSQSClient(credsProvider.getCredentials());
+        UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        this.sqs = new AmazonSQSClient(ugProvider.getCredentials());
         Regions regions = Regions.fromName(fig.getRegion());
         Region region = Region.getRegion(regions);
         sqs.setRegion(region);
@@ -38,7 +60,7 @@ public class QueueManagerImpl implements QueueManager {
     }
 
     private String getName() {
-        return scope.getApplication().getUuid().toString()+ scope.getName();
+        return scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
     }
 
     public Queue getQueue(){
@@ -51,9 +73,29 @@ public class QueueManagerImpl implements QueueManager {
                 }
             }
         }
+        if(queue == null) {
+            queue = createQueue();
+        }
         return queue;
     }
 
+    public void sendMessage(String body){
+        SendMessageRequest request = new SendMessageRequest(getQueue().getUrl(),body);
+        sqs.sendMessage(request);
+    }
+
+    public void sendMessages(List<String> bodies){
+        SendMessageBatchRequest request = new SendMessageBatchRequest(getQueue().getUrl());
+        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
+        for(String body : bodies){
+            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+            entry.setMessageBody(body);
+            entries.add(entry);
+        }
+        request.setEntries(entries);
+        sqs.sendMessageBatch(request);
+    }
+
     public  List<QueueMessage> getMessages( int limit,int timeout){
         System.out.println("Receiving messages from MyQueue.\n");
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(getQueue().getUrl());
@@ -73,6 +115,7 @@ public class QueueManagerImpl implements QueueManager {
                 .withQueueUrl(getQueue().getUrl())
                 .withReceiptHandle(queueMessage.getHandle()));
     }
+
     public void commitMessages( List<QueueMessage> queueMessages){
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
         for(QueueMessage message : queueMessages){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2570c800/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 8892e3c..e5f3832 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -36,10 +36,10 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.google.inject.Inject;
 
+import java.util.List;
 import java.util.UUID;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
 
 
 @RunWith( ITRunner.class )
@@ -51,19 +51,33 @@ public class QueueManagerTest {
     protected QueueManagerFactory qmf;
 
     protected QueueScope scope;
+    private QueueManager qm;
 
 
     @Before
     public void mockApp() {
         this.scope = new QueueScopeImpl( new SimpleId( "application" ), "testQueue" );
+        qm = qmf.getQueueManager(scope);
     }
 
 
     @Test
-    public void createQueue() {
-        QueueManager qm = qmf.getQueueManager(scope);
-        qm.createQueue();
+    public void get() {
         Queue queue = qm.getQueue();
+        assertNotNull(queue);
+    }
+
+    @Test
+    public void send(){
+        qm.sendMessage("bodytest");
+        List<QueueMessage> messageList = qm.getMessages(1,5000);
+        assertTrue(messageList.size() >= 1);
+        for(QueueMessage message : messageList){
+            qm.commitMessage(message);
+        }
+        messageList = qm.getMessages(1,5000);
+        assertTrue(messageList.size() <= 0);
+
     }
 
 }


Mime
View raw message