usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [05/27] incubator-usergrid git commit: add comments
Date Tue, 03 Mar 2015 00:21:53 GMT
add comments

removing metrics factory

remove synchronization

moving towards futures

moving towards futures

Adding Batch Buffer


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/090eb1bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/090eb1bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/090eb1bc

Branch: refs/heads/USERGRID-280
Commit: 090eb1bc9884dabee1ab2a4f12b889b1ac84aa6d
Parents: bf63116
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Tue Feb 17 17:10:28 2015 -0800
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Sat Feb 21 08:39:21 2015 -0500

----------------------------------------------------------------------
 .../batch/service/JobSchedulerService.java      |   2 +-
 .../apache/usergrid/metrics/MetricsFactory.java | 113 ---------
 .../main/resources/usergrid-core-context.xml    |   4 +-
 .../persistence/core/future/BetterFuture.java   |  42 ++++
 .../core/future/ObservableFuture.java           |  42 ++++
 .../persistence/index/EntityIndexBatch.java     |  26 ++-
 .../persistence/index/IndexBatchBuffer.java     |  28 +--
 .../usergrid/persistence/index/IndexFig.java    |  24 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |  38 ++--
 .../index/impl/IndexBatchBufferImpl.java        | 227 ++++++++++---------
 .../persistence/index/impl/EntityIndexTest.java |  52 ++++-
 .../usergrid/services/guice/ServiceModule.java  |   2 -
 .../notifications/NotificationsService.java     |   4 +-
 .../services/notifications/QueueJob.java        |   6 +-
 .../services/notifications/QueueListener.java   |   6 +-
 .../impl/ApplicationQueueManagerImpl.java       |   2 +-
 .../services/queues/ImportQueueListener.java    |   7 +-
 .../usergrid/services/queues/QueueListener.java |   7 +-
 .../resources/usergrid-services-context.xml     |   1 -
 .../apns/NotificationsServiceIT.java            |   2 +-
 .../gcm/NotificationsServiceIT.java             |   4 +-
 21 files changed, 349 insertions(+), 290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index 2ec0be0..5d57ab7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@ import org.apache.usergrid.batch.JobFactory;
 import org.apache.usergrid.batch.JobNotFoundException;
 import org.apache.usergrid.batch.repository.JobAccessor;
 import org.apache.usergrid.batch.repository.JobDescriptor;
