usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [4/4] incubator-usergrid git commit: WIP overwrite
Date Wed, 15 Apr 2015 23:22:53 GMT
WIP overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/85b47ee3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/85b47ee3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/85b47ee3

Branch: refs/heads/USERGRID-541
Commit: 85b47ee30919421dc468211bfad683336098670e
Parents: 0ece847
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Apr 15 17:22:45 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Apr 15 17:22:45 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  43 ++-
 .../corepersistence/CpEntityManager.java        |  42 +--
 .../corepersistence/CpEntityManagerFactory.java |  18 +-
 .../corepersistence/CpManagerCache.java         |   3 +
 .../corepersistence/CpRelationManager.java      |  32 +-
 .../CpRelationManagerFactory.java               |  46 ---
 .../usergrid/corepersistence/GuiceFactory.java  |   4 +-
 .../corepersistence/IndexQueueService.java      |  45 ---
 .../usergrid/corepersistence/IndexService.java  |  50 ---
 .../corepersistence/IndexServiceImpl.java       | 156 ----------
 .../ManagementEntityManagerProvider.java        |  44 +++
 .../events/EntityDeletedHandler.java            |   5 +-
 .../events/EntityVersionCreatedHandler.java     |   8 +-
 .../events/EntityVersionDeletedHandler.java     |  19 +-
 .../corepersistence/index/BufferQueue.java      |  68 +++++
 .../index/BufferQueueInMemoryImpl.java          | 116 +++++++
 .../index/BufferQueueSQSImpl.java               | 306 +++++++++++++++++++
 .../index/IndexQueueService.java                |  45 +++
 .../corepersistence/index/IndexService.java     |  50 +++
 .../corepersistence/index/IndexServiceImpl.java | 148 +++++++++
 .../corepersistence/index/QueryFig.java         |  98 ++++++
 .../corepersistence/index/QueueProvider.java    | 112 +++++++
 .../index/SQSIndexQueueServiceImpl.java         |  35 +++
 .../migration/AppInfoMigrationPlugin.java       |   4 +-
 .../usergrid/persistence/EntityManager.java     |   2 -
 .../corepersistence/TestIndexModule.java        |  47 +++
 .../index/BufferQueueSQSImplTest.java           | 179 +++++++++++
 .../persistence/core/aws/NoAWSCredsRule.java    |  98 ++++++
 .../persistence/index/EntityIndexBatch.java     |   4 +-
 .../usergrid/persistence/index/IndexFig.java    | 104 ++-----
 .../persistence/index/guice/IndexModule.java    |   5 +-
 .../persistence/index/guice/QueueProvider.java  | 116 -------
 .../persistence/index/impl/BufferQueue.java     |  66 ----
 .../index/impl/BufferQueueInMemoryImpl.java     | 115 -------
 .../index/impl/BufferQueueSQSImpl.java          | 306 -------------------
 .../impl/EsApplicationEntityIndexImpl.java      |  12 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |  16 +-
 .../index/impl/EsEntityIndexFactoryImpl.java    |   8 +-
 .../index/impl/EsEntityIndexImpl.java           |  50 +--
 .../index/impl/EsIndexBufferConsumerImpl.java   | 237 ++++++--------
 .../index/impl/EsIndexBufferProducerImpl.java   |  59 ----
 .../index/impl/FailureMonitorImpl.java          |  26 --
 .../index/impl/FlushBufferQueue.java            |  23 ++
 .../index/impl/IndexBufferConsumer.java         |  15 +-
 .../index/impl/IndexBufferProducer.java         |  32 --
 .../persistence/index/impl/IndexIdentifier.java |  46 +++
 .../index/impl/IndexIdentifierImpl.java         | 118 +------
 .../index/impl/IndexOperationMessage.java       | 139 +++++++++
 .../index/impl/IndexRefreshCommandImpl.java     |  12 +-
 .../index/migration/LegacyIndexIdentifier.java  |   4 +-
 .../index/guice/TestIndexModule.java            |  22 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 175 -----------
 .../persistence/queue/NoAWSCredsRule.java       |  98 ------
 .../persistence/queue/QueueManagerTest.java     |   3 +-
 stack/services/pom.xml                          |  50 +--
 .../notifications/NotifiersServiceIT.java       |  19 +-
 56 files changed, 1830 insertions(+), 1873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 7859ffc..545de43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,6 +16,11 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.corepersistence.index.BufferQueue;
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
+import org.apache.usergrid.corepersistence.index.QueryFig;
+import org.apache.usergrid.corepersistence.index.QueueProvider;
 import org.apache.usergrid.corepersistence.migration.*;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.safehaus.guicyfig.GuicyFigModule;
