usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [21/39] usergrid git commit: remove index batch execute
Date Wed, 14 Oct 2015 16:54:17 GMT
remove index batch execute


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4c263b87
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4c263b87
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4c263b87

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 4c263b870d0b45f53e198cad876373485c188669
Parents: a770024
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Mon Sep 28 10:12:54 2015 -0600
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Mon Sep 28 10:12:54 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpManagerCache.java         | 12 ++++-
 .../corepersistence/CpRelationManager.java      |  4 +-
 .../usergrid/corepersistence/ManagerCache.java  |  7 +++
 .../corepersistence/index/IndexServiceImpl.java |  8 ++--
 .../read/search/CandidateEntityFilter.java      | 16 +++++--
 .../pipeline/read/search/CandidateIdFilter.java | 16 +++++--
 .../persistence/index/EntityIndexBatch.java     |  7 ++-
 .../index/impl/EsEntityIndexBatchImpl.java      |  8 ++--
 .../index/impl/EsEntityIndexFactoryImpl.java    |  4 --
 .../index/impl/EsEntityIndexImpl.java           |  5 +-
 .../persistence/index/impl/EntityIndexTest.java | 48 +++++++++++---------
 .../persistence/index/impl/GeoPagingTest.java   |  4 +-
 .../index/impl/IndexLoadTestsIT.java            |  5 +-
 13 files changed, 88 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 89f2ab2..0408bbd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -43,6 +44,7 @@ public class CpManagerCache implements ManagerCache {
     private final GraphManagerFactory gmf;
     private final MapManagerFactory mmf;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final IndexProducer indexProducer;
 
     // TODO: consider making these cache sizes and timeouts configurable
 
@@ -52,13 +54,16 @@ public class CpManagerCache implements ManagerCache {
                            final EntityIndexFactory eif,
                            final GraphManagerFactory gmf,
                            final MapManagerFactory mmf,
-                           final IndexLocationStrategyFactory indexLocationStrategyFactory)
{
+                           final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                           final IndexProducer indexProducer
+    ) {
 
         this.ecmf = ecmf;
         this.eif = eif;
         this.gmf = gmf;
         this.mmf = mmf;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -88,6 +93,11 @@ public class CpManagerCache implements ManagerCache {
         return mmf.createMapManager( mapScope );
     }
 
+    @Override
+    public IndexProducer getIndexProducer() {
+        return indexProducer;
+    }
+
 
     @Override
     public void invalidate() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index de687b3..b4b39a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -535,9 +535,7 @@ public class CpRelationManager implements RelationManager {
 
         batch.deindex( indexScope, memberEntity );
 
-
-        batch.execute();
-
+        managerCache.getIndexProducer().put( batch.build()).subscribe();
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
index 6425b61..1dee80a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
 
@@ -66,6 +67,12 @@ public interface ManagerCache {
     MapManager getMapManager(MapScope mapScope);
 
     /**
+     * gets index producer
+     * @return
+     */
+    IndexProducer getIndexProducer();
+
+    /**
      * invalidate the cache
      */
     void invalidate();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8a8dba1..d160aac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -114,7 +114,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.index( indexEdge, entity );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> batch.execute() ) );
+                .flatMap( batch -> Observable.just(batch.build()) ) );
 
         return ObservableTimer.time( batches, indexTimer );
     }
@@ -142,7 +142,7 @@ public class IndexServiceImpl implements IndexService {
 
             batch.index( indexEdge, entity );
 
-            return batch.execute();
+            return Observable.just(batch.build());
         } );
 
         return ObservableTimer.time( batches, addTimer  );
@@ -185,7 +185,7 @@ public class IndexServiceImpl implements IndexService {
 
                 batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed,
batch );
 
-                return batch.execute();
+                return Observable.just(batch.build());
             } );
 
         return ObservableTimer.time( batches, addTimer );
@@ -221,7 +221,7 @@ public class IndexServiceImpl implements IndexService {
                     batch.deindex( searchEdge, candidateResult );
                 } )
                     //return the future from the batch execution
