usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [04/15] usergrid git commit: Refactored schedulers to have separate schedulers for different tasks
Date Thu, 12 Nov 2015 22:14:54 GMT
Refactored schedulers to have separate schedulers for different tasks

Also fixes a bug with unique values.  Values are now validated on read to ensure that each unique value is still valid.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0e1f0e64
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0e1f0e64
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0e1f0e64

Branch: refs/heads/master
Commit: 0e1f0e64176ce13b579762c42d8358eaa7543f20
Parents: 7da99c7
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Nov 11 16:55:00 2015 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Nov 12 11:27:16 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    | 130 ++++++++++++++-----
 .../asyncevents/AmazonAsyncEventService.java    |   1 +
 .../asyncevents/AsyncEventsSchedulerFig.java    |  94 ++++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   2 +-
 .../asyncevents/EventExecutionScheduler.java    |  37 ++++++
 .../traverse/ReadGraphCollectionFilter.java     |   3 +-
 .../traverse/ReadGraphConnectionFilter.java     |   3 +-
 .../corepersistence/rx/impl/AsyncRepair.java    |  38 ++++++
 .../corepersistence/rx/impl/ImportRepair.java   |  38 ++++++
 .../service/ServiceSchedulerFig.java            |  48 +++++++
 .../collection/guice/CollectionModule.java      |  30 +++++
 .../guice/CollectionTaskExecutor.java           |  35 -----
 .../EntityCollectionManagerFactoryImpl.java     |   5 +-
 .../impl/EntityCollectionManagerImpl.java       |  16 ++-
 .../mvcc/stage/write/WriteCommit.java           |   4 +-
 .../scheduler/CollectionExecutorScheduler.java  |  52 ++++++++
 .../scheduler/CollectionSchedulerFig.java       |  53 ++++++++
 .../collection/EntityCollectionManagerIT.java   |  53 +++++---
 .../core/executor/TaskExecutorFactory.java      | 101 ++++++++++----
 .../persistence/core/guice/CommonModule.java    |  25 ++--
 .../persistence/core/rx/RxSchedulerFig.java     |  71 ----------
 .../core/rx/RxTaskSchedulerImpl.java            |  81 +-----------
 .../usergrid/services/AbstractService.java      |  12 +-
 23 files changed, 639 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 959edec..09db151 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,12 +16,16 @@
 package org.apache.usergrid.corepersistence;
 
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventsSchedulerFig;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventExecutionScheduler;
 import org.apache.usergrid.corepersistence.index.ApplicationIndexBucketLocator;
 import org.apache.usergrid.corepersistence.index.CoreIndexFig;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -42,6 +46,8 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
 import org.apache.usergrid.corepersistence.service.AggregationService;
 import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
 import org.apache.usergrid.corepersistence.service.AggregationServiceImpl;
@@ -51,20 +57,26 @@ import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.CollectionServiceImpl;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
 import org.apache.usergrid.corepersistence.service.StatusService;
 import org.apache.usergrid.corepersistence.service.StatusServiceImpl;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
@@ -73,25 +85,22 @@ import com.google.inject.multibindings.Multibinder;
 /**
  * Guice Module that encapsulates Core Persistence.
  */
-public class CoreModule  extends AbstractModule {
-
-
+public class CoreModule extends AbstractModule {
 
 
     @Override
     protected void configure() {
 
-        install( new CommonModule());
+        install( new CommonModule() );
         install( new CollectionModule() {
             /**
              * configure our migration data provider for all entities in the system
              */
             @Override
-           public void configureMigrationProvider() {
+            public void configureMigrationProvider() {
 
-                bind(new TypeLiteral<MigrationDataProvider<EntityIdScope>>(){}).to(
-                    AllEntitiesInSystemImpl.class );
-           }
+                bind( new TypeLiteral<MigrationDataProvider<EntityIdScope>>() {} ).to( AllEntitiesInSystemImpl.class );
+            }
         } );
         install( new GraphModule() {
 
@@ -100,30 +109,28 @@ public class CoreModule  extends AbstractModule {
              */
             @Override
             public void configureMigrationProvider() {
-                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
-                    AllNodesInGraphImpl.class );
+                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to( AllNodesInGraphImpl.class );
             }
         } );
-        install(new IndexModule(){
+        install( new IndexModule() {
             @Override
             public void configureMigrationProvider() {
-                bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ).to(
-                    AllApplicationsObservableImpl.class );
+                bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} )
+                    .to( AllApplicationsObservableImpl.class );
             }
-        });
-       //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
-       //        install(new QueueModule());
+        } );
+        //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
+        //        install(new QueueModule());
 
-        bind(ManagerCache.class).to( CpManagerCache.class );
-        bind(ApplicationIdCacheFactory.class);
+        bind( ManagerCache.class ).to( CpManagerCache.class );
+        bind( ApplicationIdCacheFactory.class );
 
 
         /**
          * Create our migrations for within our core plugin
          */
         Multibinder<DataMigration> dataMigrationMultibinder =
-                    Multibinder.newSetBinder( binder(),
-                        new TypeLiteral<DataMigration>() {}, CoreMigration.class );
+            Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration>() {}, CoreMigration.class );
 
 
         dataMigrationMultibinder.addBinding().to( DeDupConnectionDataMigration.class );
@@ -135,7 +142,7 @@ public class CoreModule  extends AbstractModule {
         plugins.addBinding().to( MigrationModuleVersionPlugin.class );
 
         bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class );
-        bind( AllEntityIdsObservable.class).to( AllEntityIdsObservableImpl.class );
+        bind( AllEntityIdsObservable.class ).to( AllEntityIdsObservableImpl.class );
 
 
         /*****
@@ -143,50 +150,103 @@ public class CoreModule  extends AbstractModule {
          *****/
 
 
-        bind( IndexService.class ).to(IndexServiceImpl.class);
+        bind( IndexService.class ).to( IndexServiceImpl.class );
 
         //bind the event handlers
-        bind( EventBuilder.class).to( EventBuilderImpl.class );
-        bind(ApplicationIndexBucketLocator.class);
+        bind( EventBuilder.class ).to( EventBuilderImpl.class );
+        bind( ApplicationIndexBucketLocator.class );
 
         //bind the queue provider
         bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
 
 
-        bind( ReIndexService.class).to(ReIndexServiceImpl.class);
+        bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
 
-        install(new FactoryModuleBuilder()
-            .implement(AggregationService.class, AggregationServiceImpl.class)
-            .build(AggregationServiceFactory.class));
+        install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
+                                           .build( AggregationServiceFactory.class ) );
 
-        bind(IndexLocationStrategyFactory.class).to( IndexLocationStrategyFactoryImpl.class );
+        bind( IndexLocationStrategyFactory.class ).to( IndexLocationStrategyFactoryImpl.class );
 
-        install(new GuicyFigModule(IndexProcessorFig.class));
-
-        install(new GuicyFigModule(CoreIndexFig.class));
+        install( new GuicyFigModule( IndexProcessorFig.class ) );
 
+        install( new GuicyFigModule( CoreIndexFig.class ) );
 
 
         install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
 
         install( new GuicyFigModule( EntityManagerFig.class ) );
 
+        install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
+
+        install( new GuicyFigModule( ServiceSchedulerFig.class ) );
+
         //install our pipeline modules
-        install(new PipelineModule());
+        install( new PipelineModule() );
 
         /**
          * Install our service operations
          */
 
-        bind( CollectionService.class).to( CollectionServiceImpl.class );
+        bind( CollectionService.class ).to( CollectionServiceImpl.class );
 
-        bind( ConnectionService.class).to( ConnectionServiceImpl.class);
+        bind( ConnectionService.class ).to( ConnectionServiceImpl.class );
 
         bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
 
         bind( StatusService.class ).to( StatusServiceImpl.class );
+    }
+
+
+    @Provides
+    @Inject
+    @EventExecutionScheduler
+    public RxTaskScheduler getSqsTaskScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+        final String poolName = asyncEventsSchedulerFig.getIoSchedulerName();
+        final int threadCount = asyncEventsSchedulerFig.getMaxIoThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory
+            .createTaskExecutor( poolName, threadCount, threadCount, TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
 
+        return taskScheduler;
     }
 
+
+    @Provides
+    @Inject
+    @AsyncRepair
+    public RxTaskScheduler getAsyncRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+        final String poolName = asyncEventsSchedulerFig.getRepairPoolName();
+        final int threadCount = asyncEventsSchedulerFig.getMaxRepairThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory
+            .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.DROP );
+
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+        return taskScheduler;
+    }
+
+
+    @Provides
+    @Inject
+    @ImportRepair
+    public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+        final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
+        final int threadCount = asyncEventsSchedulerFig.getMaxImportThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory
+            .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+        return taskScheduler;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 16e119c..24ec51f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -153,6 +153,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     final EventBuilder eventBuilder,
                                     final MapManagerFactory mapManagerFactory,
                                     final QueueFig queueFig,
+                                    @EventExecutionScheduler
                                     final RxTaskScheduler rxTaskScheduler ) {
         this.indexProducer = indexProducer;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
new file mode 100644
index 0000000..83eb02e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
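+ * Configuration for the IO, repair, and import scheduler thread pools (thread counts and pool names).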
+ */
+@FigSingleton
+public interface AsyncEventsSchedulerFig extends GuicyFig {
+
+
+    /**
+     * Amount of threads to use in async processing
+     */
+    String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+
+
+    /**
+     * Name of pool to use when performing scheduling
+     */
+    String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+
+
+    /**
+     * Number of threads to use in the repair pool
+     */
+    String REPAIR_SCHEDULER_THREADS = "repair.io.threads";
+
+
+    /**
+     * Name of the pool to use for repair tasks
+     */
+    String REPAIR_SCHEDULER_NAME = "repair.io.poolName";
+
+
+    /**
+     * Number of threads to use in the import pool
+     */
+    String IMPORT_SCHEDULER_THREADS = "import.io.threads";
+
+
+    /**
+     * Name of the pool to use for import tasks
+     */
+    String IMPORT_SCHEDULER_NAME = "import.io.poolName";
+
+
+    @Default( "100" )
+    @Key( IO_SCHEDULER_THREADS )
+    int getMaxIoThreads();
+
+    @Default( "Usergrid-SQS-Pool" )
+    @Key( IO_SCHEDULER_NAME )
+    String getIoSchedulerName();
+
+
+    @Default( "20" )
+    @Key( REPAIR_SCHEDULER_THREADS )
+    int getMaxRepairThreads();
+
+    @Default( "Usergrid-Repair-Pool" )
+    @Key( REPAIR_SCHEDULER_NAME )
+    String getRepairPoolName();
+
+    @Default( "100" )
+    @Key( IMPORT_SCHEDULER_THREADS )
+    int getMaxImportThreads();
+
+    @Default( "Usergrid-Import-Pool" )
+    @Key( IMPORT_SCHEDULER_NAME )
+    String getImportSchedulerName();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2bace8d..d65cffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -62,7 +62,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
                               final QueueManagerFactory queueManagerFactory,
                               final MetricsFactory metricsFactory,
-                              final RxTaskScheduler rxTaskScheduler,
+                              @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler,
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EventBuilder eventBuilder,
                               final IndexLocationStrategyFactory indexLocationStrategyFactory,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
new file mode 100644
index 0000000..ce09aae
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the event execution scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface EventExecutionScheduler {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 3d7df3b..3819659 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -44,7 +45,7 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      */
     @Inject
     public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
-                                      final RxTaskScheduler rxTaskScheduler,
+                                      @AsyncRepair final RxTaskScheduler rxTaskScheduler,
                                       final EventBuilder eventBuilder,
                                       final AsyncEventService asyncEventService,
                                       @Assisted final String collectionName ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index b2d368b..3c92c03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -44,7 +45,7 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      */
     @Inject
     public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
-                                      final RxTaskScheduler rxTaskScheduler,
+                                      @AsyncRepair final RxTaskScheduler rxTaskScheduler,
                                       final EventBuilder eventBuilder,
                                       final AsyncEventService asyncEventService,
                                       @Assisted final String connectionName ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
new file mode 100644
index 0000000..aa2cc12
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the async repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface AsyncRepair {
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
new file mode 100644
index 0000000..d65d04c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the import repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ImportRepair {
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
new file mode 100644
index 0000000..ddaa01c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Configuration for the service scheduler (number of threads used when importing entities).
+ */
+@FigSingleton
+public interface ServiceSchedulerFig extends GuicyFig {
+
+
+    /**
+     * The number of threads to use when importing entities into result sets
+     */
+    String SERVICE_IMPORT_THREADS = "service.import.threads";
+
+
+
+    @Default("20")
+    @Key( SERVICE_IMPORT_THREADS)
+    int getImportThreads();
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 78c7f37..0a6e270 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -25,11 +27,18 @@ import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionSchedulerFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
 
 
 /**
@@ -45,6 +54,7 @@ public abstract class CollectionModule extends AbstractModule {
 
         // noinspection unchecked
         install( new GuicyFigModule( SerializationFig.class ) );
+        install( new GuicyFigModule( CollectionSchedulerFig.class ) );
         install( new SerializationModule() );
         install( new ServiceModule() );
 
@@ -62,6 +72,26 @@ public abstract class CollectionModule extends AbstractModule {
     }
 
 
+
+
+    @Provides
+    @Inject
+    @CollectionExecutorScheduler
+    public RxTaskScheduler getRxTaskScheduler( final CollectionSchedulerFig collectionSchedulerFig ){
+
+        final String poolName = collectionSchedulerFig.getIoSchedulerName();
+        final int threadCount = collectionSchedulerFig.getMaxIoThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, threadCount,
+            TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl(executor  );
+
+        return taskScheduler;
+    }
+
+
     /**
      * Gives callers the ability to to configure an instance of
      *

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
deleted file mode 100644
index 53c1f48..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface CollectionTaskExecutor {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 45cee06..a52ee9c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
@@ -74,7 +75,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
-    private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
 
@@ -107,7 +107,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace, final EntityCacheFig entityCacheFig,
-                                               MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
+                                               final MetricsFactory metricsFactory, @CollectionExecutorScheduler  final RxTaskScheduler rxTaskScheduler ) {
 
         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -123,7 +123,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
         this.keyspace = keyspace;
-        this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index cb1515c..d6bbdc5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.codahale.metrics.Timer;
@@ -117,7 +118,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final Timer deleteTimer;
     private final Timer fieldIdTimer;
     private final Timer fieldEntityTimer;
-    private final Timer updateTimer;
     private final Timer loadTimer;
     private final Timer getLatestTimer;
 
@@ -165,7 +165,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.delete");
         this.fieldIdTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldId");
         this.fieldEntityTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldEntity");
-        this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.update");
         this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.load");
         this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.latest");
     }
@@ -188,8 +187,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
 
-        final Observable<Entity> write = observable.map( writeCommit ).compose( uniqueCleanup )
-                                                                              //now extract the ioEvent we need to return
+        final Observable<Entity> write = observable.map( writeCommit )
+                                                   .map(ioEvent -> {
+                //fire this in the background so we don't block writes
+                Observable.just( ioEvent ).compose( uniqueCleanup ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+                return ioEvent;
+            }
+         )
+                                                                              //now extract the ioEvent we need to return and update the version
                                                                               .map( ioEvent -> ioEvent.getEvent().getEntity().get() );
 
         return ObservableTimer.time( write, writeTimer );
@@ -358,7 +363,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                         continue;
                     }
 
-
                     //else add it to our result set
                     response.addEntity( expectedUnique.getField(), entity );
                 }
@@ -380,6 +384,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     }
 
 
+
+
     // fire the stages
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 9b1a393..fe3f9a9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -94,7 +94,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         final ApplicationScope applicationScope = ioEvent.getEntityCollection();
 
         //set the version into the entity
-        EntityUtils.setVersion( mvccEntity.getEntity().get(), version );
+        final Entity entity = mvccEntity.getEntity().get();
+
+        EntityUtils.setVersion( entity, version );
 
         MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() );
         ValidationUtils.verifyTimeUuid( version ,"version" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
new file mode 100644
index 0000000..8f8aa00
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CollectionExecutorScheduler {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
new file mode 100644
index 0000000..daefa9b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Configuration for the collection scheduler: the number of threads and the name of its thread pool.
+ */
+@FigSingleton
+public interface CollectionSchedulerFig extends GuicyFig {
+
+
+    /**
+     * Number of threads to use in async processing
+     */
+    String COLLECTION_SCHEDULER_THREADS = "scheduler.collection.threads";
+
+
+    /**
+     * Name of pool to use when performing scheduling
+     */
+    String COLLECTION_SCHEDULER_NAME = "scheduler.collection.poolName";
+
+
+    @Default( "20" )
+    @Key( COLLECTION_SCHEDULER_THREADS )
+    int getMaxIoThreads();
+
+    @Default( "Usergrid-Collection-Pool" )
+    @Key( COLLECTION_SCHEDULER_NAME )
+    String getIoSchedulerName();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index e6c6909..115be99 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -58,6 +59,7 @@ import rx.Observable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -315,18 +317,17 @@ public class EntityCollectionManagerIT {
 
 
     @Test
-    public void writeAndGetField2X() {
-
-
+    public void writeAndGetField2X() throws InterruptedException {
         ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
-        Entity newEntity = new Entity( new SimpleId( "test" ) );
-        Field field = new StringField( "testField", "unique", true );
-        newEntity.setField( field );
+        final Id entityId = new SimpleId( "test" );
+        Entity firstInstance = new Entity( entityId  );
+        Field firstField = new StringField( "testField", "unique", true );
+        firstInstance.setField( firstField );
 
         EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
 
-        Observable<Entity> observable = manager.write( newEntity );
+        Observable<Entity> observable = manager.write( firstInstance );
 
         Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
@@ -334,21 +335,22 @@ public class EntityCollectionManagerIT {
         assertNotNull( "Id was assigned", createReturned.getId() );
         assertNotNull( "Version was assigned", createReturned.getVersion() );
 
-        Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
-        assertNotNull( id );
-        assertEquals( newEntity.getId(), id );
+        final Id existingId = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+        assertNotNull( existingId );
+        assertEquals( firstInstance.getId(), existingId );
 
         Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
-        id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
-        assertNull( id );
+        final Id noId = manager.getIdField( firstInstance.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+        assertNull( noId );
 
 
         //ensure we clean up
 
-        Field fieldSecond = new StringField( "testField", "unique2", true );
-        newEntity.setField( fieldSecond );
+        Entity secondInstance = new Entity( entityId  );
+        Field secondField = new StringField( firstField.getName(), "unique2", true );
+        secondInstance.setField( secondField );
 
-        Observable<Entity> observableSecond = manager.write( newEntity );
+        Observable<Entity> observableSecond = manager.write( secondInstance );
 
         Entity createReturnedSecond = observableSecond.toBlocking().lastOrDefault( null );
 
@@ -356,16 +358,27 @@ public class EntityCollectionManagerIT {
         assertNotNull( "Id was assigned", createReturnedSecond.getId() );
         assertNotNull( "Version was assigned", createReturnedSecond.getVersion() );
 
-        Id idFirst = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+        assertNotEquals( "Versions should not be equal", createReturned.getVersion(), createReturnedSecond.getVersion() );
 
-        assertNull(idFirst);
+        //sanity check, get the entity to ensure it's the right version
 
-        Id idSecond = manager.getIdField( newEntity.getId().getType(), fieldSecond ).toBlocking().lastOrDefault( null );
+        final Entity loadedVersion = manager.load( entityId ).toBlocking().last();
 
-        assertNotNull( idSecond );
-        assertEquals( newEntity.getId(), idSecond );
+        assertEquals(entityId, loadedVersion.getId());
+        assertEquals(createReturnedSecond.getVersion(), loadedVersion.getVersion());
 
+        //give the background unique value cleanup time to run.  We still need to finish the TODO below
+        Thread.sleep( 2000 );
 
+        //TODO, we need to implement verify and repair on this
+        final Id idFirst = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+        assertNull(idFirst);
+
+
+        final Id idSecond = manager.getIdField( secondInstance.getId().getType(), secondField ).toBlocking().lastOrDefault( null );
+
+        assertNotNull( idSecond );
+        assertEquals( secondInstance.getId(), idSecond );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index 3c6a750..bd3d3e9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,30 +20,45 @@
 package org.apache.usergrid.persistence.core.executor;
 
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 /**
  * A task executor that allows you to submit tasks
  */
 public class TaskExecutorFactory {
 
-    private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
+    private static final Logger log = LoggerFactory.getLogger( TaskExecutorFactory.class );
+
 
     public enum RejectionAction {
+        /**
+         * If there is no capacity left, throw an exception
+         */
         ABORT,
-        CALLERRUNS
+        /**
+         * If there is no capacity left, the caller runs the callable
+         */
+        CALLERRUNS,
+
+        /**
+         * If there is no capacity left, the request is logged and then silently dropped
+         */
+        DROP
     }
+
+
     /**
      * Create a task executor
-     * @param schedulerName
-     * @param maxThreadCount
-     * @param maxQueueSize
-     * @return
      */
     public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
                                                          final int maxQueueSize, RejectionAction rejectionAction ) {
@@ -52,22 +67,22 @@ public class TaskExecutorFactory {
         final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
 
 
-        if(rejectionAction.equals(RejectionAction.ABORT)){
-
+        if ( rejectionAction == RejectionAction.ABORT ) {
             return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
-
         }
-        else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){
+        else if ( rejectionAction == RejectionAction.CALLERRUNS ) {
 
             return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );
-
-        }else{
-            //default to the thread pool with ABORT policy
-            return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
         }
-
+        else if ( rejectionAction == RejectionAction.DROP ) {
+            return new MaxSizeThreadPoolDrops( queue, schedulerName, maxThreadCount );
+        }
+        else {
+            throw new IllegalArgumentException( "Unable to create a scheduler with the arguments provided" );
+        }
     }
 
+
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */
@@ -78,14 +93,29 @@ public class TaskExecutorFactory {
         }
     }
 
+
     /**
      * Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
      */
     private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {
 
-        public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
-            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
-                new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
+        public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName,
+                                            final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+                new CallerRunsHandler( poolName ) );
+        }
+    }
+
+
+    /**
+     * Create a thread pool that will log and drop rejected tasks if our tasks become overwhelmed
+     */
+    private static final class MaxSizeThreadPoolDrops extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPoolDrops( final BlockingQueue<Runnable> queue, final String poolName,
+                                       final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+                new DropHandler( poolName ) );
         }
     }
 
@@ -111,29 +141,50 @@ public class TaskExecutorFactory {
             Thread t = new Thread( r, threadName );
 
             //set it to be a daemon thread so it doesn't block shutdown
-            t.setDaemon(true);
+            t.setDaemon( true );
 
             return t;
         }
     }
 
+
     /**
      * The handler that will handle rejected executions and signal the interface
      */
-    private static final class RejectedHandler implements RejectedExecutionHandler {
+    private static final class CallerRunsHandler implements RejectedExecutionHandler {
 
         private final String poolName;
 
-        private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+        private CallerRunsHandler( final String poolName ) {this.poolName = poolName;}
+
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r,
+                Thread.currentThread().getName() );
 
             //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
 
             r.run();
         }
+    }
+
+
+    /**
+     * The handler that will handle rejected executions by logging a warning and dropping the task
+     */
+    private static final class DropHandler implements RejectedExecutionHandler {
+
+        private final String poolName;
+
 
+        private DropHandler( final String poolName ) {this.poolName = poolName;}
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            log.warn( "{} task queue full, dropping task {}", poolName, r );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index b93ba76..75e2b29 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,11 +19,6 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFig;
-import org.apache.usergrid.persistence.core.migration.data.*;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -32,14 +27,21 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFig;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCacheImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
@@ -91,16 +93,11 @@ public class CommonModule extends AbstractModule {
         Multibinder.newSetBinder(binder(), MigrationPlugin.class);
 
 
-        /**
-         * RX java scheduler configuration
-         */
-
-        install(new GuicyFigModule(RxSchedulerFig.class));
 
         install(new GuicyFigModule(ClusterFig.class));
         bind(SettingsValidationCluster.class).asEagerSingleton(); //validate props from ClusterFig on startup
 
-        bind(RxTaskScheduler.class).to(RxTaskSchedulerImpl.class);
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
deleted file mode 100644
index 4511518..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- *
- */
-@FigSingleton
-public interface RxSchedulerFig extends GuicyFig {
-
-
-    /**
-     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
-     * backpressure
-     */
-    String IO_SCHEDULER_THREADS = "scheduler.io.threads";
-
-
-    /**
-     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
-     * backpressure
-     */
-    String IO_SCHEDULER_NAME = "scheduler.io.poolName";
-
-    /**
-     * The number of threads to use when importing entities into result sets
-     */
-    String IO_IMPORT_THREADS = "scheduler.import.threads";
-
-
-
-
-    @Default( "100" )
-    @Key( IO_SCHEDULER_THREADS )
-    int getMaxIoThreads();
-
-    @Default( "Usergrid-RxIOPool" )
-    @Key(IO_SCHEDULER_NAME)
-    String getIoSchedulerName();
-
-    @Default("20")
-    @Key( IO_IMPORT_THREADS)
-    int getImportThreads();
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
index dce46cb..261cbeb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -20,18 +20,9 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -42,29 +33,17 @@ import rx.schedulers.Schedulers;
 /**
  * An implementation of the task scheduler that allows us to control the number of I/O threads
  */
-@Singleton
 public class RxTaskSchedulerImpl implements RxTaskScheduler {
 
-    private static final Logger log = LoggerFactory.getLogger( RxTaskSchedulerImpl.class );
-
     private final Scheduler scheduler;
-    private final String poolName;
 
     @Inject
-    public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){
-
-        this.poolName = schedulerFig.getIoSchedulerName();
-
-        final int poolSize = schedulerFig.getMaxIoThreads();
-
-
-        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
-
+    public RxTaskSchedulerImpl(final ThreadPoolExecutor executor){
 
-        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
+        Preconditions.checkNotNull( executor , "executor must not be null");
 
 
-        this.scheduler = Schedulers.from(threadPool);
+        this.scheduler = Schedulers.from(executor);
 
 
     }
@@ -76,56 +55,4 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler {
     }
 
 
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize ) {
-
-            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),  new RejectedHandler() );
-        }
-    }
-
-
-    /**
-     * Thread factory that will name and count threads for easier debugging
-     */
-    private final class CountingThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            final String threadName = poolName + "-" + newValue;
-
-            Thread t = new Thread( r, threadName  );
-
-            //set it to be a daemon thread so it doesn't block shutdown
-            t.setDaemon( true );
-
-            return t;
-        }
-    }
-
-
-    /**
-     * The handler that will handle rejected executions and signal the interface
-     */
-    private final class RejectedHandler implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
-
-            //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
-
-            r.run();
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index d032589..662370f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -27,6 +27,10 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.codahale.metrics.Timer;
+
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.slf4j.Logger;
@@ -42,7 +46,6 @@ import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.security.shiro.utils.SubjectUtils;
 import org.apache.usergrid.services.ServiceParameter.IdParameter;
@@ -54,6 +57,7 @@ import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
 
 import com.google.inject.Injector;
+import com.google.inject.Key;
 
 import rx.Observable;
 import rx.Scheduler;
@@ -100,7 +104,7 @@ public abstract class AbstractService implements Service {
     protected Map<String, Object> defaultEntityMetadata;
 
     private Scheduler rxScheduler;
-    private RxSchedulerFig rxSchedulerFig;
+    private ServiceSchedulerFig rxSchedulerFig;
     private MetricsFactory metricsFactory;
     private Timer entityGetTimer;
     private Timer entitiesGetTimer;
@@ -117,8 +121,8 @@ public abstract class AbstractService implements Service {
         this.sm = sm;
         em = sm.getEntityManager();
         final Injector injector = sm.getApplicationContext().getBean( Injector.class );
-        rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
-        rxSchedulerFig = injector.getInstance(RxSchedulerFig.class);
+        rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ImportRepair.class)).getAsyncIOScheduler();
+        rxSchedulerFig = injector.getInstance(ServiceSchedulerFig.class );
         metricsFactory = injector.getInstance(MetricsFactory.class);
         this.entityGetTimer = metricsFactory.getTimer(this.getClass(), "importEntity.get");
         this.entitiesGetTimer = metricsFactory.getTimer(this.getClass(), "importEntities.get");


Mime
View raw message