-import org.apache.usergrid.metrics.MetricsFactory;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java b/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
deleted file mode 100644
index fccc296..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/metrics/MetricsFactory.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.metrics;
-import com.codahale.metrics.*;
-import com.codahale.metrics.graphite.Graphite;
-import com.codahale.metrics.graphite.GraphiteReporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import java.net.InetSocketAddress;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Singleton class to manage metrics.
- * TODO:remove this in favor of cp one
- */
-@Component("metricsFactory")
-public class MetricsFactory implements org.apache.usergrid.persistence.core.metrics.MetricsFactory {
-    @Autowired
-    private Properties properties;
-    public MetricRegistry registry;
-    private GraphiteReporter graphiteReporter;
-    private JmxReporter jmxReporter;
-    private ConcurrentHashMap<String,Metric> hashMap;
-    private static final Logger LOG = LoggerFactory.getLogger(MetricsFactory.class);
-
-    public MetricsFactory() {
-
-    }
-
-    @PostConstruct
-    void init() {
-        registry = new MetricRegistry();
-        String badHost = "false";
-        String metricsHost = properties.getProperty("usergrid.metrics.graphite.host", badHost);
-        if(!metricsHost.equals(badHost)) {
-            Graphite graphite = new Graphite(new InetSocketAddress(metricsHost, 2003));
-            graphiteReporter = GraphiteReporter.forRegistry(registry)
-                    .prefixedWith("notifications")
-                    .convertRatesTo(TimeUnit.SECONDS)
-                    .convertDurationsTo(TimeUnit.MILLISECONDS)
-                    .filter(MetricFilter.ALL)
-                    .build(graphite);
-            graphiteReporter.start(30, TimeUnit.SECONDS);
-        }else {
-            LOG.warn("MetricsService:Logger not started.");
-        }
-        hashMap = new ConcurrentHashMap<String, Metric>();
-
-        jmxReporter = JmxReporter.forRegistry(registry).build();
-        jmxReporter.start();
-    }
-
-    public MetricRegistry getRegistry() {
-        return registry;
-    }
-
-    public Timer getTimer(Class<?> klass, String name) {
-        return getMetric(Timer.class, klass, name);
-    }
-
-    public Histogram getHistogram(Class<?> klass, String name) {
-        return getMetric(Histogram.class, klass, name);
-    }
-
-    public Counter getCounter(Class<?> klass, String name) {
-        return getMetric(Counter.class, klass, name);
-    }
-
-    public Meter getMeter(Class<?> klass, String name) {
-        return getMetric(Meter.class, klass, name);
-    }
-
-    private <T> T getMetric(Class<T> metricClass, Class<?> klass, String name) {
-        String key = metricClass.getName() + klass.getName() + name;
-        Metric metric = hashMap.get(key);
-        if (metric == null) {
-            if (metricClass == Histogram.class) {
-                metric = this.getRegistry().histogram(MetricRegistry.name(klass, name));
-            }
-            if (metricClass == Timer.class) {
-                metric = this.getRegistry().timer(MetricRegistry.name(klass, name));
-            }
-            if (metricClass == Meter.class) {
-                metric = this.getRegistry().meter(MetricRegistry.name(klass, name));
-            }
-            if (metricClass == Counter.class) {
-                metric = this.getRegistry().counter(MetricRegistry.name(klass, name));
-            }
-            hashMap.put(key, metric);
-        }
-        return (T) metric;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 60f69af..ece6215 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -184,7 +184,6 @@
     <bean id="jobSchedulerBackgroundService" class="org.apache.usergrid.batch.service.JobSchedulerService">
       <property name="jobFactory" ref="jobFactory" />
       <property name="jobAccessor" ref="schedulerService" />
-      <property name="metricsFactory" ref="metricsFactory"/>
       <property name="workerSize" value="${usergrid.scheduler.job.workers}" />
       <property name="interval" value="${usergrid.scheduler.job.interval}" />
       <property name="maxFailCount" value="${usergrid.scheduler.job.maxfail}" />
@@ -196,8 +195,7 @@
     </bean>
 
     <bean id="jobFactory" class="org.apache.usergrid.batch.UsergridJobFactory" />
-
-    <bean id="metricsFactory" class="org.apache.usergrid.metrics.MetricsFactory" scope="singleton"/>
+ 
 
     <context:component-scan base-package="org.apache.usergrid.batch.job" />
     <context:annotation-config />

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
new file mode 100644
index 0000000..6146fe8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * Future without the exception nastiness
+ */
+public  class BetterFuture<T>{
+    FutureTask<T> future;
+    public BetterFuture(Callable<T> callable){
+        future = new FutureTask<>(callable);
+    }
+
+    public void done(){
+        future.run();
+    }
+
+    public T get(){
+        try {
+            return future.get();
+        }catch (Exception e){
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
new file mode 100644
index 0000000..d08dd92
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/ObservableFuture.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.core.future;
+
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ *
+ */
+public class ObservableFuture<T> implements Observable.OnSubscribe<T> {
+
+    private Subscriber<? super T> subscriber;
+
+    @Override
+    public void call(Subscriber<? super T> subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    public void done(T t){
+        this.subscriber.onCompleted();
+    }
+
+    public void emit(T t){
+        this.subscriber.onNext(t);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 68008bf..f5b8abc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -29,37 +29,41 @@ public interface EntityIndexBatch {
 
     /**
      * Create index for Entity
+     *
      * @param indexScope The scope for the index
-     * @param entity Entity to be indexed.
+     * @param entity     Entity to be indexed.
      */
-    public EntityIndexBatch index( final IndexScope indexScope, final Entity entity );
+    public EntityIndexBatch index(final IndexScope indexScope, final Entity entity);
 
     /**
      * Remove index of entity
-     * @param scope The scope for the entity
+     *
+     * @param scope  The scope for the entity
      * @param entity Entity to be removed from index.
      */
-    public EntityIndexBatch deindex(final IndexScope scope, final Entity entity );
+    public EntityIndexBatch deindex(final IndexScope scope, final Entity entity);
 
     /**
      * Remove index of entity.
-     * @param scope The scope to use for removal
+     *
+     * @param scope  The scope to use for removal
      * @param result CandidateResult to be removed from index.
      */
-    public EntityIndexBatch deindex(final IndexScope scope, final CandidateResult result );
+    public EntityIndexBatch deindex(final IndexScope scope, final CandidateResult result);
 
     /**
      * Remove index of entity.
-     * @param scope The scope to remove
-     * @param id Id to be removed from index.
+     *
+     * @param scope   The scope to remove
+     * @param id      Id to be removed from index.
      * @param version Version to be removed from index.
      */
     public EntityIndexBatch deindex(final IndexScope scope, final Id id, final UUID version);
 
 
-        /**
-         * Execute the batch
-         */
+    /**
+     * Execute the batch
+     */
     public void execute();
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 4a384b4..b24d37b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -16,34 +16,34 @@
  */
 package org.apache.usergrid.persistence.index;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import rx.Observable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 /**
- * Buffer for index operations,
+ * * Buffer index requests into sets to send,
  */
 public interface IndexBatchBuffer {
 
     /**
-     * flush out buffer and execute
-     */
-    public void flush();
-
-    /**
-     * flush out buffer and execute
-     */
-    public void flushAndRefresh();
-
-    /**
-     * put request into buffer
+     * put request into buffer, retu
+     *
      * @param builder
      */
-    public void put(IndexRequestBuilder builder);
+    public BetterFuture put(IndexRequestBuilder builder);
 
     /**
      * put request into buffer
+     *
      * @param builder
      */
-    public void put(DeleteRequestBuilder builder);
+    public BetterFuture put(DeleteRequestBuilder builder);
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 36ca588..9bdac36 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -53,13 +53,17 @@ public interface IndexFig extends GuicyFig {
 
     public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_size";
 
+    public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
+
+    public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
+
     /**
      * the number of times we can fail before we refresh the client
      */
     public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
 
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
-    
+
     @Default( "127.0.0.1" )
     @Key( ELASTICSEARCH_HOSTS )
     String getHosts();
@@ -68,10 +72,10 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_PORT )
     int getPort();
 
-    @Default( "usergrid" ) 
+    @Default( "usergrid" )
     @Key( ELASTICSEARCH_CLUSTER_NAME)
     String getClusterName();
-    
+
     @Default( "usergrid" ) // no underbars allowed
     @Key( ELASTICSEARCH_INDEX_PREFIX )
     String getIndexPrefix();
@@ -79,7 +83,7 @@ public interface IndexFig extends GuicyFig {
     @Default( "alias" ) // no underbars allowed
     @Key( ELASTICSEARCH_ALIAS_POSTFIX )
     String getAliasPostfix();
-    
+
     @Default( "1" ) // TODO: does this timeout get extended on each query?
     @Key( QUERY_CURSOR_TIMEOUT_MINUTES )
     int getQueryCursorTimeout();
@@ -93,7 +97,7 @@ public interface IndexFig extends GuicyFig {
     @Key( QUERY_LIMIT_DEFAULT )
     int getQueryLimitDefault();
 
-    @Default( "false" ) 
+    @Default( "false" )
     @Key( ELASTICSEARCH_FORCE_REFRESH )
     public boolean isForcedRefresh();
 
@@ -117,11 +121,19 @@ public interface IndexFig extends GuicyFig {
     @Default("2")
     int getIndexCacheMaxWorkers();
 
-    @Default("1000")
+    @Default("250")
     @Key( INDEX_BUFFER_TIMEOUT )
     int getIndexBufferTimeout();
 
     @Default("100")
     @Key( INDEX_BUFFER_SIZE )
     int getIndexBufferSize();
+
+    @Default("300")
+    @Key( INDEX_BATCH_SIZE)
+    int getIndexBatchSize();
+
+    @Default("one")
+    @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
+    String getWriteConsistencyLevel();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 1bc21d3..896c038 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -18,15 +18,12 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.util.concurrent.Futures;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.*;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -93,6 +90,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final FailureMonitor failureMonitor;
 
     private final AliasedEntityIndex entityIndex;
+    private final ConcurrentLinkedQueue<BetterFuture> promises;
 
 
     public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -106,13 +104,12 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
         this.refresh = config.isForcedRefresh();
+        this.promises = new ConcurrentLinkedQueue<>();
     }
 
 
     @Override
     public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
-
-
         IndexValidationUtils.validateIndexScope( indexScope );
         ValidationUtils.verifyEntityWrite( entity );
 
@@ -141,7 +138,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         final String entityType = entity.getId().getType();
         IndexRequestBuilder builder =
                 client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
-        indexBatchBuffer.put(builder);
+        promises.add(indexBatchBuffer.put(builder));
         return this;
     }
 
@@ -187,7 +184,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
                    public Object call(String index) {
                        try {
                            DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
-                           indexBatchBuffer.put(builder);
+                           promises.add(indexBatchBuffer.put(builder));
                        }catch (Exception e){
                            log.error("failed to deindex",e);
                            throw e;
@@ -220,14 +217,29 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public void execute() {
-        indexBatchBuffer.flush();
+//        indexBatchBuffer.flush();
+        Observable.from(promises)
+                .doOnNext(new Action1<BetterFuture>() {
+                    @Override
+                    public void call(BetterFuture betterFuture) {
+                        betterFuture.get();
+                    }
+                }).toBlocking().lastOrDefault(null);
+        promises.clear();
     }
 
 
 
     @Override
     public void executeAndRefresh() {
-        indexBatchBuffer.flushAndRefresh();
+//        indexBatchBuffer.flushAndRefresh();
+        Iterator<BetterFuture> iterator = promises.iterator();
+        while(iterator.hasNext()){
+            iterator.next().get();
+        }
+        promises.clear();
+        entityIndex.refresh();
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 08ed17b..d7e6bf1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -20,9 +20,11 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBatchBuffer;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -34,146 +36,179 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
-import rx.Subscription;
+import rx.functions.Action0;
 import rx.functions.Action1;
 
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 
 /**
- * Classy class class.
+ * Buffer index requests into sets to send.
  */
 @Singleton
 public class IndexBatchBufferImpl implements IndexBatchBuffer {
 
     private static final Logger log = LoggerFactory.getLogger(IndexBatchBufferImpl.class);
+    private final MetricsFactory metricsFactory;
+    private final Counter indexSizeCounter;
     private final Client client;
-    private final FailureMonitor failureMonitor;
+    private final FailureMonitorImpl failureMonitor;
     private final IndexFig config;
-    private final boolean refresh;
-    private final int timeout;
-    private final int bufferSize;
-    private final MetricsFactory metricsFactory;
     private final Timer flushTimer;
-    private final Counter indexSizeCounter;
+    private final ArrayBlockingQueue<RequestBuilderContainer> blockingQueue;
+    private Observable<List<RequestBuilderContainer>> consumer;
     private Producer producer;
-    private Subscription producerObservable;
-    private ArrayBlockingQueue blockingQueue;
 
     @Inject
-    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, MetricsFactory metricsFactory){
+    public IndexBatchBufferImpl(final IndexFig config, final EsProvider provider, final MetricsFactory metricsFactory){
         this.metricsFactory = metricsFactory;
-        this.client = provider.getClient();
-        this.failureMonitor = new FailureMonitorImpl( config, provider );
-        this.config = config;
-        this.producer = new Producer();
-        this.refresh = config.isForcedRefresh();
-        this.timeout = config.getIndexBufferTimeout();
-        this.bufferSize = config.getIndexBufferSize();
         this.flushTimer = metricsFactory.getTimer(IndexBatchBuffer.class, "index.buffer.flush");
         this.indexSizeCounter =  metricsFactory.getCounter(IndexBatchBuffer.class, "index.buffer.size");
-        blockingQueue = new ArrayBlockingQueue(500);
-        init();
-    }
-
+        this.config = config;
+        this.blockingQueue = new ArrayBlockingQueue<>(config.getIndexBatchSize());
+        this.failureMonitor = new FailureMonitorImpl(config,provider);
+        this.producer = new Producer();
+        this.client = provider.getClient();
 
+        consumer();
+    }
 
-    private void init() {
-        this.producerObservable = Observable.create(producer)
-                .doOnNext(new Action1<RequestBuilderContainer>() {
-                    @Override
-                    public void call(RequestBuilderContainer container) {
-                        try {
-                            blockingQueue.offer(container, 2500, TimeUnit.MILLISECONDS);
-                        }catch (InterruptedException ie){
-                            throw new RuntimeException(ie);
-                        }
-                    }
-                })
-                .buffer(timeout, TimeUnit.MILLISECONDS, bufferSize)
+    private void consumer() {
+        //batch up sets of some size and send them in batch
+        this.consumer = Observable.create(producer)
+                .buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
                 .doOnNext(new Action1<List<RequestBuilderContainer>>() {
                     @Override
-                    public void call(List<RequestBuilderContainer> builderContainerList) {
+                    public void call(List<RequestBuilderContainer> containerList) {
+                        for (RequestBuilderContainer container : containerList) {
+                            blockingQueue.add(container);
+                        }
                         flushTimer.time();
-                        indexSizeCounter.dec(builderContainerList.size());
-                        execute();
+                        indexSizeCounter.dec(containerList.size());
+                        execute(config.isForcedRefresh());
                     }
-                })
-                .subscribe();
+                });
+        consumer.subscribe();
     }
 
-    public void put(IndexRequestBuilder builder){
+    @Override
+    public BetterFuture<ShardReplicationOperationRequestBuilder> put(IndexRequestBuilder builder){
+        RequestBuilderContainer container = new RequestBuilderContainer(builder);
         metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
-        producer.put(new RequestBuilderContainer(builder));
+        producer.put(container);
+        return container.getFuture();
     }
 
-    public void put(DeleteRequestBuilder builder){
+    @Override
+    public BetterFuture<ShardReplicationOperationRequestBuilder> put(DeleteRequestBuilder builder){
+        RequestBuilderContainer container = new RequestBuilderContainer(builder);
         metricsFactory.getCounter(IndexBatchBuffer.class,"index.buffer.size").inc();
-        producer.put(new RequestBuilderContainer(builder));
+        producer.put(container);
+        return container.getFuture();
     }
 
-    public void flushAndRefresh(){
-        execute(true);
-    }
-    public void flush(){
-        execute();
-    }
 
-    private void execute(){
-        execute(this.refresh);
-    }
 
+    private static class RequestBuilderContainer{
+        private final ShardReplicationOperationRequestBuilder builder;
+        private final BetterFuture<ShardReplicationOperationRequestBuilder> containerFuture;
+
+        public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
+            final RequestBuilderContainer parent = this;
+            this.builder = builder;
+            this.containerFuture = new BetterFuture<>(new Callable<ShardReplicationOperationRequestBuilder>() {
+                @Override
+                public ShardReplicationOperationRequestBuilder call() throws Exception {
+                    return parent.getBuilder();
+                }
+            });
+        }
+
+        public ShardReplicationOperationRequestBuilder getBuilder(){
+            return builder;
+        }
+        private void done(){
+            containerFuture.done();
+        }
+        public BetterFuture<ShardReplicationOperationRequestBuilder> getFuture(){
+            return containerFuture;
+        }
+    }
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private synchronized void execute(boolean refresh ) {
-        try {
-            BulkRequestBuilder bulkRequest = client.prepareBulk();
-            bulkRequest.setRefresh(refresh);
-            int count = bufferSize;
-            while (blockingQueue.size() > 0 && count-- > 0) {
-                RequestBuilderContainer container = (RequestBuilderContainer)blockingQueue.take();
-                ShardReplicationOperationRequestBuilder builder = container.getBuilder();
-                if (builder instanceof IndexRequestBuilder) {
-                    bulkRequest.add((IndexRequestBuilder) builder);
-                }
-                if (builder instanceof DeleteRequestBuilder) {
-                    bulkRequest.add((DeleteRequestBuilder) builder);
-                }
+    private void execute( boolean refresh) {
+
+        if (blockingQueue.size() == 0) {
+            return;
+        }
+
+        BulkRequestBuilder bulkRequest = initRequest(refresh);
+
+        Collection<RequestBuilderContainer> containerCollection = new ArrayList<>(config.getIndexBatchSize());
+        blockingQueue.drainTo(containerCollection);
+        int count = 0;
+        //clear the queue or proceed to buffersize
+        for (RequestBuilderContainer container : containerCollection) {
+
+            ShardReplicationOperationRequestBuilder builder = container.getBuilder();
+            //only handle two types of requests for now, annoyingly there is no base class implementation on BulkRequest
+            if (builder instanceof IndexRequestBuilder) {
+                bulkRequest.add((IndexRequestBuilder) builder);
             }
-            //nothing to do, we haven't added anthing to the index
-            if (bulkRequest.numberOfActions() == 0) {
-                return;
+            if (builder instanceof DeleteRequestBuilder) {
+                bulkRequest.add((DeleteRequestBuilder) builder);
             }
 
-            final BulkResponse responses;
+            if (count++ == config.getIndexBatchSize()) {
+                sendRequest(bulkRequest);
+                bulkRequest = initRequest(refresh);
 
-            try {
-                responses = bulkRequest.execute().actionGet();
-            } catch (Throwable t) {
-                log.error("Unable to communicate with elasticsearch");
-                failureMonitor.fail("Unable to execute batch", t);
-                throw t;
             }
+        }
+        sendRequest(bulkRequest);
+        for (RequestBuilderContainer container : containerCollection) {
+            container.done();
+        }
+    }
 
-            failureMonitor.success();
+    private BulkRequestBuilder initRequest(boolean refresh) {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
+        bulkRequest.setRefresh(refresh);
+        return bulkRequest;
+    }
 
-            for (BulkItemResponse response : responses) {
-                if (response.isFailed()) {
-                    throw new RuntimeException("Unable to index documents.  Errors are :"
-                            + response.getFailure().getMessage());
-                }
+    private void sendRequest(BulkRequestBuilder bulkRequest) {
+        //nothing to do, we haven't added anthing to the index
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
+
+        final BulkResponse responses;
+
+        try {
+            responses = bulkRequest.execute().actionGet();
+        } catch (Throwable t) {
+            log.error("Unable to communicate with elasticsearch");
+            failureMonitor.fail("Unable to execute batch", t);
+            throw t;
+        }
+
+        failureMonitor.success();
+
+        for (BulkItemResponse response : responses) {
+            if (response.isFailed()) {
+                throw new RuntimeException("Unable to index documents.  Errors are :"
+                        + response.getFailure().getMessage());
             }
-        }catch (InterruptedException ie){
-            log.error("Problem taking messages off of queue",ie);
-            throw new RuntimeException(ie);
         }
     }
 
+
     private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
 
         private Subscriber<? super RequestBuilderContainer> subscriber;
@@ -188,16 +223,4 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
         }
     }
 
-    private static class RequestBuilderContainer{
-        private final ShardReplicationOperationRequestBuilder builder;
-
-        public RequestBuilderContainer(ShardReplicationOperationRequestBuilder builder){
-            this.builder = builder;
-        }
-
-        public ShardReplicationOperationRequestBuilder getBuilder(){
-            return builder;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 736eb9b..6cda9f2 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -53,9 +55,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
 
 
 @RunWith(EsRunner.class)
@@ -64,12 +64,9 @@ public class EntityIndexTest extends BaseIT {
 
     private static final Logger log = LoggerFactory.getLogger( EntityIndexTest.class );
 
-
     @Inject
     public EntityIndexFactory eif;
 
-
-
     @Test
     public void testIndex() throws IOException {
         Id appId = new SimpleId( "application" );
@@ -91,6 +88,49 @@ public class EntityIndexTest extends BaseIT {
     }
 
     @Test
+    public void testIndexThreads() throws IOException {
+        final Id appId = new SimpleId( "application" );
+
+        final ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
+        long now = System.currentTimeMillis();
+        final int threads = 1000;
+        final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
+        final IndexScope indexScope = new IndexScopeImpl(appId, "things");
+        final String entityType = "thing";
+        entityIndex.initializeIndex();
+        final CountDownLatch latch = new CountDownLatch(threads);
+        final AtomicLong failTime=new AtomicLong(0);
+        for(int i=0;i<threads;i++) {
+            Thread thread = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-small.json", 1, 0);
+                        entityIndex.refresh();
+                    } catch (Exception e) {
+                        synchronized (failTime) {
+                            if (failTime.get() == 0) {
+                                failTime.set(System.currentTimeMillis());
+                            }
+                        }
+                        System.out.println(e.toString());
+                        fail("threw exception");
+                    }finally {
+                        latch.countDown();
+                    }
+                }
+            });
+            thread.start();
+        }
+        try {
+            latch.await();
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
+        }
+        assertTrue("system must have failed at "+(failTime.get() - now) ,failTime.get()==0);
+    }
+
+    @Test
     public void testMultipleIndexInitializations(){
         Id appId = new SimpleId( "application" );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java b/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
index 9ea6142..b61b18c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/guice/ServiceModule.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.services.guice;
 
 
 import org.apache.usergrid.corepersistence.CoreModule;
-import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
@@ -59,7 +58,6 @@ public class ServiceModule extends AbstractModule {
         //Seems weird, aren't we just binding the factory to the exact same factory when it goes to look for it?
         bind( ServiceManagerFactory.class );
         bind( EntityManagerFactory.class );
-        bind( MetricsFactory.class );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 2d2ac1d..5eed002 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -23,9 +23,9 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Injector;
 
 import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
@@ -86,7 +86,7 @@ public class NotificationsService extends AbstractCollectionService {
         emf = getApplicationContext().getBean(EntityManagerFactory.class);
 
         Properties props = (Properties)getApplicationContext().getBean("properties");
-        metricsService = getApplicationContext().getBean(MetricsFactory.class);
+        metricsService = getApplicationContext().getBean(Injector.class).getInstance(MetricsFactory.class);
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
         postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
         JobScheduler jobScheduler = new JobScheduler(sm,em);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
index 86c7b44..6fc7b7a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
@@ -21,7 +21,10 @@ import java.util.UUID;
 
 import javax.annotation.PostConstruct;
 
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -29,7 +32,6 @@ import org.springframework.stereotype.Component;
 
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.batch.job.OnlyOnceJob;
-import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.entities.JobData;
@@ -46,7 +48,6 @@ public class QueueJob extends OnlyOnceJob {
 
     private static final Logger logger = LoggerFactory.getLogger( QueueJob.class );
 
-    @Autowired
     private MetricsFactory metricsService;
 
     @Autowired
@@ -68,6 +69,7 @@ public class QueueJob extends OnlyOnceJob {
 
     @PostConstruct
     void init() {
+        metricsService = this.smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
         histogram = metricsService.getHistogram( QueueJob.class, "cycle" );
         requests = metricsService.getMeter( QueueJob.class, "requests" );
         execution = metricsService.getTimer( QueueJob.class, "execution" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 94ebf65..5247a25 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -22,11 +22,11 @@ import com.google.common.cache.*;
 import com.google.inject.Injector;
 
 import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.metrics.MetricsFactory;
 
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.services.ServiceManager;
@@ -76,11 +76,11 @@ public class QueueListener  {
     public QueueManager TEST_QUEUE_MANAGER;
     private int consecutiveCallsToRemoveDevices;
 
-    public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+    public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){
         this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
-        this.metricsService = metricsService;
+        this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
         this.properties = props;
         this.queueScopeFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueScopeFactory.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 8cc3450..5b1a6b3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -20,8 +20,8 @@ import com.clearspring.analytics.hash.MurmurHash;
 import com.clearspring.analytics.stream.frequency.CountMinSketch;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index 61805c7..a4b0785 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@ -20,13 +20,14 @@ package org.apache.usergrid.services.queues;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import org.apache.usergrid.management.importer.ImportService;
 import org.apache.usergrid.management.importer.ImportServiceImpl;
-import org.apache.usergrid.metrics.MetricsFactory;
+
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.services.ServiceManagerFactory;
@@ -53,8 +54,8 @@ public class ImportQueueListener extends QueueListener {
 
     @Inject
     public ImportQueueListener( final ServiceManagerFactory smf, final EntityManagerFactory emf,
-                                final MetricsFactory metricsService, final Injector injector, final Properties props ) {
-        super( smf, emf, metricsService, injector,  props );
+                                final Injector injector, final Properties props ) {
+        super( smf, emf, injector,  props );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 744a8db..8277e86 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -22,11 +22,11 @@ import com.google.common.cache.*;
 import com.google.inject.Injector;
 
 import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.metrics.MetricsFactory;
 
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.services.ServiceManager;
@@ -84,15 +84,14 @@ public abstract class QueueListener  {
      * Initializes the QueueListener.
      * @param smf
      * @param emf
-     * @param metricsService
      * @param props
      */
-    public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Injector injector, Properties props){
+    public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Injector injector, Properties props){
         //TODO: change current injectors to use service module instead of CpSetup
         this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class );
         this.smf = smf;
         this.emf = injector.getInstance( EntityManagerFactory.class ); //emf;
-        this.metricsService = metricsService;
+        this.metricsService = injector.getInstance(MetricsFactory.class);
         this.properties = props;
         this.queueScopeFactory = injector.getInstance(QueueScopeFactory.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/main/resources/usergrid-services-context.xml
----------------------------------------------------------------------
diff --git a/stack/services/src/main/resources/usergrid-services-context.xml b/stack/services/src/main/resources/usergrid-services-context.xml
index 1c2eb63..e97481b 100644
--- a/stack/services/src/main/resources/usergrid-services-context.xml
+++ b/stack/services/src/main/resources/usergrid-services-context.xml
@@ -92,7 +92,6 @@
   <bean id="notificationsQueueListener" class="org.apache.usergrid.services.notifications.QueueListener"
         scope="singleton">
     <constructor-arg name="emf" ref="entityManagerFactory" />
-    <constructor-arg name="metricsService" ref="metricsFactory" />
     <constructor-arg name="props" ref="properties" />
     <constructor-arg name="smf" ref="serviceManagerFactory" />
   </bean>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 5fda74b..95418ee 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -124,7 +124,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
         app.getEntityManager().refreshIndex();
 
-        listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
+        listener = new QueueListener(ns.getServiceManagerFactory(),ns.getEntityManagerFactory(), new Properties());
         listener.TEST_QUEUE_MANAGER = qm;
         listener.DEFAULT_SLEEP = 200;
         listener.start();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090eb1bc/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8d6fb70..ee00c88 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -101,7 +101,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         DefaultQueueManager qm = new DefaultQueueManager();
         ns.TEST_QUEUE_MANAGER = qm;
 
-        listener = new QueueListener(ns.getServiceManagerFactory(), ns.getEntityManagerFactory(),ns.getMetricsFactory(), new Properties());
+        listener = new QueueListener(ns.getServiceManagerFactory(), ns.getEntityManagerFactory(), new Properties());
         listener.DEFAULT_SLEEP = 200;
         listener.TEST_QUEUE_MANAGER = qm;
         listener.start();
@@ -186,7 +186,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
         Entity device = app.testRequest(ServiceAction.POST, 1, "users",  user.getUuid(), "devices", device1.getUuid()).getEntity();
         assertEquals(device.getUuid(), device1.getUuid());
 
-        // create and post notification 
+        // create and post notification
         String payload = "Hello, World!";
         Map<String, String> payloads = new HashMap<String, String>(1);
         payloads.put(notifier.getUuid().toString(), payload);


Mime
View raw message