@@ -56,25 +61,22 @@ import com.google.inject.multibindings.Multibinder;
  */
 public class CoreModule  extends AbstractModule {
 
-    /**
-     * TODO this is a circular dependency, and should be refactored
-     */
-    private LazyEntityManagerFactoryProvider lazyEntityManagerFactoryProvider;
+
 
     public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
 
 
 
-    public CoreModule( final ApplicationContext context ) {
-        this.lazyEntityManagerFactoryProvider = new LazyEntityManagerFactoryProvider( context );
+    public CoreModule( ) {
+
     }
 
     @Override
     protected void configure() {
 
 
-        //See TODO, this is fugly
-        bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
+//        //See TODO, this is fugly
+//        bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
 
         install( new CommonModule());
         install( new CollectionModule() {
@@ -141,31 +143,24 @@ public class CoreModule  extends AbstractModule {
         plugins.addBinding().to( AppInfoMigrationPlugin.class );
         plugins.addBinding().to( MigrationModuleVersionPlugin.class );
 
-        bind(AllApplicationsObservable.class).to(AllApplicationsObservableImpl.class);
+        bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class );
 
-        install(new GuicyFigModule(ApplicationIdCacheFig.class));
-
-    }
 
+        /*****
+         * Indexing service
+         *****/
 
-    /**
-     * TODO, this is a hack workaround due to the guice/spring EMF circular dependency
-     * Once the entity managers have been refactored and moved into guice, remove this dependency.
-     *
-     */
-    public static class LazyEntityManagerFactoryProvider implements Provider<EntityManagerFactory>{
 
-        private final ApplicationContext context;
+        bind(IndexService.class).to(IndexServiceImpl.class);
+        //bind the queue provider
 
+        bind( BufferQueue.class).toProvider( QueueProvider.class );
 
-        public LazyEntityManagerFactoryProvider( final ApplicationContext context ) {this.context = context;}
+        install( new GuicyFigModule( QueryFig.class ) );
 
 
+        install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
 
-        @Override
-        public EntityManagerFactory get() {
-            return this.context.getBean( EntityManagerFactory.class );
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index d374e2f..a615a43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -80,9 +80,6 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -125,7 +122,6 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -172,7 +168,6 @@ public class CpEntityManager implements EntityManager {
     private UUID applicationId;
     private Application application;
 
-    private CpEntityManagerFactory emf;
 
     private ManagerCache managerCache;
 
@@ -212,29 +207,34 @@ public class CpEntityManager implements EntityManager {
 //    private LoadingCache<EntityScope, org.apache.usergrid.persistence.model.entity.Entity> entityCache;
 
 
-    public CpEntityManager() {
-
-    }
-
-    @Override
-    public void init( EntityManagerFactory emf, UUID applicationId ) {
+    /**
+     * Fugly, make this part of DI
+     * @param cass
+     * @param counterUtils
+     * @param managerCache
+     * @param metricsFactory
+     * @param applicationId
+     */
+    public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) {
 
-        Preconditions.checkNotNull( emf, "emf must not be null" );
+        Preconditions.checkNotNull( cass, "cass must not be null" );
+        Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" );
+        Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
 
-        this.emf = ( CpEntityManagerFactory ) emf;
-        this.managerCache = this.emf.getManagerCache();
+
+        this.managerCache = managerCache;
         this.applicationId = applicationId;
 
         applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
         ecm =  managerCache.getEntityCollectionManager( applicationScope );
 
-        this.cass = this.emf.getCassandraService();
-        this.counterUtils = this.emf.getCounterUtils();
+        this.cass = cass;
+        this.counterUtils = counterUtils;
 
         //Timer Setup
-        this.metricsFactory = this.emf.getMetricsFactory();
+        this.metricsFactory = metricsFactory;
         this.aggCounterTimer =this.metricsFactory.getTimer( CpEntityManager.class,
             "cp.entity.get.aggregate.counters.timer" );
         this.entCreateTimer =this.metricsFactory.getTimer( CpEntityManager.class, "cp.entity.create.timer" );
@@ -732,10 +732,10 @@ public class CpEntityManager implements EntityManager {
     @Override
     public RelationManager getRelationManager( EntityRef entityRef ) {
         Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
-        CpRelationManager rmi = CpRelationManagerFactory.get(
-            this, emf, applicationId, entityRef, null, metricsFactory
-        );
-        return rmi;
+
+        CpRelationManager relationManager =
+            new CpRelationManager( metricsFactory, managerCache, this, applicationId, entityRef );
+        return relationManager;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 9907f91..8741a8c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -156,14 +156,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     }
 
 
-    public ManagerCache getManagerCache() {
-
-        if ( managerCache == null ) {
-            managerCache = injector.getInstance( ManagerCache.class );
-        }
-        return managerCache;
-    }
-
     private Observable<EntityIdScope> getAllEntitiesObservable(){
       return injector.getInstance( Key.get(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){})).getData();
     }
@@ -184,16 +176,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private EntityManager _getEntityManager( UUID applicationId ) {
 
-        EntityManager em = new CpEntityManager();
-        em.init( this ,applicationId );
-
+        EntityManager em = new CpEntityManager(cassandraService, counterUtils,managerCache, metricsFactory, applicationId );
         return em;
     }
 
-    public MetricsFactory getMetricsFactory(){
-        return metricsFactory;
-    }
-
     @Override
     public Entity createApplicationV2(String organizationName, String name) throws Exception {
         return createApplicationV2( organizationName, name, null );
@@ -746,7 +732,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public Health getEntityStoreHealth() {
 
         // could use any collection scope here, does not matter
-        EntityCollectionManager ecm = getManagerCache().getEntityCollectionManager(
+        EntityCollectionManager ecm = managerCache.getEntityCollectionManager(
             new ApplicationScopeImpl( new SimpleId( CpNamingUtils.MANAGEMENT_APPLICATION_ID, "application" ) ) );
 
         return ecm.getHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 4cae31e..f4fee0c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -30,6 +30,9 @@ import org.apache.usergrid.persistence.map.MapScope;
 import com.google.inject.Inject;
 
 
+/**
+ * Cache for managing our other managers.  Now just a delegate.  Needs refactored away
+ */
 public class CpManagerCache implements ManagerCache {
 
     private final EntityCollectionManagerFactory ecmf;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3ddfb07..3c72b60 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -107,9 +107,6 @@ public class CpRelationManager implements RelationManager {
 
     private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
 
-
-    private CpEntityManagerFactory emf;
-
     private ManagerCache managerCache;
 
     private EntityManager em;
@@ -126,25 +123,18 @@ public class CpRelationManager implements RelationManager {
     private Timer updateCollectionTimer;
 
 
-    public CpRelationManager() {}
-
-
-    public CpRelationManager init( EntityManager em, CpEntityManagerFactory emf, UUID applicationId,
-                                   EntityRef headEntity, IndexBucketLocator indexBucketLocator,
-                                   MetricsFactory metricsFactory ) {
+    public CpRelationManager(final MetricsFactory metricsFactory, final ManagerCache managerCache, final EntityManager em, final UUID applicationId, final EntityRef headEntity ) {
 
         Assert.notNull( em, "Entity manager cannot be null" );
-        Assert.notNull( emf, "Entity manager factory cannot be null" );
         Assert.notNull( applicationId, "Application Id cannot be null" );
         Assert.notNull( headEntity, "Head entity cannot be null" );
         Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
         // TODO: this assert should not be failing
         //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
         this.em = em;
-        this.emf = emf;
         this.applicationId = applicationId;
         this.headEntity = headEntity;
-        this.managerCache = emf.getManagerCache();
+        this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
         this.metricsFactory = metricsFactory;
@@ -165,7 +155,7 @@ public class CpRelationManager implements RelationManager {
         Assert.notNull( cpHeadEntity, String
             .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) );
 
-        return this;
+
     }
 
 
@@ -496,9 +486,10 @@ public class CpRelationManager implements RelationManager {
         //            headEntityScope.getName()});
 
         if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
-            getRelationManager( itemEntity )
-                .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
-            getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false );
+            throw new UnsupportedOperationException( "Implement me directly in graph " );
+//            getRelationManager( itemEntity )
+//                .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
+//            getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false );
         }
 
         return itemEntity;
@@ -557,7 +548,8 @@ public class CpRelationManager implements RelationManager {
             addToCollection( collName, itemEntity );
 
             if ( collection != null && collection.getLinkedCollection() != null ) {
-                getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity );
+                throw new UnsupportedOperationException( "Implement me directly in graph " );
+//                getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity );
             }
         }
 
@@ -1047,12 +1039,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-    private CpRelationManager getRelationManager( EntityRef headEntity ) {
-        CpRelationManager rmi = new CpRelationManager();
-        rmi.init( em, emf, applicationId, headEntity, null, metricsFactory );
-        return rmi;
-    }
-
 
     /** side effect: converts headEntity into an Entity if it is an EntityRef! */
     private Entity getHeadEntity() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java
deleted file mode 100644
index 4223f37..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.corepersistence;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.IndexBucketLocator;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-
-import java.util.UUID;
-
-/**
- * Factory to return and init relation manager instances
- */
-public class CpRelationManagerFactory {
-
-
-    public static CpRelationManager get( EntityManager em,
-                                         CpEntityManagerFactory emf,
-                                         UUID applicationId,
-                                         EntityRef headEntity,
-                                         IndexBucketLocator indexBucketLocator,
-                                         MetricsFactory metricsFactory){
-        CpRelationManager relationManager = new CpRelationManager();
-        relationManager.init(em,emf,applicationId,headEntity,indexBucketLocator,metricsFactory);
-        return relationManager;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
index 566430f..fe09a38 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
@@ -128,8 +128,8 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw
             throw new RuntimeException( "Fatal error loading configuration.", e );
         }
 
-        //this is seriously fugly, and needs removed
-        injector = Guice.createInjector( new CoreModule( applicationContext ) );
+        //this is seriously fugly, and needs removed we shouldn't be mixing spring and guice
+        injector = Guice.createInjector( new CoreModule( ) );
 
         return injector;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
deleted file mode 100644
index 30bb20d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexQueueService.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Low level queue service for indexing entities
- */
-public interface IndexQueueService {
-
-
-    /**
-     * Queue an entity to be index asynchronously
-     * @param applicationScope
-     * @param entityId
-     * @param version
-     */
-    void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
deleted file mode 100644
index 84b59e0..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Our low level indexing service operations
- */
-public interface IndexService {
-
-
-    /**
-     *  Perform an index update of the entity's state from Cassandra
-     *
-     * @param applicationScope The scope of the entity
-     * @param entity The entity
-     *
-     * @return An observable with the count of every
-     */
-    Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity );
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
deleted file mode 100644
index 005deb2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/IndexServiceImpl.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence;
-
-
-import java.util.Collection;
-
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexEdge;
-import org.apache.usergrid.persistence.index.impl.IndexIdentifierImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.schema.CollectionInfo;
-import org.apache.usergrid.utils.InflectionUtils;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.functions.Func1;
-import rx.observables.MathObservable;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
-import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
-
-
-/**
- * Implementation of the indexing service
- */
-@Singleton
-public class IndexServiceImpl implements IndexService {
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-    private final EdgesObservable edgesObservable;
-    private final SerializationFig serializationFig;
-
-
-    @Inject
-    public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory,
-                             final EdgesObservable edgesObservable, final SerializationFig serializationFig ) {
-        this.graphManagerFactory = graphManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-        this.edgesObservable = edgesObservable;
-        this.serializationFig = serializationFig;
-    }
-
-
-    @Override
-    public Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity ) {
-
-
-        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-
-        // loop through all types of edge to target
-
-
-        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-
-        //get all the source edges for an entity
-        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entity.getId() );
-
-
-        final Id entityId = entity.getId();
-
-        final Observable<IndexEdge> targetIndexEdges = edgesToTarget.map( edge -> generateScopeToTarget( edge ) );
-
-
-
-        //we might or might not need to index from target-> source
-
-
-        final Observable<IndexEdge>
-            targetSizes = getIndexEdgesToTarget( gm, ei, entity );
-
-
-        final Observable<IndexIdentifierImpl.IndexOperationMessage> observable = Observable.merge( targetIndexEdges,
-            targetSizes ).buffer( serializationFig.getBufferSize() ).flatMap( buffer ->
-            Observable.from(buffer).collect( () -> ei.createBatch(), ( batch, indexEdge ) -> batch.index( indexEdge, entity ) ).flatMap( batch -> Observable.from( batch.execute() ) );
-
-
-
-
-
-        final Observable<IndexIdentifierImpl.IndexOperationMessage> futures = Observable.merge()
-        return MathObservable.sumInteger( sourceSizes );
-    }
-
-
-    /**
-     * Get index edgs to the target
-     * @param graphManager
-     * @param ei The application entity index
-     * @param entity The entity
-     * @return
-     */
-    private Observable<IndexEdge> getIndexEdgesToTarget(
-        final GraphManager graphManager, final ApplicationEntityIndex ei, final Entity entity  ) {
-
-        final Id entityId = entity.getId();
-        final String collectionName = InflectionUtils.pluralize( entityId.getType() );
-
-
-        final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE, collectionName );
-
-        //nothing to do
-        if ( collection == null ) {
-            return Observable.empty();
-        }
-
-
-        final String linkedCollection = collection.getLinkedCollection();
-
-        /**
-         * Nothing to link
-         */
-        if ( linkedCollection == null ) {
-            return Observable.empty();
-        }
-
-
-        /**
-         * An observable of sizes as we execute batches
-         */
-       return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ).map( edge -> generateScopeFromSource( edge ) );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagementEntityManagerProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagementEntityManagerProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagementEntityManagerProvider.java
new file mode 100644
index 0000000..666c927
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagementEntityManagerProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class ManagementEntityManagerProvider implements Provider<EntityManager> {
+
+
+
+    @Override
+    @Singleton
+    public EntityManager get() {
+
+        CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+
+        final EntityManager em = new CpEntityManager(  );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 6a37144..2de69ed 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -42,11 +42,8 @@ public class EntityDeletedHandler implements EntityDeleted {
     private static final Logger logger = LoggerFactory.getLogger( EntityDeletedHandler.class );
 
 
-    private final EntityManagerFactory emf;
-
-
     @Inject
-    public EntityDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+    public EntityDeletedHandler( ) {}
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index 0163fc2..20bdd55 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -41,15 +41,11 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
     private static final Logger logger = LoggerFactory.getLogger(EntityVersionCreatedHandler.class );
 
 
-    private final EntityManagerFactory emf;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
 
 
     @Inject
-    public EntityVersionCreatedHandler( final EntityManagerFactory emf,
-                                        final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-        this.emf = emf;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+    public EntityVersionCreatedHandler(  ) {
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 700851a..22f599e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -30,9 +30,11 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -56,17 +58,20 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
     private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class );
 
 
-    private final EntityManagerFactory emf;
     private final EdgesObservable edgesObservable;
     private final SerializationFig serializationFig;
+    private final EntityIndexFactory entityIndexFactory;
+    private final GraphManagerFactory graphManagerFactory;
 
 
     @Inject
-    public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable,
-                                        final SerializationFig serializationFig ) {
-        this.emf = emf;
+    public EntityVersionDeletedHandler( final EdgesObservable edgesObservable, final SerializationFig serializationFig,
+                                        final EntityIndexFactory entityIndexFactory,
+                                        final GraphManagerFactory graphManagerFactory ) {
         this.edgesObservable = edgesObservable;
         this.serializationFig = serializationFig;
+        this.entityIndexFactory = entityIndexFactory;
+        this.graphManagerFactory = graphManagerFactory;
     }
 
 
@@ -87,10 +92,8 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
             } );
         }
 
-        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
-
-        final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
-        final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope );
+        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( scope );
+        final GraphManager gm = graphManagerFactory.createEdgeManager(  scope );
 
 
         //create an observable of all scopes to deIndex

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java
new file mode 100644
index 0000000..fc1bdb7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
+
+/**
+ * A temporary interface of our buffer Q to decouple of producer and consumer;
+ */
+public interface BufferQueue {
+
+    /**
+     * Offer the indexoperation message.  Some queues may support not returning the future until ack or fail.
+     * Other queues may return the future after ack on the offer.  See the implementation documentation for details.
+     * @param operation
+     */
+    public void offer(final IndexOperationMessage operation);
+
+
+    /**
+     * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed.
+     * May return less than the take size, but will never return null
+     *
+     * @param takeSize
+     * @param timeout
+     * @param timeUnit
+     * @return A null safe lid
+     */
+    public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
+
+
+    /**
+     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented.
+     * This will set the future as done in in memory operations
+     *
+     * @param messages
+     */
+    public void ack(final List<IndexOperationMessage> messages);
+
+    /**
+     * Mark these message as failed.  Set the exception in the future on local operation
+     *
+     * @param messages
+     */
+    public void fail(final List<IndexOperationMessage> messages, final Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java
new file mode 100644
index 0000000..0e43da3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueInMemoryImpl implements BufferQueue {
+
+
+    private final QueryFig fig;
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
+
+    @Inject
+    public BufferQueueInMemoryImpl( final QueryFig fig ) {
+        this.fig = fig;
+        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        try {
+            messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS );
+        }
+        catch ( InterruptedException e ) {
+            throw new RuntimeException("Unable to offer message to queue", e);
+        }
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+        try {
+
+
+            messages.drainTo( response, takeSize );
+
+            //we got something, go process it
+            if ( response.size() > 0 ) {
+                return response;
+            }
+
+
+            final IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+
+            if ( polled != null ) {
+                response.add( polled );
+
+                //try to add more
+                messages.drainTo( response, takeSize - 1 );
+            }
+        }
+        catch ( InterruptedException e ) {
+            //swallow
+        }
+
+
+        return response;
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+        //if we have a future ack it
+        for ( final IndexOperationMessage op : messages ) {
+            op.done();
+        }
+    }
+
+
+    @Override
+    public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+
+
+        for ( final IndexOperationMessage op : messages ) {
+            final BetterFuture<IndexOperationMessage> future = op.getFuture();
+
+            if ( future != null ) {
+                future.setError( t );
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java
new file mode 100644
index 0000000..d955014
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * This is experimental at best.  Our SQS size limit is a problem.  We shouldn't use this for index operation. Only for
+ * performing
+ */
+@Singleton
+public class BufferQueueSQSImpl implements BufferQueue {
+
+    private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
+
+    /** Hacky, copied from CPEntityManager b/c we can't access it here */
+    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+    /**
+     * The name to put in the map
+     */
+    public static final String MAP_NAME = "esqueuedata";
+
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+
+    static {
+        SMILE_FACTORY.delegateToTextual( true );
+    }
+
+
+    private final QueueManager queue;
+    private final MapManager mapManager;
+    private final QueryFig queryFig;
+    private final ObjectMapper mapper;
+    private final Meter readMeter;
+    private final Timer readTimer;
+    private final Meter writeMeter;
+    private final Timer writeTimer;
+
+
+    @Inject
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig,
+                               final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        final QueueScope queueScope =
+            new QueueScopeImpl( QUEUE_NAME );
+
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
+        this.queryFig = queryFig;
+
+        final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
+
+        this.mapManager = mapManagerFactory.createMapManager( scope );
+
+
+        this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
+        this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
+
+        this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
+        this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
+
+        this.mapper = new ObjectMapper( SMILE_FACTORY );
+        //pretty print, disabling for speed
+        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+
+        //no op
+        if(operation.isEmpty()){
+            operation.getFuture().done();
+            return;
+        }
+
+        final Timer.Context timer = this.writeTimer.time();
+        this.writeMeter.mark();
+
+        final UUID identifier = UUIDGenerator.newTimeUUID();
+
+        try {
+
+            final String payLoad = toString( operation );
+
+            //write to cassandra
+            this.mapManager.putString( identifier.toString(), payLoad, TTL );
+
+            //signal to SQS
+            this.queue.sendMessage( identifier );
+            operation.done();
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        //SQS doesn't support more than 10
+
+        final int actualTake = Math.min( 10, takeSize );
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+
+            List<QueueMessage> messages = queue
+                .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                    String.class );
+
+
+
+            final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
+
+            final List<String> mapEntries = new ArrayList<>( messages.size() );
+
+
+            if(messages.size() == 0){
+                return response;
+            }
+
+            //add all our keys  for a single round trip
+            for ( final QueueMessage message : messages ) {
+                mapEntries.add( message.getBody().toString() );
+            }
+
+            //look up the values
+            final Map<String, String> storedCommands = mapManager.getStrings( mapEntries );
+
+
+            //load them into our response
+            for ( final QueueMessage message : messages ) {
+
+                final String key = getMessageKey( message );
+
+                //now see if the key was there
+                final String payload = storedCommands.get( key );
+
+                //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
+                // a DLQ
+
+                if ( payload == null ) {
+                    continue;
+                }
+
+                final IndexOperationMessage messageBody;
+
+                try {
+                    messageBody = fromString( payload );
+                }
+                catch ( IOException e ) {
+                    logger.error( "Unable to deserialize message from string.  This is a bug", e );
+                    throw new RuntimeException( "Unable to deserialize message from string.  This is a bug", e );
+                }
+
+                SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
+
+                response.add( operation );
+            }
+
+            readMeter.mark( response.size() );
+            return response;
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+
+        //nothing to do
+        if ( messages.size() == 0 ) {
+            return;
+        }
+
+        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+        for ( IndexOperationMessage ioe : messages ) {
+
+
+            final SqsIndexOperationMessage sqsIndexOperationMessage =   ( SqsIndexOperationMessage ) ioe;
+
+            final String key = getMessageKey( sqsIndexOperationMessage.getMessage() );
+
+            //remove it from the map
+            mapManager.delete( key  );
+
+            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
+        }
+
+        queue.commitMessages( toAck );
+    }
+
+
+    @Override
+    public void fail( final List<IndexOperationMessage> messages, final Throwable t ) {
+        //no op, just let it retry after the queue timeout
+    }
+
+
+    /** Read the object from Base64 string. */
+    private IndexOperationMessage fromString( String s ) throws IOException {
+        IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
+        return o;
+    }
+
+
+    /** Write the object to a Base64 string. */
+    private String toString( IndexOperationMessage o ) throws IOException {
+        return mapper.writeValueAsString( o );
+    }
+
+    private String getMessageKey(final QueueMessage message){
+        return message.getBody().toString();
+    }
+
+    /**
+     * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
+     */
+    public class SqsIndexOperationMessage extends IndexOperationMessage {
+
+        private final QueueMessage message;
+
+
+        public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) {
+            this.message = message;
+            this.addAllDeIndexRequest( source.getDeIndexRequests() );
+            this.addAllIndexRequest( source.getIndexRequests() );
+        }
+
+
+        /**
+         * Get the message from our queue
+         */
+        public QueueMessage getMessage() {
+            return message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java
new file mode 100644
index 0000000..8d7f222
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Low level queue service for indexing entities
+ */
+public interface IndexQueueService {
+
+
+    /**
+     * Queue an entity to be index asynchronously
+     * @param applicationScope
+     * @param entityId
+     * @param version
+     */
+    void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
new file mode 100644
index 0000000..2bf073c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Our low level indexing service operations
+ */
+public interface IndexService {
+
+
+    /**
+     *  Perform an index update of the entity's state from Cassandra
+     *
+     * @param applicationScope The scope of the entity
+     * @param entity The entity
+     *
+     * @return An observable with the count of every
+     */
+    Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity );
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
new file mode 100644
index 0000000..2a7533a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.apache.usergrid.utils.InflectionUtils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+
+
+/**
+ * Implementation of the indexing service
+ */
+@Singleton
+public class IndexServiceImpl implements IndexService {
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+    private final EdgesObservable edgesObservable;
+    private final IndexFig indexFig;
+
+
+    @Inject
+    public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory,
+                             final EdgesObservable edgesObservable, IndexFig indexFig ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+        this.edgesObservable = edgesObservable;
+        this.indexFig = indexFig;
+    }
+
+
+    @Override
+    public Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity ) {
+
+
+        //bootstrap the lower modules from their caches
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+        final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+
+        final Id entityId = entity.getId();
+
+
+        //we always index in the target scope
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+
+        //we may have to index
+        final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeToTarget( edge ) );
+
+
+        //we might or might not need to index from target-> source
+
+
+        final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, entityId );
+
+
+        final Observable<IndexOperationMessage> observable =
+            //try to send a whole batch if we can
+            Observable.merge( sourceEdgesToIndex, targetSizes ).buffer( indexFig.getIndexBatchSize() )
+
+                //map into batches based on our buffer size
+                .flatMap( buffer -> Observable.from( buffer ).collect( () -> ei.createBatch(),
+                    ( batch, indexEdge ) -> batch.index( indexEdge, entity ) )
+                    //return the future from the batch execution
+                    .flatMap( batch -> Observable.from( batch.execute() ) ) );
+
+        observable.toBlocking().last();
+
+
+        return Observable.just( 0 );
+    }
+
+
+    /**
+     * Get index edgs to the target
+     *
+     * @param graphManager The graph manager
+     * @param entityId The entitie's id
+     */
+    private Observable<IndexEdge> getIndexEdgesToTarget( final GraphManager graphManager, final Id entityId ) {
+
+        final String collectionName = InflectionUtils.pluralize( entityId.getType() );
+
+
+        final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE, collectionName );
+
+        //nothing to do
+        if ( collection == null ) {
+            return Observable.empty();
+        }
+
+
+        final String linkedCollection = collection.getLinkedCollection();
+
+        /**
+         * Nothing to link
+         */
+        if ( linkedCollection == null ) {
+            return Observable.empty();
+        }
+
+
+        /**
+         * An observable of sizes as we execute batches
+         */
+        return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection )
+                              .map( edge -> generateScopeFromSource( edge ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
new file mode 100644
index 0000000..a7d2450
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Application id cache fig
+ */
+public interface QueryFig extends GuicyFig {
+
+
+    /**
+     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
+     * backpressure
+     */
+    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+
+    /**
+     * The number of worker threads to consume from the queue
+     */
+    public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+
+    /**
+     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
+     */
+    public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
+
+
+    /**
+     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
+     */
+    public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout";
+
+    /**
+     * Amount of time to wait when reading from the queue
+     */
+    public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
+    /**
+     * Amount of time to wait when reading from the queue in milliseconds
+     */
+    public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
+
+
+    String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";
+
+
+    @Default( "1000" )
+    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+    long getFailureRetryTime();
+
+    //give us 60 seconds to process the message
+    @Default( "60" )
+    @Key( INDEX_QUEUE_READ_TIMEOUT )
+    int getIndexQueueTimeout();
+
+    @Default( "2" )
+    @Key( ELASTICSEARCH_WORKER_COUNT )
+    int getWorkerCount();
+
+    @Default( "LOCAL" )
+    @Key( ELASTICSEARCH_QUEUE_IMPL )
+    String getQueueImplementation();
+
+    @Default( "1000" )
+    @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT )
+    long getQueueOfferTimeout();
+
+    /**
+     * size of the buffer to build up before you send results
+     */
+    @Default( "1000" )
+    @Key( INDEX_QUEUE_SIZE )
+    int getIndexQueueSize();
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java
new file mode 100644
index 0000000..d3920db
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+
+/**
+ * A provider to allow users to configure their queue impl via properties
+ */
+@Singleton
+public class QueueProvider implements Provider<BufferQueue> {
+
+    private final QueryFig queryFig;
+
+    private final QueueManagerFactory queueManagerFactory;
+    private final MapManagerFactory mapManagerFactory;
+    private final MetricsFactory metricsFactory;
+
+    private BufferQueue bufferQueue;
+
+
+    @Inject
+    public QueueProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory,
+                          final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        this.queryFig = queryFig;
+
+
+        this.queueManagerFactory = queueManagerFactory;
+        this.mapManagerFactory = mapManagerFactory;
+        this.metricsFactory = metricsFactory;
+    }
+
+
+    @Override
+    @Singleton
+    public BufferQueue get() {
+        if ( bufferQueue == null ) {
+            bufferQueue = getQueue();
+        }
+
+
+        return bufferQueue;
+    }
+
+
+    private BufferQueue getQueue() {
+        final String value = queryFig.getQueueImplementation();
+
+        final Implementations impl = Implementations.valueOf( value );
+
+        switch ( impl ) {
+            case LOCAL:
+                return new BufferQueueInMemoryImpl( queryFig );
+            case SQS:
+                return new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory );
+            default:
+                throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" );
+        }
+    }
+
+
+    private String getErrorValues() {
+        String values = "";
+
+        for ( final Implementations impl : Implementations.values() ) {
+            values += impl + ", ";
+        }
+
+        values = values.substring( 0, values.length() - 2 );
+
+        return values;
+    }
+
+
+    /**
+     * Different implementations
+     */
+    public static enum Implementations {
+        LOCAL,
+        SQS;
+
+
+        public String asString() {
+            return toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java
new file mode 100644
index 0000000..42f36b1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class SQSIndexQueueServiceImpl implements IndexQueueService {
+
+    @Override
+    public void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version ) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
index 129f2f9..9f13454 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
@@ -65,8 +65,8 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
     @Inject
     final private MigrationInfoSerialization migrationInfoSerialization;
 
-    @Inject
-    final private EntityManagerFactory emf;
+//    @Inject
+//    final private EntityManagerFactory emf;
 
     @Inject
     final private EntityCollectionManagerFactory entityCollectionManagerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index 7929b12..fc8b3d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -689,8 +689,6 @@ public interface EntityManager {
     /** @return the cass */
     CassandraService getCass();
 
-    public void init( EntityManagerFactory emf,  UUID applicationId);
-
     /** For testing purposes */
     public void flushManagerCaches();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/85b47ee3/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
new file mode 100644
index 0000000..6a98af6
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence;
+
+
+import org.safehaus.guicyfig.GuicyFigModule;
+
+import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.guice.TestModule;
+import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.guice.IndexModule;
+import org.apache.usergrid.persistence.index.guice.IndexTestFig;
+
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+
+import rx.Observable;
+
+
+public class TestIndexModule extends TestModule {
+
+    @Override
+    protected void configure() {
+
+        //this will break, we need to untagle this and move to guice in core completely
+        install( new CoreModule());
+    }
+
+
+}


Mime
View raw message