usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [40/50] incubator-usergrid git commit: Updated futures impl for different queues
Date Wed, 18 Mar 2015 20:56:40 GMT
Updated futures impl for different queues

Added onError to catch errors

Added a gauge so we can track index operations in flight

Updated queue scope to be only a name, since we only use queues at the system level.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d6ae369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d6ae369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d6ae369

Branch: refs/heads/USERGRID-460
Commit: 2d6ae3698e2faff733e73d17aff569a3850a6485
Parents: 361060e
Author: Todd Nine <tnine@apigee.com>
Authored: Fri Mar 13 16:19:40 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Fri Mar 13 16:19:40 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   7 +-
 .../corepersistence/StaleIndexCleanupTest.java  |   4 +
 .../persistence/core/future/BetterFuture.java   |  43 ++++--
 .../core/metrics/MetricsFactory.java            |   9 ++
 .../core/metrics/MetricsFactoryImpl.java        | 121 ++++++++++------
 stack/corepersistence/queryindex/pom.xml        |   8 +-
 .../persistence/index/IndexBufferConsumer.java  |  11 ++
 .../usergrid/persistence/index/IndexFig.java    |  11 ++
 .../index/IndexOperationMessage.java            |   7 +-
 .../persistence/index/guice/IndexModule.java    |  11 +-
 .../persistence/index/impl/BufferQueue.java     |  22 ++-
 .../index/impl/BufferQueueInMemoryImpl.java     |  53 ++++---
 .../index/impl/BufferQueueSQSImpl.java          |   8 +-
 .../index/impl/EsEntityIndexImpl.java           | 142 ++-----------------
 .../index/impl/EsIndexBufferConsumerImpl.java   | 131 ++++++++++-------
 .../index/guice/TestIndexModule.java            |   8 +-
 .../index/impl/BufferQueueSQSImplTest.java      |   5 +
 .../impl/EntityConnectionIndexImplTest.java     |   3 -
 .../persistence/index/impl/EntityIndexTest.java |   1 -
 .../persistence/index/impl/EsTestUtils.java     |  48 -------
 .../cassandra/ManagementServiceImpl.java        |   9 +-
 .../notifications/NotificationsService.java     |  50 ++++---
 22 files changed, 359 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 3230faa..161037b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -71,12 +71,7 @@ public class CoreModule  extends AbstractModule {
         install( new CommonModule());
         install(new CollectionModule());
         install(new GraphModule());
-        install( new IndexModule() {
-            @Override
-            public void wireBufferQueue() {
-                bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
-            }
-        } );
+        install( new IndexModule() );
 //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
 //        install(new QueueModule());
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 82d8f93..8d31b69 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -310,6 +310,10 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             em.delete( thing );
         }
 