-                .flatMap( batch -> batch.execute() );
+                .flatMap( batch ->Observable.just(batch.build()) );
 
         return ObservableTimer.time(batches, indexTimer);
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 14c880f..ceb18ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -24,6 +24,7 @@ import java.util.*;
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,16 +57,19 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final IndexProducer indexProducer;
 
 
     @Inject
     public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
                                   final EntityIndexFactory entityIndexFactory,
-                                  final IndexLocationStrategyFactory indexLocationStrategyFactory
+                                  final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                  final IndexProducer indexProducer
                                   ) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -108,7 +112,7 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                         //now we have a collection, validate our canidate set is correct.
                         return entitySets.map(
                             entitySet -> new EntityVerifier(
-                                applicationIndex.createBatch(), entitySet, candidateResults)
+                                applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
                         )
                             .doOnNext(entityCollector -> entityCollector.merge())
                             .flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
@@ -150,14 +154,17 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
 
         private final EntityIndexBatch batch;
         private final List<FilterResult<Candidate>> candidateResults;
+        private final IndexProducer indexProducer;
         private final EntitySet entitySet;
 
 
         public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
-                               final List<FilterResult<Candidate>> candidateResults
) {
+                               final List<FilterResult<Candidate>> candidateResults,
+                               final IndexProducer indexProducer) {
             this.batch = batch;
             this.entitySet = entitySet;
             this.candidateResults = candidateResults;
+            this.indexProducer = indexProducer;
             this.results = new ArrayList<>( entitySet.size() );
         }
 
@@ -171,7 +178,8 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
                 validate( candidateResult );
             }
 
-           batch.execute().subscribe();
+            indexProducer.put(batch.build()).subscribe();
+
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
index 3b1c102..b2fd675 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateIdFilter.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,15 +54,19 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>,
F
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final IndexProducer indexProducer;
 
 
     @Inject
     public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EntityIndexFactory entityIndexFactory,
-                              final IndexLocationStrategyFactory indexLocationStrategyFactory)
{
+                              final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                              final IndexProducer indexProducer
+                              ) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.indexProducer = indexProducer;
     }
 
 
@@ -97,7 +102,7 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>,
F
 
                     return versionSetObservable.map(
                         entitySet -> new EntityCollector( applicationIndex.createBatch(),
entitySet,
-                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge()
).flatMap(
+                            candidateResults, indexProducer ) ).doOnNext( entityCollector
-> entityCollector.merge() ).flatMap(
                         entityCollector -> Observable.from( entityCollector.collectResults()
) );
                 } );
 
@@ -115,14 +120,16 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>,
F
 
         private final EntityIndexBatch batch;
         private final List<FilterResult<Candidate>> candidateResults;
+        private final IndexProducer indexProducer;
         private final VersionSet versionSet;
 
 
         public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
-                                final List<FilterResult<Candidate>> candidateResults
) {
+                                final List<FilterResult<Candidate>> candidateResults,
final IndexProducer indexProducer ) {
             this.batch = batch;
             this.versionSet = versionSet;
             this.candidateResults = candidateResults;
+            this.indexProducer = indexProducer;
             this.results = new ArrayList<>( versionSet.size() );
         }
 
@@ -136,7 +143,8 @@ public class CandidateIdFilter extends AbstractFilter<FilterResult<Candidate>,
F
                 validate( candidateResult );
             }
 
-            batch.execute();
+            indexProducer.put( batch.build()).subscribe();
+
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index d1b076d..98652c1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.index;/*
  */
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -62,11 +63,13 @@ public interface EntityIndexBatch {
     EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version
);
 
 
+
+
     /**
-     * Execute the batch
+     * get the batches
      * @return future to guarantee execution
      */
