usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [01/35] git commit: pick random queue between 0 and queue size
Date Wed, 01 Oct 2014 14:51:37 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-rebuildable-index 1aa04a71d -> 3df5d4d25


pick random queue between 0 and queue size


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f2a93d7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f2a93d7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f2a93d7f

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: f2a93d7f3283a148b2091821f2644506313def8f
Parents: 4cba3b8
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Fri Sep 26 11:02:25 2014 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Fri Sep 26 11:02:25 2014 -0600

----------------------------------------------------------------------
 .../notifications/ApplicationQueueManager.java  | 27 +++++++++++---------
 .../services/notifications/QueueListener.java   | 15 +++++------
 .../services/notifications/QueueManager.java    |  4 ---
 .../notifications/SingleQueueTaskManager.java   |  8 +++---
 4 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index d110b95..198d41a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -18,15 +18,9 @@ package org.apache.usergrid.services.notifications;
 
 import com.clearspring.analytics.hash.MurmurHash;
 import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.QueueQuery;
-import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
@@ -66,7 +60,7 @@ public class ApplicationQueueManager implements QueueManager {
     private final org.apache.usergrid.mq.QueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
-    private final String queueName;
+    private final String[] queueNames;
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
@@ -87,7 +81,7 @@ public class ApplicationQueueManager implements QueueManager {
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
-        this.queueName = properties.getProperty(DEFAULT_QUEUE_PROPERTY, DEFAULT_QUEUE_NAME);
+        this.queueNames = getQueueNames(properties);
     }
 
 
@@ -115,6 +109,7 @@ public class ApplicationQueueManager implements QueueManager {
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
 
         final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
+        final String queueName = getRandomQueue(queueNames);
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
@@ -235,7 +230,7 @@ public class ApplicationQueueManager implements QueueManager {
 
         //do i have devices, and have i already started batching.
         if (deviceCount.get() <= 0) {
-            SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification);
+            SingleQueueTaskManager taskManager = new SingleQueueTaskManager(em, qm, this, notification,queueName);
             //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
             taskManager.finishedBatch();
         }
@@ -288,7 +283,7 @@ public class ApplicationQueueManager implements QueueManager {
      * @param messages
      * @throws Exception
      */
-    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages) {
+    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
         LOG.info("sending batch of {} notifications.", messages.size());
         final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
 
@@ -313,7 +308,7 @@ public class ApplicationQueueManager implements QueueManager {
                     SingleQueueTaskManager taskManager;
                     taskManager = taskMap.get(message.getNotificationId());
                     if (taskManager == null) {
-                        taskManager = new SingleQueueTaskManager(em, qm, proxy, notification);
+                        taskManager = new SingleQueueTaskManager(em, qm, proxy, notification,queuePath);
                         taskMap.putIfAbsent(message.getNotificationId(), taskManager);
                         taskManager = taskMap.get(message.getNotificationId());
                     }
@@ -427,6 +422,15 @@ public class ApplicationQueueManager implements QueueManager {
         return translatedPayloads;
     }
 
+    public static String[] getQueueNames(Properties properties) {
+        String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
+        return names;
+    }
+    public static String getRandomQueue(String[] queueNames) {
+        int size = queueNames.length;
+        Random random = new Random();
+        return queueNames[random.nextInt(size)];
+    }
 
     private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
         private final Iterator<T> input;
@@ -571,6 +575,5 @@ public class ApplicationQueueManager implements QueueManager {
         }
     }
 
-    public String getQueuePath(){return queueName;}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 6e9a7ef..1d4c5cb 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,8 +35,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
-    public static int MAX_CONSECUTIVE_FAILS = 10000;
-
     public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
 
     private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@@ -66,7 +64,7 @@ public class QueueListener  {
 
     public static final String MAX_THREADS = "1";
     private Integer batchSize = 1000;
-    private String queueName;
+    private String[] queueNames;
 
     public QueueListener() {
         pool = Executors.newFixedThreadPool(1);
@@ -95,7 +93,7 @@ public class QueueListener  {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", "5000")).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
-                queueName = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
+                queueNames = ApplicationQueueManager.getQueueNames(properties);
 
                 int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
                 futures = new ArrayList<Future>(maxThreads);
@@ -133,7 +131,8 @@ public class QueueListener  {
         // run until there are no more active jobs
         while ( true ) {
             try {
-                QueueResults results = getDeliveryBatch(queueManager);
+                String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
+                QueueResults results = getDeliveryBatch(queueManager,queueName);
                 LOG.info("QueueListener: retrieved batch of {} messages", results.size());
 
                 List<Message> messages = results.getMessages();
@@ -167,7 +166,7 @@ public class QueueListener  {
                         );
 
                         LOG.info("QueueListener: send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
-                        Observable current = manager.sendBatchToProviders(entry.getValue());
+                        Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
                         if(merge == null)
                             merge = current;
                         else {
@@ -211,11 +210,11 @@ public class QueueListener  {
         }
     }
 
-    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager) throws Exception {
+    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
         QueueQuery qq = new QueueQuery();
         qq.setLimit(this.getBatchSize());
         qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(queueName, qq);
+        QueueResults results = queueManager.getFromQueue(queuePath, qq);
         LOG.debug("got batch of {} devices", results.size());
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
index f92d463..0024417 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
@@ -26,10 +26,6 @@ import java.util.Set;
  */
 public interface QueueManager {
 
-    public HashMap<Object,Notifier> getNotifierMap();
-
     public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
 
-    public String getQueuePath();
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f2a93d7f/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
index 8b4866f..f87f497 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -34,8 +34,8 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
 
     private static final Logger LOG = LoggerFactory
             .getLogger(SingleQueueTaskManager.class);
-    private final String path;
     private final QueueManager proxy;
+    private final String queuePath;
 
     private Notification notification;
     private AtomicLong successes = new AtomicLong();
@@ -45,14 +45,14 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
     private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
     private boolean hasFinished;
 
-    public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification) {
+    public SingleQueueTaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification,String queuePath) {
         this.em = em;
         this.qm = qm;
-        this.path = proxy.getQueuePath();
         this.notification = notification;
         this.proxy = proxy;
         this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
         hasFinished = false;
+        this.queuePath = queuePath;
     }
 
     public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
@@ -72,7 +72,7 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
             }
 
             LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            qm.commitTransaction(path, messageMap.get(deviceUUID).getTransaction(), null);
+            qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
             if (newProviderId != null) {
                 LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
                 replaceProviderId(deviceRef, notifier, newProviderId);


Mime
View raw message