+
+        //put this into the top of the queue, once it's acked we've been flushed
+        em.refreshIndex();
+
         // wait for indexes to be cleared for the deleted entities
         count = 0;
         do {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
index 201fa9a..777ac52 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -16,28 +16,53 @@
  */
 package org.apache.usergrid.persistence.core.future;
 
+
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 
+
 /**
  * Future without the exception nastiness
  */
-public  class BetterFuture<T> extends FutureTask<T> {
-    public BetterFuture(Callable<T> callable){
-        super(callable);
+public class BetterFuture<T> extends FutureTask<T> {
+
+    private Throwable error;
+
+
+    public BetterFuture( Callable<T> callable ) {
+        super( callable );
     }
 
-    public void done(){
+
+    public void setError( final Throwable t ) {
+        this.error = t;
+    }
+
+
+    public void done() {
         run();
     }
 
-    public T get(){
+
+    public T get() {
+
+        T returnValue = null;
+
         try {
-            return super.get();
-        }catch (Exception e){
-            throw new RuntimeException(e);
+            returnValue = super.get();
+        }
+        catch ( InterruptedException e ) {
+            //swallow
+        }
+        catch ( ExecutionException e ) {
+            //swallow
         }
-    }
 
+        if ( error != null ) {
+           throw new RuntimeException( "Error in getting future", error );
+        }
 
+        return returnValue;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
index 453e556..62a5cb9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactory.java
@@ -31,4 +31,13 @@ public interface MetricsFactory {
     Counter getCounter(Class<?> klass, String name);
 
     Meter getMeter(Class<?> klass, String name);
+
+    /**
+     * Register the supplied gauge under the metric name derived from the class and name
+     * @param clazz
+     * @param name
+     * @param gauge
+     * @return
+     */
+    void addGauge( Class<?> clazz, String name, Gauge<?> gauge );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
index 6d0881b..904e56a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/metrics/MetricsFactoryImpl.java
@@ -17,17 +17,27 @@
 package org.apache.usergrid.persistence.core.metrics;
 
 
-import com.codahale.metrics.*;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.codahale.metrics.graphite.Graphite;
 import com.codahale.metrics.graphite.GraphiteReporter;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Singleton class to manage metrics.
@@ -39,74 +49,101 @@ public class MetricsFactoryImpl implements MetricsFactory {
     private MetricRegistry registry;
     private GraphiteReporter graphiteReporter;
     private JmxReporter jmxReporter;
-    private ConcurrentHashMap<String,Metric> hashMap;
-    private static final Logger LOG = LoggerFactory.getLogger(MetricsFactoryImpl.class);
+    private ConcurrentHashMap<String, Metric> hashMap;
+    private static final Logger LOG = LoggerFactory.getLogger( MetricsFactoryImpl.class );
+
 
     @Inject
-    public MetricsFactoryImpl(MetricsFig metricsFig) {
+    public MetricsFactoryImpl( MetricsFig metricsFig ) {
         registry = new MetricRegistry();
         String metricsHost = metricsFig.getHost();
-        if(!metricsHost.equals("false")) {
-            Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, 2003));
-            graphiteReporter = GraphiteReporter.forRegistry(registry)
-                    .prefixedWith("usergrid-metrics")
-                    .convertRatesTo(TimeUnit.SECONDS)
-                    .convertDurationsTo(TimeUnit.MILLISECONDS)
-                    .filter(MetricFilter.ALL)
-                    .build(graphite);
-            graphiteReporter.start(30, TimeUnit.SECONDS);
-        }else {
-            LOG.warn("MetricsService:Logger not started.");
+        if ( !metricsHost.equals( "false" ) ) {
+            Graphite graphite = new Graphite( new InetSocketAddress( metricsHost, 2003 ) );
+            graphiteReporter = GraphiteReporter.forRegistry( registry ).prefixedWith( "usergrid-metrics" )
+                                               .convertRatesTo( TimeUnit.SECONDS )
+                                               .convertDurationsTo( TimeUnit.MILLISECONDS ).filter( MetricFilter.ALL )
+                                               .build( graphite );
+            graphiteReporter.start( 30, TimeUnit.SECONDS );
+        }
+        else {
+            LOG.warn( "MetricsService:Logger not started." );
         }
         hashMap = new ConcurrentHashMap<String, Metric>();
 
-        jmxReporter = JmxReporter.forRegistry(registry).build();
+        jmxReporter = JmxReporter.forRegistry( registry ).build();
         jmxReporter.start();
     }
 
+
     @Override
     public MetricRegistry getRegistry() {
         return registry;
     }
 
+
     @Override
-    public Timer getTimer(Class<?> klass, String name) {
-        return getMetric(Timer.class, klass, name);
+    public Timer getTimer( Class<?> klass, String name ) {
+        return getMetric( Timer.class, klass, name );
     }
 
+
     @Override
-    public Histogram getHistogram(Class<?> klass, String name) {
-        return getMetric(Histogram.class, klass, name);
+    public Histogram getHistogram( Class<?> klass, String name ) {
+        return getMetric( Histogram.class, klass, name );
     }
 
+
     @Override
-    public Counter getCounter(Class<?> klass, String name) {
-        return getMetric(Counter.class, klass, name);
+    public Counter getCounter( Class<?> klass, String name ) {
+        return getMetric( Counter.class, klass, name );
     }
 
+
     @Override
-    public Meter getMeter(Class<?> klass, String name) {
-        return getMetric(Meter.class, klass, name);
+    public Meter getMeter( Class<?> klass, String name ) {
+        return getMetric( Meter.class, klass, name );
     }
 
-    private <T> T getMetric(Class<T> metricClass, Class<?> klass, String name) {
+
+    @Override
+    public void addGauge( final Class<?> clazz, final String name, final Gauge<?> gauge ) {
+
+        this.getRegistry().register( MetricRegistry.name( clazz, name ), gauge );
+    }
+
+
+    private <T> T getMetric( Class<T> metricClass, Class<?> klass, String name ) {
         String key = metricClass.getName() + klass.getName() + name;
-        Metric metric = hashMap.get(key);
-        if (metric == null) {
-            if (metricClass == Histogram.class) {
-                metric = this.getRegistry().histogram(MetricRegistry.name(klass, name));
+        Metric metric = hashMap.get( key );
+        if ( metric == null ) {
+            if ( metricClass == Histogram.class ) {
+                metric = this.getRegistry().histogram( MetricRegistry.name( klass, name ) );
             }
-            if (metricClass == Timer.class) {
-                metric = this.getRegistry().timer(MetricRegistry.name(klass, name));
+            if ( metricClass == Timer.class ) {
+                metric = this.getRegistry().timer( MetricRegistry.name( klass, name ) );
             }
-            if (metricClass == Meter.class) {
-                metric = this.getRegistry().meter(MetricRegistry.name(klass, name));
+            if ( metricClass == Meter.class ) {
+                metric = this.getRegistry().meter( MetricRegistry.name( klass, name ) );
             }
-            if (metricClass == Counter.class) {
-                metric = this.getRegistry().counter(MetricRegistry.name(klass, name));
+            if ( metricClass == Counter.class ) {
+                metric = this.getRegistry().counter( MetricRegistry.name( klass, name ) );
             }
-            hashMap.put(key, metric);
+
+
+            hashMap.put( key, metric );
         }
-        return (T) metric;
+        return ( T ) metric;
+    }
+
+
+    /**
+     *
+     * @param metricClass
+     * @param klass
+     * @param name
+     * @return
+     */
+    private String getKey( Class<?> metricClass, Class<?> klass, String name ) {
+        return metricClass.getName() + klass.getName() + name;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index af843ad..5f01ee7 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -139,7 +139,13 @@
             <classifier>tests</classifier>
         </dependency>
 
-
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>queue</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
index ac7489c..40c7852 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBufferConsumer.java
@@ -23,4 +23,15 @@ package org.apache.usergrid.persistence.index;
  * Classy class class.
  */
 public interface IndexBufferConsumer {
+
+
+    /**
+     * Start the consumer
+     */
+    public void start();
+
+    /**
+     * Stop the consumers
+     */
+    public void stop();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 445789f..6be8234 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.FigSingleton;
 import org.safehaus.guicyfig.GuicyFig;
 import org.safehaus.guicyfig.Key;
 
+import org.apache.usergrid.persistence.index.guice.QueueProvider;
+
 
 @FigSingleton
 public interface IndexFig extends GuicyFig {
@@ -86,6 +88,11 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
+    /**
+     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
+     */
+    public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -190,4 +197,8 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 
+    @Default( "LOCAL" )
+    @Key( ELASTICSEARCH_QUEUE_IMPL )
+    String getQueueImplementation();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 33b68cd..5686e26 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -128,6 +128,11 @@ public class IndexOperationMessage implements Serializable {
     }
 
     public void done() {
-        getFuture().done();
+        //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
+        final BetterFuture<IndexOperationMessage> future = getFuture();
+
+        if(future != null ){
+            future.done();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index b03e1c0..95f3bd4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.persistence.index.*;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexFactoryImpl;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
@@ -33,7 +34,7 @@ import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
-public abstract class IndexModule extends AbstractModule {
+public class IndexModule extends AbstractModule {
 
     @Override
     protected void configure() {
@@ -50,14 +51,10 @@ public abstract class IndexModule extends AbstractModule {
         bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
         bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
 
-        wireBufferQueue();
-    }
 
+        bind( BufferQueue.class).toProvider( QueueProvider.class );
+    }
 
-    /**
-     * Write the <class>BufferQueue</class> for this implementation
-     */
-    public abstract void wireBufferQueue();
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
index ffc3b90..76b49c2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -32,25 +32,37 @@ import org.apache.usergrid.persistence.index.IndexOperationMessage;
 public interface BufferQueue {
 
     /**
-     * Offer the indexoperation message
+     * Offer the indexoperation message.  Some queues may support not returning the future until ack or fail.
+     * Other queues may return the future after ack on the offer.  See the implementation documentation for details.
      * @param operation
      */
     public void offer(final IndexOperationMessage operation);
 
 
     /**
-     * Perform a take, potentially blocking.  Until takesize is available, or timeout has elapsed
+     * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed.
+     * May return less than the take size, but will never return null
+     *
      * @param takeSize
      * @param timeout
      * @param timeUnit
-     * @return
+     * @return A null-safe list
      */
     public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
 
 
     /**
-     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented
+     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented.
+     * This will set the future as done in in-memory operations
+     *
      * @param messages
      */
-    public void ack(List<IndexOperationMessage> messages);
+    public void ack(final List<IndexOperationMessage> messages);
+
+    /**
+     * Mark these messages as failed.  Set the exception in the future on local operation
+     *
+     * @param messages
+     */
+    public void fail(final List<IndexOperationMessage> messages, final Throwable t);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index 6716fd1..998c086 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 
@@ -47,7 +48,6 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
     @Override
     public void offer( final IndexOperationMessage operation ) {
         messages.offer( operation );
-        operation.done();
     }
 
 
@@ -55,30 +55,30 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
     public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
 
         final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+        try {
 
-        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
 
-        //loop until we're we're full or we time out
-        do {
-            try {
+            messages.drainTo( response, takeSize );
 
-                final long remaining = endTime - System.currentTimeMillis();
+            //we got something, go process it
+            if ( response.size() > 0 ) {
+                return response;
+            }
 
-                //we received 1, try to drain
-                IndexOperationMessage polled = messages.poll( remaining, timeUnit );
 
-                //drain
-                if ( polled != null ) {
-                    response.add( polled );
-                    messages.drainTo( response, takeSize - response.size() );
-                }
-            }
-            catch ( InterruptedException ie ) {
-                //swallow
+            final IndexOperationMessage polled = messages.poll( timeout, timeUnit );
 
+            if ( polled != null ) {
+                response.add( polled );
+
+                //try to add more
+                messages.drainTo( response, takeSize - 1 );
             }
         }
-        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+        catch ( InterruptedException e ) {
+            //swallow
+        }
+
 
         return response;
     }
@@ -86,6 +86,23 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
 
     @Override
     public void ack( final List<IndexOperationMessage> messages ) {
-         //no op for this
+        //if we have a future ack it
+        for ( final IndexOperationMessage op : messages ) {
+            op.done();
+        }
+    }
+
+
+    @Override
+    public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+
+
+        for ( final IndexOperationMessage op : messages ) {
+            final BetterFuture<IndexOperationMessage> future = op.getFuture();
+
+            if ( future != null ) {
+                future.setError( t );
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 4a07704..b8d162b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -101,7 +101,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
     public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
                                final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
         final QueueScope queueScope =
-            new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
+            new QueueScopeImpl( QUEUE_NAME );
 
         this.queue = queueManagerFactory.getQueueManager( queueScope );
         this.indexFig = indexFig;
@@ -260,6 +260,12 @@ public class BufferQueueSQSImpl implements BufferQueue {
     }
 
 
+    @Override
+    public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+        //no op, just let it retry after the queue timeout
+    }
+
+
     /** Read the object from Base64 string. */
     private IndexOperationMessage fromString( String s ) throws IOException {
         IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index fa50734..8be044f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -47,6 +47,8 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.ShardOperationFailedException;
@@ -111,8 +113,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     private final Timer addWriteAliasTimer;
     private final Timer addReadAliasTimer;
     private final Timer searchTimer;
-    private final Timer allVersionsTimerFuture;
-    private final Timer deletePreviousTimerFuture;
 
     /**
      * We purposefully make this per instance. Some indexes may work, while others may fail
@@ -127,7 +127,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     private final IndexFig config;
 
-    private final MetricsFactory metricsFactory;
 
 
     //number of times to wait for the index to refresh properly.
@@ -148,8 +147,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     private Timer refreshTimer;
     private Timer cursorTimer;
     private Timer getVersionsTimer;
-    private Timer allVersionsTimer;
-    private Timer deletePreviousTimer;
 
     private final MapManager mapManager;
 
@@ -168,11 +165,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         this.esProvider = provider;
         this.config = config;
         this.cursorTimeout = config.getQueryCursorTimeout();
-        this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, appScope);
+        this.indexIdentifier = IndexingUtils.createIndexIdentifier( config, appScope );
         this.alias = indexIdentifier.getAlias();
         this.failureMonitor = new FailureMonitorImpl( config, provider );
         this.aliasCache = indexCache;
-        this.metricsFactory = metricsFactory;
         this.addTimer = metricsFactory
             .getTimer( EsEntityIndexImpl.class, "es.entity.index.add.index.timer" );
         this.removeAliasTimer = metricsFactory
@@ -191,14 +187,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             .getTimer( EsEntityIndexImpl.class, "es.entity.index.search.cursor.timer" );
         this.getVersionsTimer =metricsFactory
             .getTimer( EsEntityIndexImpl.class, "es.entity.index.get.versions.timer" );
-        this.allVersionsTimer =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer" );
-        this.deletePreviousTimer = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer" );
-        this.allVersionsTimerFuture =  metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.all.versions.timer.future" );
-        this.deletePreviousTimerFuture = metricsFactory
-            .getTimer( EsEntityIndexImpl.class, "es.entity.index.delete.previous.versions.timer.future" );
+
 
         final MapScope mapScope = new MapScopeImpl( appScope.getApplication(), "cursorcache" );
 
@@ -394,8 +383,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
      */
     private void createMappings(final String indexName) throws IOException {
 
-        XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
-            XContentFactory.jsonBuilder(), DEFAULT_TYPE );
+        XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping( XContentFactory.jsonBuilder(),
+            DEFAULT_TYPE );
 
 
         //Added For Graphite Metrics
@@ -421,7 +410,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes,
             final Query query ) {
 
-        final String context = IndexingUtils.createContextName(indexScope);
+        final String context = IndexingUtils.createContextName( indexScope );
         final String[] entityTypes = searchTypes.getTypeNames();
 
         QueryBuilder qb = query.createQueryBuilder(context);
@@ -632,7 +621,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     public int getPendingTasks() {
 
         final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin()
-                .cluster().pendingClusterTasks(new PendingClusterTasksRequest()).actionGet();
+                .cluster().pendingClusterTasks( new PendingClusterTasksRequest() ).actionGet();
 
         return tasksResponse.pendingTasks().size();
     }
@@ -674,114 +663,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     }
 
 
-//    @Override
-//    public ListenableActionFuture deleteAllVersionsOfEntity(final Id entityId ) {
-//        String idString = IndexingUtils.idString(entityId).toLowerCase();
-//
-//        final TermQueryBuilder tqb = QueryBuilders.termQuery(ENTITYID_ID_FIELDNAME, idString);
-//
-//        //Added For Graphite Metrics
-//        final Timer.Context timeDeleteAllVersions =allVersionsTimer.time();
-//        final Timer.Context timeDeleteAllVersionsFuture = allVersionsTimerFuture.time();
-//
-//        final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
-//            .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
-//
-//        response.addListener( new ActionListener<DeleteByQueryResponse>() {
-//
-//            @Override
-//            public void onResponse( DeleteByQueryResponse response) {
-//                timeDeleteAllVersions.stop();
-//                logger
-//                    .debug( "Deleted entity {}:{} from all index scopes with response status = {}", entityId.getType(),
-//                        entityId.getUuid(), response.status().toString() );
-//
-//                checkDeleteByQueryResponse(tqb, response);
-//            }
-//
-//
-//            @Override
-//            public void onFailure( Throwable e ) {
-//                timeDeleteAllVersions.stop();
-//                logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
-//                    entityId.getUuid(), e);
-//
-//
-//            }
-//        });
-//        timeDeleteAllVersionsFuture.stop();
-//        return response;
-//    }
-//
-//
-//    @Override
-//    public ListenableActionFuture deletePreviousVersions(final Id entityId, final UUID version) {
-//
-//        String idString = IndexingUtils.idString( entityId ).toLowerCase();
-//
-//        final FilteredQueryBuilder fqb = QueryBuilders.filteredQuery(
-//                QueryBuilders.termQuery(ENTITYID_ID_FIELDNAME, idString),
-//            FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
-//        );
-//
-//        //Added For Graphite Metrics
-//        //Checks the time from the execute to the response below
-//        final Timer.Context timeDeletePreviousVersions = deletePreviousTimer.time();
-//        final Timer.Context timeDeletePreviousVersionFuture = deletePreviousTimerFuture.time();
-//        final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
-//            .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
-//
-//        //Added For Graphite Metrics
-//        response.addListener(new ActionListener<DeleteByQueryResponse>() {
-//            @Override
-//            public void onResponse(DeleteByQueryResponse response) {
-//                timeDeletePreviousVersions.stop();
-//                //error message needs to be retooled so that it describes the entity more throughly
-//                logger
-//                    .debug("Deleted entity {}:{} with version {} from all " + "index scopes with response status = {}",
-//                        entityId.getType(), entityId.getUuid(), version, response.status().toString());
-//
-//                checkDeleteByQueryResponse( fqb, response );
-//            }
-//
-//
-//            @Override
-//            public void onFailure( Throwable e ) {
-//                timeDeletePreviousVersions.stop();
-//                logger.error( "Deleted entity {}:{} from all index scopes with error {}", entityId.getType(),
-//                    entityId.getUuid(), e );
-//            }
-//        } );
-//
-//        timeDeletePreviousVersionFuture.stop();
-//
-//        return response;
-//    }
-
-
-    /**
-     * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
-     */
-    private void checkDeleteByQueryResponse(
-            final QueryBuilder query, final DeleteByQueryResponse response ) {
-
-        for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
-            final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
-
-            for ( ShardOperationFailedException failedException : failures ) {
-                logger.error( String.format("Unable to delete by query %s. "
-                        + "Failed with code %d and reason %s on shard %s in index %s",
-                    query.toString(),
-                    failedException.status().getStatus(),
-                    failedException.reason(),
-                        failedException.shardId(),
-                    failedException.index() )
-                );
-            }
-
-        }
-    }
-
 
     /**
      * Completely delete an index.
@@ -856,8 +737,11 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     public Health getIndexHealth() {
 
         try {
-            ClusterHealthResponse chr = esProvider.getClient().admin().cluster().health(
-                    new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(null)})).get();
+           final ActionFuture<ClusterHealthResponse> future =  esProvider.getClient().admin().cluster().health(
+               new ClusterHealthRequest( new String[] { indexIdentifier.getIndex( null ) } ) );
+
+            //only wait 2 seconds max
+            ClusterHealthResponse chr = future.actionGet( 2000 );
             return Health.valueOf( chr.getStatus().name() );
         }
         catch ( Exception ex ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 836ec3d..5259a26 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -20,31 +20,29 @@
 package org.apache.usergrid.persistence.index.impl;
 
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBufferConsumer;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
-import org.elasticsearch.action.ActionRequestBuilder;
+
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
 import rx.functions.Action1;
-import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
@@ -52,7 +50,6 @@ import rx.schedulers.Schedulers;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -69,6 +66,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     private final Timer flushTimer;
     private final Counter indexSizeCounter;
+    private final Counter indexErrorCounter;
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final BufferQueue bufferQueue;
@@ -80,13 +78,28 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
     private Object mutex = new Object();
 
+
+    private AtomicLong inFlight = new AtomicLong(  );
+
     @Inject
     public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
         metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
 
-        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
-        this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
-        this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
+        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "buffer.flush");
+        this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "buffer.meter");
+        this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "buffer.size");
+        this.indexErrorCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "error.count");
+
+        //wire up the gauge of inflight messages
+        metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return inFlight.longValue();
+            }
+        } );
+
+
+
         this.config = config;
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
@@ -130,66 +143,67 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private void startWorker(){
         synchronized ( mutex) {
 
-            final AtomicInteger countFail = new AtomicInteger();
+            Observable<List<IndexOperationMessage>> consumer = Observable.create(
+                new Observable.OnSubscribe<List<IndexOperationMessage>>() {
+                    @Override
+                    public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
-            Observable<List<IndexOperationMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
-                @Override
-                public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+                        //name our thread so it's easy to see
+                        Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
-                    //name our thread so it's easy to see
-                    Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+                        List<IndexOperationMessage> drainList;
+                        do {
+                            try {
 
-                    List<IndexOperationMessage> drainList;
-                    do {
-                        try {
 
+                                Timer.Context timer = produceTimer.time();
 
-                            Timer.Context timer = produceTimer.time();
+                                drainList = bufferQueue
+                                    .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
+                                        TimeUnit.MILLISECONDS );
 
-                            drainList = bufferQueue.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
-                                TimeUnit.MILLISECONDS );
 
+                                subscriber.onNext( drainList );
 
-                            subscriber.onNext( drainList );
+                                //take since  we're in flight
+                                inFlight.addAndGet( drainList.size() );
 
 
-                            timer.stop();
+                                timer.stop();
+                            }
 
-                            countFail.set( 0 );
-                        }
-                        catch ( EsRejectedExecutionException err ) {
-                            countFail.incrementAndGet();
-                            log.error(
-                                "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  " + "Failed {} consecutive times", config.getFailRefreshCount(),
-                                countFail.get() );
+                            catch ( Exception e ) {
+                                final long sleepTime = config.getFailureRetryTime();
 
-                            //es  rejected the exception, sleep and retry in the queue
-                            try {
-                                Thread.sleep( config.getFailureRetryTime() );
-                            }
-                            catch ( InterruptedException e ) {
-                                //swallow
-                            }
-                        }
-                        catch ( Exception e ) {
-                            int count = countFail.incrementAndGet();
-                            log.error( "failed to dequeue", e );
-                            if ( count > 200 ) {
-                                log.error( "Shutting down index drain due to repetitive failures" );
+                                log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, e );
+
+                                try {
+                                    Thread.sleep( sleepTime );
+                                }
+                                catch ( InterruptedException ie ) {
+                                    //swallow
+                                }
+
+                                indexErrorCounter.inc();
                             }
                         }
+                        while ( true );
                     }
-                    while ( true );
-                }
-            } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+                } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
                 @Override
                 public void call( List<IndexOperationMessage> containerList ) {
-                    if ( containerList.size() > 0 ) {
-                        flushMeter.mark( containerList.size() );
-                        Timer.Context time = flushTimer.time();
-                        execute( containerList );
-                        time.stop();
+                    if ( containerList.size() == 0 ) {
+                        return;
                     }
+
+                    flushMeter.mark( containerList.size() );
+                    Timer.Context time = flushTimer.time();
+
+
+                    execute( containerList );
+
+                    time.stop();
+
                 }
             } )
                 //ack after we process
@@ -197,6 +211,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     @Override
                     public void call( final List<IndexOperationMessage> indexOperationMessages ) {
                         bufferQueue.ack( indexOperationMessages );
+                        //release  so we know we've done processing
+                        inFlight.addAndGet( -1 * indexOperationMessages.size() );
+                    }
+                } ).doOnError( new Action1<Throwable>() {
+                    @Override
+                    public void call( final Throwable throwable ) {
+
+                        log.error( "An exception occurred when trying to deque and write to elasticsearch.  Ignoring",
+                            throwable );
+                        indexErrorCounter.inc();
                     }
                 } );
 
@@ -236,7 +260,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
       //collection all the operations into a single stream
        .reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() {
            @Override
-           public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+           public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder,
+                                           final BatchRequest batchRequest ) {
                batchRequest.doOperation( client, bulkRequestBuilder );
 
                return bulkRequestBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 57c2fab..50b994d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -35,12 +35,6 @@ public class TestIndexModule extends TestModule {
 
         // configure collections and our core astyanax framework
         install( new CollectionModule() );
-        install( new IndexModule() {
-            @Override
-            public void wireBufferQueue() {
-                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
-            }
-        } );
+        install( new IndexModule()  );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
index 6922c15..9a362cb 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.NoAWSCredsRule;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
 
@@ -59,6 +60,10 @@ public class BufferQueueSQSImplTest {
     @Rule
     public MigrationManagerRule migrationManagerRule;
 
+
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
     @Inject
     public QueueManagerFactory queueManagerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index c5f3488..a399809 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -140,7 +140,6 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         personLikesIndex.refresh();
 
 
-        EsTestUtils.waitForTasks(personLikesIndex);
         Thread.sleep( 2000 );
 
         // now, let's search for muffins
@@ -270,8 +269,6 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.execute().get();
         personLikesIndex.refresh();
 
-        EsTestUtils.waitForTasks( personLikesIndex );
-        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a2135a3..ca9bf79 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -94,7 +94,6 @@ public class EntityIndexTest extends BaseIT {
 
         entityIndex.refresh();
 
-        Thread.sleep(100000000);
 
         testQueries( indexScope, searchTypes, entityIndex );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
deleted file mode 100644
index 30f0ed0..0000000
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.EntityIndex;
-
-
-/**
- * Utilities to make testing ES easier
- */
-public class EsTestUtils {
-
-
-    /**
-     * Checks to see if we have pending tasks in the cluster.  If so waits until they are finished.  Adding
-     * new types can cause lag even after refresh since the type mapping needs applied
-     * @param index
-     */
-    public static void waitForTasks(final EntityIndex index){
-
-        while(index.getPendingTasks() > 0){
-            try {
-                Thread.sleep( 100 );
-            }
-            catch ( InterruptedException e ) {
-                //swallow
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 35ed091..854c3e0 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -540,7 +540,6 @@ public class ManagementServiceImpl implements ManagementService {
 
         em.addToCollection( organizationEntity, "users", new SimpleEntityRef( User.ENTITY_TYPE, user.getUuid() ) );
 
-        em.refreshIndex();
 
         writeUserToken( smf.getManagementAppId(), organizationEntity, encryptionService
                 .plainTextCredentials( generateOAuthSecretKey( AuthPrincipalType.ORGANIZATION ), user.getUuid(),
@@ -557,7 +556,7 @@ public class ManagementServiceImpl implements ManagementService {
 
         startOrganizationActivationFlow( organization );
 
-        em.refreshIndex();
+
 
         return organization;
     }
@@ -1649,7 +1648,7 @@ public class ManagementServiceImpl implements ManagementService {
         properties.put( "appUuid", applicationId );
         Entity appInfo = em.create( applicationId, APPLICATION_INFO, properties );
 
-        em.refreshIndex();
+
 
         writeUserToken( smf.getManagementAppId(), appInfo, encryptionService
                 .plainTextCredentials( generateOAuthSecretKey( AuthPrincipalType.APPLICATION ), null,
@@ -1670,7 +1669,7 @@ public class ManagementServiceImpl implements ManagementService {
                             + ")</a> created a new application named " + applicationName, null );
         }
 
-        em.refreshIndex();
+
 
         return new ApplicationInfo( applicationId, appInfo.getName() );
     }
@@ -2376,7 +2375,7 @@ public class ManagementServiceImpl implements ManagementService {
         if ( sendEmail ) {
             startOrganizationActivationFlow( organization );
         }
-        em.refreshIndex();
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d6ae369/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 5eed002..abd77ee 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -16,39 +16,56 @@
  */
 package org.apache.usergrid.services.notifications;
 
-import java.util.*;
 
-import com.codahale.metrics.*;
-import com.codahale.metrics.Timer;
-import com.google.inject.Injector;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.mq.Message;
-import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.PathQuery;
+import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.services.*;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.services.AbstractCollectionService;
+import org.apache.usergrid.services.ServiceAction;
+import org.apache.usergrid.services.ServiceContext;
+import org.apache.usergrid.services.ServiceInfo;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.ServiceParameter;
+import org.apache.usergrid.services.ServicePayload;
+import org.apache.usergrid.services.ServiceRequest;
+import org.apache.usergrid.services.ServiceResults;
+import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
 import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.entities.Device;
-import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
-import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
-import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Injector;
 
 import rx.Observable;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+
 public class NotificationsService extends AbstractCollectionService {
 
 
@@ -90,9 +107,8 @@ public class NotificationsService extends AbstractCollectionService {
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
         postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
-        String name = ApplicationQueueManagerImpl.getQueueNames(props);
-        QueueScopeFactory queueScopeFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueScopeFactory.class);
-        QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), name);
+        String name = ApplicationQueueManagerImpl.getQueueNames( props );
+        QueueScope queueScope = new QueueScopeImpl( name );
         queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
         QueueManager queueManager = TEST_QUEUE_MANAGER !=null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
         notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);


Mime
View raw message