-    Observable<IndexOperationMessage> execute();
+    IndexOperationMessage build();
 
     /**
      * Get the number of operations in the batch

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 64a1c6a..68830ca 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.index.*;
@@ -41,7 +42,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final IndexAlias alias;
 
     private final IndexLocationStrategy indexLocationStrategy;
-    private final IndexProducer indexBatchBufferProducer;
 
     private final EntityIndex entityIndex;
     private final ApplicationScope applicationScope;
@@ -49,12 +49,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
-                                   final IndexProducer indexBatchBufferProducer,
                                    final EntityIndex entityIndex
     ) {
         this.indexLocationStrategy = locationStrategy;
 
-        this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.entityIndex = entityIndex;
         this.applicationScope = indexLocationStrategy.getApplicationScope();
 
@@ -122,8 +120,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     @Override
-    public Observable execute() {
-        return indexBatchBufferProducer.put( container );
+    public IndexOperationMessage build() {
+        return container;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index b66fd40..c91057b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -37,7 +37,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     private final IndexFig config;
     private final IndexCache indexCache;
     private final EsProvider provider;
-    private final IndexProducer indexProducer;
     private final MetricsFactory metricsFactory;
     private final IndexRefreshCommand refreshCommand;
 
@@ -50,7 +49,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
                     config,
                     refreshCommand,
                     metricsFactory,
-                    indexProducer,
                     locationStrategy
                 );
                 index.initialize();
@@ -62,7 +60,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
     public EsEntityIndexFactoryImpl( final IndexFig indexFig,
                                      final IndexCache indexCache,
                                      final EsProvider provider,
-                                     final IndexProducer indexProducer,
                                      final MetricsFactory metricsFactory,
                                      final IndexRefreshCommand refreshCommand
 
@@ -70,7 +67,6 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
         this.config = indexFig;
         this.indexCache = indexCache;
         this.provider = provider;
-        this.indexProducer = indexProducer;
         this.metricsFactory = metricsFactory;
         this.refreshCommand = refreshCommand;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 6317a69..f6ebce2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -116,7 +116,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
     private final int cursorTimeout;
     private final long queryTimeout;
-    private final IndexProducer indexBatchBufferProducer;
     private final FailureMonitorImpl failureMonitor;
     private final Timer aggregationTimer;
 
@@ -131,13 +130,11 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData
{
                               final IndexFig indexFig,
                               final IndexRefreshCommand indexRefreshCommand,
                               final MetricsFactory metricsFactory,
-                              final IndexProducer indexBatchBufferProducer,
                               final IndexLocationStrategy indexLocationStrategy
     ) {
 
         this.indexFig = indexFig;
         this.indexLocationStrategy = indexLocationStrategy;
-        this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.failureMonitor = new FailureMonitorImpl( indexFig, provider );
         this.esProvider = provider;
         this.indexRefreshCommand = indexRefreshCommand;
@@ -374,7 +371,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch =
-            new EsEntityIndexBatchImpl(indexLocationStrategy , indexBatchBufferProducer,
this );
+            new EsEntityIndexBatchImpl(indexLocationStrategy, this );
         return batch;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 6348a44..5243d5a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -82,6 +82,9 @@ public class EntityIndexTest extends BaseIT {
     public IndexFig fig;
 
     @Inject
+    public IndexProducer indexProducer;
+
+    @Inject
     public CassandraFig cassandraFig;
 
     @Inject
@@ -137,7 +140,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.index( indexEdge, entity1 );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();
 
 
         Entity entity2 = new Entity( entityType );
@@ -151,7 +154,7 @@ public class EntityIndexTest extends BaseIT {
 
 
         batch.index( indexEdge, entity2 );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
 
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -215,7 +218,7 @@ public class EntityIndexTest extends BaseIT {
 
                     EntityIndexBatch batch = entityIndex.createBatch();
                     insertJsonBlob( sampleJson, batch, entityType, indexEdge, size, 0 );
-                    batch.execute().toBlocking().last();
+                    indexProducer.put(batch.build()).subscribe();;
                 }
                 catch ( Exception e ) {
                     synchronized ( failTime ) {
@@ -290,7 +293,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
         entityIndexBatch.deindex(searchEdge, crs.get(0));
-        entityIndexBatch.execute().toBlocking().last();
+        indexProducer.put(entityIndexBatch.build()).subscribe();
         entityIndex.refreshAsync().toBlocking().first();
 
         //Hilda Youn
@@ -306,7 +309,7 @@ public class EntityIndexTest extends BaseIT {
         });
         EntityIndexBatch batch = entityIndex.createBatch();
         insertJsonBlob(sampleJson, batch, entityType, indexEdge, max, startIndex);
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  entityIndex.refreshAsync().toBlocking().first();
         long time = info.getExecutionTime();
         log.info("refresh took ms:" + time);
@@ -364,7 +367,7 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
         entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID()
) );
 
-        entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last();
+        indexProducer.put(entityIndex.createBatch().index( searchEdge, entity ).build()).subscribe();
         entityIndex.refreshAsync().toBlocking().first();
 
         CandidateResults candidateResults = entityIndex
@@ -373,7 +376,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch batch = entityIndex.createBatch();
         batch.deindex( searchEdge, entity );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         candidateResults = entityIndex
@@ -410,7 +413,8 @@ public class EntityIndexTest extends BaseIT {
             entity[i].setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID
) );
 
             //index the new entity. This is where the loop will be set to create like 100
entities.
-            entityIndex.createBatch().index( searchEdge, entity[i] ).execute().toBlocking().last();
+            indexProducer.put(entityIndex.createBatch().index( searchEdge, entity[i]  ).build()).subscribe();
+
         }
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -541,16 +545,16 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final String query = "where username = 'edanuff'";
 
         CandidateResults r = entityIndex.search( indexSCope, SearchTypes.fromTypes( "edanuff"
), query, 10, 0);
-        assertEquals( user.getId(), r.get( 0 ).getId() );
+        assertEquals( user.getId(), r.get( 0 ).getId());
 
         batch.deindex( indexSCope, user.getId(), user.getVersion() );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         // EntityRef
@@ -611,7 +615,7 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
         batch.index( indexScope, fred);
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );
@@ -681,14 +685,14 @@ public class EntityIndexTest extends BaseIT {
             EntityUtils.setId( user, userId );
             EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() );
 
-            entityIds.add( userId );
+            entityIds.add(userId );
 
 
             batch.index( indexEdge, user );
         }
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
 
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -755,7 +759,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index( indexSCope, user );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final String query = "where searchUUID = " + searchUUID;
@@ -794,7 +798,7 @@ public class EntityIndexTest extends BaseIT {
         EntityIndexBatch batch = entityIndex.createBatch();
 
         batch.index(indexSCope, user);
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
         final String query = "where string = 'I am*'";
@@ -849,9 +853,9 @@ public class EntityIndexTest extends BaseIT {
 
 
         EntityIndexBatch batch = entityIndex.createBatch();
-        batch.index( indexSCope, first );
+        batch.index(indexSCope, first );
         batch.index( indexSCope, second );
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -916,7 +920,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index(indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -997,7 +1001,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -1107,7 +1111,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
 
 
@@ -1262,7 +1266,7 @@ public class EntityIndexTest extends BaseIT {
         batch.index( indexScope2, second);
 
 
-        batch.execute().toBlocking().last();
+        indexProducer.put(batch.build()).subscribe();;
         entityIndex.refreshAsync().toBlocking().first();
         long size = entityIndex.getEntitySize(new SearchEdgeImpl(ownerId,type, SearchEdge.NodeType.SOURCE));
         assertTrue( size == 100 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index dce69f9..98b85f1 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@ -70,6 +70,8 @@ public class GeoPagingTest extends BaseIT {
     @Inject
     public EntityIndexFactory eif;
 
+    @Inject
+    IndexProducer indexProducer;
 
     @Inject
     @Rule
@@ -128,7 +130,7 @@ public class GeoPagingTest extends BaseIT {
 
         }
 
-        batch.execute().toBlocking().last();
+        indexProducer.put( batch.build()).subscribe();
 
         entityIndex.refreshAsync().toBlocking().last();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c263b87/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index cca3f0f..1be1195 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -92,7 +92,7 @@ public class IndexLoadTestsIT extends BaseIT {
     public IndexTestFig indexTestFig;
 
     @Inject
-    public EntityIndexFactory entityIndexFactory;
+    public IndexProducer indexProducer;
 
     @Inject
     public MetricsFactory metricsFactory;
@@ -347,7 +347,8 @@ public class IndexLoadTestsIT extends BaseIT {
 
 
                     //execute
-                    entityIndexBatch.execute();
+                    IndexOperationMessage message = entityIndexBatch.build();
+                    indexProducer.put(message);
                     //stop
                     time.close();
                 } ).toBlocking().last();


Mime
View raw message