usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [2/2] git commit: First pass of changing cp and rm manager to use batches
Date Thu, 02 Oct 2014 04:47:41 GMT
First pass of changing the Cp entity manager and relation manager (CpEntityManager, CpRelationManager) to use EntityIndexBatch batches


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bba08ddc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bba08ddc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bba08ddc

Branch: refs/heads/esbatching
Commit: bba08ddc1ed27b719ecaf46c63eb3f20d4bef8f5
Parents: fba71c2
Author: Todd Nine <toddnine@apache.org>
Authored: Wed Oct 1 22:47:06 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Wed Oct 1 22:47:06 2014 -0600

----------------------------------------------------------------------
 .../CpEntityIndexDeleteListener.java            |  8 +-
 .../corepersistence/CpEntityManager.java        | 40 ++++++---
 .../corepersistence/CpEntityManagerFactory.java | 33 ++++---
 .../corepersistence/CpManagerCache.java         | 12 +--
 .../corepersistence/CpRelationManager.java      | 93 +++++++++++++-------
 .../CpEntityIndexDeleteListenerTest.java        | 12 ++-
 .../core/scope/ApplicationScopeImpl.java        |  6 +-
 .../serialization/EdgeSerializationTest.java    |  3 +
 .../index/impl/EsEntityIndexBatchImpl.java      | 42 +++++++--
 .../index/impl/EsEntityIndexImpl.java           |  2 +-
 .../index/utils/IndexValidationUtils.java       | 10 +++
 11 files changed, 177 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
index f8b28ac..cf3f4a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
@@ -59,13 +59,13 @@ public class CpEntityIndexDeleteListener {
 
 
     public Observable<EntityVersion> receive(final MvccEntityDeleteEvent event) {
-        CollectionScope collectionScope = event.getCollectionScope();
-        IndexScope indexScope = new IndexScopeImpl(collectionScope.getApplication(), collectionScope.getOwner(),
collectionScope.getName());
+        final CollectionScope collectionScope = event.getCollectionScope();
+        final IndexScope indexScope = new IndexScopeImpl(collectionScope.getApplication(),
collectionScope.getOwner(), collectionScope.getName());
         final EntityIndex entityIndex = entityIndexFactory.createEntityIndex(indexScope);
         return Observable.create(new ObservableIterator<CandidateResult>("deleteEsIndexVersions")
{
             @Override
             protected Iterator<CandidateResult> getIterator() {
-                CandidateResults results = entityIndex.getEntityVersions(event.getEntity().getId());
+                CandidateResults results = entityIndex.getEntityVersions(indexScope, event.getEntity().getId());
                 return results.iterator();
             }
         }).subscribeOn(Schedulers.io())
@@ -78,7 +78,7 @@ public class CpEntityIndexDeleteListener {
                             //filter find entities <= current version
                             if (entity.getVersion().timestamp() <= event.getVersion().timestamp())
{
                                 versions.add(entity);
-                                entityIndex.deindex(entity.getId(), entity.getVersion());
+                                entityIndex.createBatch().deindex(indexScope, entity.getId(),
entity.getVersion());
                             }
                         }
                         return Observable.from(versions);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index f85ba30..a5475e7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -74,6 +74,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
@@ -561,6 +562,12 @@ public class CpEntityManager implements EntityManager {
             logger.debug( "Deleting indexes of all {} collections owning the entity", 
                     owners.keySet().size() );
 
+            final  EntityIndex ei = managerCache.getEntityIndex( appScope );
+
+            final EntityIndexBatch batch = ei.createBatch();
+
+
+
             for ( String ownerType : owners.keySet() ) {
                 Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType
);
 
@@ -573,27 +580,30 @@ public class CpEntityManager implements EntityManager {
                                 new SimpleId( uuid, ownerType ), 
                                 CpEntityManager.getCollectionScopeNameFromCollectionName(coll)
);
 
-                        EntityIndex ei = managerCache.getEntityIndex( indexScope );
 
-                        ei.deindex( entity );
+                        batch.index( indexScope, entity );
                     }
                 }
             }
 
+
+
             // deindex from default index scope
             IndexScope defaultIndexScope = new IndexScopeImpl( 
                     appScope.getApplication(), 
                     appScope.getApplication(),
                     getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-            EntityIndex entityIndex = managerCache.getEntityIndex( defaultIndexScope );
-            entityIndex.deindex( entity );
+
+            batch.deindex(defaultIndexScope,  entity );
 
             IndexScope allTypesIndexScope = new IndexScopeImpl( 
                 appScope.getApplication(), 
                 appScope.getApplication(), 
                 ALL_TYPES);
-            EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope );
-            aei.deindex( entity );
+
+            batch.deindex( allTypesIndexScope,  entity );
+
+            batch.execute();
 
             decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() )
);
 
@@ -980,7 +990,7 @@ public class CpEntityManager implements EntityManager {
                 getCollectionScopeNameFromEntityType( entityRef.getType()) );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope
);
-        EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope );
+        EntityIndex ei = managerCache.getEntityIndex( appScope );
 
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
@@ -1002,7 +1012,8 @@ public class CpEntityManager implements EntityManager {
         logger.debug("Wrote {}:{} version {}", new Object[] { 
             cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
});
 
-        ei.index( cpEntity );
+
+        ei.createBatch().index(defaultIndexScope, cpEntity ).execute();
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
@@ -2530,12 +2541,13 @@ public class CpEntityManager implements EntityManager {
         }
 
         // Index CP entity into default collection scope
-        IndexScope defaultIndexScope = new IndexScopeImpl( 
-            appScope.getApplication(), 
-            appScope.getApplication(), 
-            CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
-        EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope );
-        ei.index( cpEntity );
+//        IndexScope defaultIndexScope = new IndexScopeImpl(
+//            appScope.getApplication(),
+//            appScope.getApplication(),
+//            CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
+//        EntityIndex ei = managerCache.getEntityIndex( appScope );
+//
+//        ei.createBatch().index( defaultIndexScope,  cpEntity ).execute();
 
         // reflect changes in the legacy Entity
         entity.setUuid( cpEntity.getId().getUuid() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 3e9a6a9..090a4d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -50,6 +50,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -263,8 +264,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                     .getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
 
             orgInfoEntity = ecm.write( orgInfoEntity ).toBlockingObservable().last();
-            eci.index( orgInfoEntity );
-            eci.refresh();
+            eci.createBatch().index(SYSTEM_ORGS_INDEX_SCOPE,  orgInfoEntity ).executeAndRefresh();
         }
 
         if ( properties == null ) {
@@ -288,7 +288,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                     .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
 
             appInfoEntity = ecm.write( appInfoEntity ).toBlockingObservable().last();
-            eci.index( appInfoEntity );
+            eci.createBatch().index(SYSTEM_APPS_INDEX_SCOPE,  appInfoEntity ).executeAndRefresh();
             eci.refresh();
         }
 
@@ -325,7 +325,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'");
 
         EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_ORGS_INDEX_SCOPE,  q );
 
         if ( results.isEmpty() ) {
             return null; 
@@ -342,7 +342,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
         
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_APPS_INDEX_SCOPE,  q );
 
         if ( results.isEmpty() ) {
             return null; 
@@ -371,7 +371,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             Query q = Query.fromQL("select *");
             q.setCursor( cursor );
 
-            CandidateResults results = ei.search( q );
+            CandidateResults results = ei.search(SYSTEM_APPS_INDEX_SCOPE,  q );
             cursor = results.getCursor();
 
             Iterator<CandidateResult> iter = results.iterator();
@@ -417,7 +417,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         Query q = Query.fromQL("select *");
 
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE,  q );
 
         if ( results.isEmpty() ) {
             return new HashMap<String,String>();
@@ -447,7 +447,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
 
         Query q = Query.fromQL("select *");
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE,  q );
         Entity propsEntity;
         if ( !results.isEmpty() ) {
             propsEntity = em.load( results.iterator().next().getId()).toBlockingObservable().last();
@@ -464,7 +464,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
 
         propsEntity = em.write( propsEntity ).toBlockingObservable().last();
-        ei.index( propsEntity );    
+        ei.createBatch().index( SYSTEM_PROPS_INDEX_SCOPE, propsEntity );
 
         return true;
     }
@@ -485,7 +485,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
 
         Query q = Query.fromQL("select *");
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE,  q );
 
         Entity propsEntity = em.load( 
                 results.iterator().next().getId() ).toBlockingObservable().last();
@@ -501,7 +501,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         propsEntity.removeField( name );
 
         propsEntity = em.write( propsEntity ).toBlockingObservable().last();
-        ei.index( propsEntity );    
+        ei.createBatch().index( SYSTEM_PROPS_INDEX_SCOPE, propsEntity );
 
         return true;
     }
@@ -616,9 +616,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityIndex ei = managerCache.getEntityIndex( is );
 
         Query q = Query.fromQL("select *");
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(is,  q );
 
-        Map<String, UUID> appMap = new HashMap<String, UUID>();
+        int count = 0;
+        final EntityIndexBatch batch = ei.createBatch();
 
         Iterator<CandidateResult> iter = results.iterator();
         while (iter.hasNext()) {
@@ -643,7 +644,11 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 }
             );
 
-            ei.index(entity);
+            batch.index(is, entity);
+
+
+
+            count++;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 2f8bd3f..0e7c084 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -36,8 +36,8 @@ class CpManagerCache {
     private final LRUCache2<CollectionScope, EntityCollectionManager> ecmCache
             = new LRUCache2<CollectionScope, EntityCollectionManager>(50, 1 * 60 *
60 * 1000);
 
-    private final LRUCache2<IndexScope, EntityIndex> eiCache
-            = new LRUCache2<IndexScope, EntityIndex>(50, 1 * 60 * 60 * 1000);
+    private final LRUCache2<ApplicationScope, EntityIndex> eiCache
+            = new LRUCache2<>(50, 1 * 60 * 60 * 1000);
 
     private final LRUCache2<ApplicationScope, GraphManager> gmCache
             = new LRUCache2<ApplicationScope, GraphManager>(50, 1 * 60 * 60 * 1000);
@@ -61,13 +61,13 @@ class CpManagerCache {
         return ecm;
     }
 
-    public EntityIndex getEntityIndex(IndexScope indexScope) {
+    public EntityIndex getEntityIndex(ApplicationScope applicationScope) {
 
-        EntityIndex ei = eiCache.get(indexScope);
+        EntityIndex ei = eiCache.get(applicationScope);
 
         if (ei == null) {
-            ei = eif.createEntityIndex(indexScope);
-            eiCache.put(indexScope, ei);
+            ei = eif.createEntityIndex(applicationScope);
+            eiCache.put(applicationScope, ei);
         }
         return ei;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3486fbc..260e429 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -425,6 +426,11 @@ public class CpRelationManager implements RelationManager {
 
         // loop through all types of edge to target
         int count = 0;
+
+        final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+
+        final EntityIndexBatch entityIndexBatch = ei.createBatch();
+
         while ( edgeTypesToTarget.hasNext() ) {
 
             // get all edges of the type
@@ -460,10 +466,9 @@ public class CpRelationManager implements RelationManager {
                         applicationScope.getApplication(),
                         new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
                         CpEntityManager.getConnectionScopeName( cpHeadEntity.getId().getType(),
connName ));
-                } 
-           
-                EntityIndex ei = managerCache.getEntityIndex(indexScope);
-                ei.index(cpEntity);
+                }
+
+                entityIndexBatch.index(indexScope, cpEntity);
 
                 // reindex the entity in the source entity's all-types index
                 
@@ -471,8 +476,8 @@ public class CpRelationManager implements RelationManager {
                     applicationScope.getApplication(),
                     new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
                     ALL_TYPES);
-                ei = managerCache.getEntityIndex(indexScope);
-                ei.index(cpEntity);
+
+                entityIndexBatch.index(indexScope, cpEntity);
 
                 count++;
             }
@@ -659,6 +664,7 @@ public class CpRelationManager implements RelationManager {
             applicationScope.getApplication(), 
             applicationScope.getApplication(), 
             CpEntityManager.getCollectionScopeNameFromEntityType( itemRef.getType()));
+
         EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope);
 
         org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
@@ -700,29 +706,36 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager(applicationScope);
         gm.writeEdge(edge).toBlockingObservable().last();
 
+        final EntityIndex index = managerCache.getEntityIndex( applicationScope );
+
+        final EntityIndexBatch batch = index.createBatch();
+
+
         // index member into entity collection | type scope
         IndexScope collectionIndexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
-        EntityIndex collectionIndex = managerCache.getEntityIndex(collectionIndexScope);
-        collectionIndex.index( memberEntity );
+
+        batch.index(collectionIndexScope, memberEntity );
 
         // index member into entity | all-types scope
         IndexScope entityAllTypesScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             ALL_TYPES);
-        EntityIndex entityAllCollectionIndex = managerCache.getEntityIndex(entityAllTypesScope);
-        entityAllCollectionIndex.index( memberEntity );
+
+        batch.index(entityAllTypesScope, memberEntity );
 
         // index member into application | all-types scope
         IndexScope appAllTypesScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             applicationScope.getApplication(), 
             ALL_TYPES);
-        EntityIndex allCollectionIndex = managerCache.getEntityIndex(appAllTypesScope);
-        allCollectionIndex.index( memberEntity );
+
+        batch.index( appAllTypesScope,  memberEntity );
+
+        batch.execute();
 
         logger.debug("Added entity {}:{} to collection {}", new String[] { 
             itemRef.getUuid().toString(), itemRef.getType(), collName }); 
@@ -844,13 +857,16 @@ public class CpRelationManager implements RelationManager {
         org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
             new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlockingObservable().last();
 
+        final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+        final EntityIndexBatch batch = ei.createBatch();
+
         // remove item from collection index
         IndexScope indexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
-        ei.deindex( memberEntity );
+
+        batch.deindex(indexScope,  memberEntity );
 
         // remove collection from item index 
         IndexScope itemScope = new IndexScopeImpl(
@@ -858,8 +874,11 @@ public class CpRelationManager implements RelationManager {
             memberEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( 
                 Schema.defaultCollectionName( cpHeadEntity.getId().getType() )));
-        ei = managerCache.getEntityIndex(itemScope);
-        ei.deindex( cpHeadEntity );
+
+
+        batch.deindex(itemScope,  cpHeadEntity );
+
+        batch.execute();
 
         // remove edge from collection to item 
         GraphManager gm = managerCache.getGraphManager(applicationScope);
@@ -958,7 +977,8 @@ public class CpRelationManager implements RelationManager {
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
+
+        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
       
         logger.debug("Searching scope {}:{}:{}",
             new String[] { 
@@ -969,7 +989,7 @@ public class CpRelationManager implements RelationManager {
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
 
-        CandidateResults crs = ei.search( query );
+        CandidateResults crs = ei.search(indexScope,  query );
 
         return buildResults( query, crs, collName );
 
@@ -1088,21 +1108,25 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager(applicationScope);
         gm.writeEdge(edge).toBlockingObservable().last();
 
+        final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+        final EntityIndexBatch batch = ei.createBatch();
+
         // Index the new connection in app|source|type context
         IndexScope indexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getConnectionScopeName( connectedEntityRef.getType(), connectionType
));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
-        ei.index( targetEntity );
+       batch.index( indexScope, targetEntity );
 
         // Index the new connection in app|scope|all-types context
         IndexScope allTypesIndexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             ALL_TYPES);
-        EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
-        aei.index( targetEntity );
+
+        batch.index( allTypesIndexScope, targetEntity );
+
+        batch.execute();
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
         Mutator<ByteBuffer> m = createMutator( ko, be );
@@ -1307,21 +1331,23 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager(applicationScope);
         gm.deleteEdge(edge).toBlockingObservable().last();
 
+        final EntityIndex ei = managerCache.getEntityIndex( applicationScope )  ;
+        final EntityIndexBatch batch = ei.createBatch();
+
         // Deindex the connection in app|source|type context
         IndexScope indexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
             CpEntityManager.getConnectionScopeName( targetEntity.getId().getType(), connectionType
));
-        EntityIndex ei = managerCache.getEntityIndex( indexScope );
-        ei.deindex( targetEntity );
+        batch.deindex( indexScope , targetEntity );
 
         // Deindex the connection in app|source|type context
         IndexScope allTypesIndexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
             ALL_TYPES);
-        EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
-        aei.deindex( targetEntity );
+
+        batch.deindex( allTypesIndexScope,  targetEntity );
 
     }
 
@@ -1377,7 +1403,9 @@ public class CpRelationManager implements RelationManager {
                 applicationScope.getApplication(), 
                 cpHeadEntity.getId(), 
                 scopeName);
-            EntityIndex ei = managerCache.getEntityIndex(indexScope);
+
+            final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+
         
             logger.debug("Searching connected entities from scope {}:{}:{}", new String[]
{ 
                 indexScope.getApplication().toString(), 
@@ -1385,7 +1413,7 @@ public class CpRelationManager implements RelationManager {
                 indexScope.getName()}); 
 
             query = adjustQuery( query );
-            CandidateResults crs = ei.search( query );
+            CandidateResults crs = ei.search( indexScope, query );
 
             raw = buildResults( query , crs, query.getConnectionType() );
         }
@@ -1480,7 +1508,8 @@ public class CpRelationManager implements RelationManager {
                 applicationScope.getApplication(), 
                 cpHeadEntity.getId(), 
                 ALL_TYPES);
-            EntityIndex ei = managerCache.getEntityIndex(indexScope);
+
+            EntityIndex ei = managerCache.getEntityIndex(applicationScope);
         
             logger.debug("Searching connections from the all-types scope {}:{}:{}", new String[]
{ 
                 indexScope.getApplication().toString(), 
@@ -1488,7 +1517,7 @@ public class CpRelationManager implements RelationManager {
                 indexScope.getName()}); 
 
             query = adjustQuery( query );
-            CandidateResults crs = ei.search( query );
+            CandidateResults crs = ei.search(indexScope,  query );
 
             return buildConnectionResults(query , crs, query.getConnectionType() );
         }
@@ -1498,7 +1527,7 @@ public class CpRelationManager implements RelationManager {
             cpHeadEntity.getId(), 
             CpEntityManager.getConnectionScopeName( 
                     query.getEntityType(), query.getConnectionType() ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
+        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
     
         logger.debug("Searching connections from the '{}' scope {}:{}:{}", new String[] {

             indexScope.getApplication().toString(), 
@@ -1506,7 +1535,7 @@ public class CpRelationManager implements RelationManager {
             indexScope.getName()}); 
 
         query = adjustQuery( query );
-        CandidateResults crs = ei.search( query );
+        CandidateResults crs = ei.search( indexScope, query );
 
         return buildConnectionResults(query , crs, query.getConnectionType() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
index 6ee62c6..d59432b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImp
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.entity.EntityVersion;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -82,6 +83,10 @@ public class CpEntityIndexDeleteListenerTest {
         when(scope.getApplication()).thenReturn(entityId);
         when(eif.createEntityIndex(any(IndexScope.class))).thenReturn(entityIndex);
 
+        final EntityIndexBatch batch = mock(EntityIndexBatch.class);
+
+        when(entityIndex.createBatch()).thenReturn( batch );
+
         CandidateResults results = mock(CandidateResults.class);
         List<CandidateResult> resultsList  = new ArrayList<>();
         resultsList.add(entity);
@@ -90,7 +95,7 @@ public class CpEntityIndexDeleteListenerTest {
         when(results.iterator()).thenReturn(entities);
         when(serializationFig.getBufferSize()).thenReturn(10);
         when(serializationFig.getHistorySize()).thenReturn(20);
-        when(entityIndex.getEntityVersions(entityId)).thenReturn(results);
+        when(entityIndex.getEntityVersions(any(IndexScope.class), entityId)).thenReturn(results);
         MvccEntity mvccEntity = new MvccEntityImpl(entityId,uuid, MvccEntity.Status.COMPLETE,mock(Entity.class));
 
 
@@ -98,6 +103,9 @@ public class CpEntityIndexDeleteListenerTest {
         Observable<EntityVersion> o = esEntityIndexDeleteListener.receive(event);
         EntityVersion testEntity = o.toBlocking().last();
         assertEquals(testEntity.getId(),mvccEntity.getId());
-        verify(entityIndex).deindex(entity.getId(),entity.getVersion());
+
+        verify(entityIndex).createBatch();
+
+        verify(batch).deindex(any(IndexScope.class), entity.getId(),entity.getVersion());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
index 4e067c2..e8dbb02 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
@@ -48,13 +48,13 @@ public class ApplicationScopeImpl implements ApplicationScope {
         if ( this == o ) {
             return true;
         }
-        if ( !( o instanceof ApplicationScopeImpl ) ) {
+        if ( !( o instanceof ApplicationScope ) ) {
             return false;
         }
 
-        final ApplicationScopeImpl that = ( ApplicationScopeImpl ) o;
+        final ApplicationScope that = ( ApplicationScope ) o;
 
-        if ( !application.equals( that.application ) ) {
+        if ( !application.equals( that.getApplication() ) ) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
index 9c29d56..57391de 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -656,6 +657,7 @@ public abstract class EdgeSerializationTest {
      * Test paging by resuming the search from the edge
      */
     @Test
+    @Ignore("Kills embedded cassandra")
     public void pageIteration() throws ConnectionException {
 
         int size = graphFig.getScanPageSize() * 2;
@@ -695,6 +697,7 @@ public abstract class EdgeSerializationTest {
      * edge types
      */
     @Test
+    @Ignore("Kills embedded cassandra")
     public void testIteratorPaging() throws ConnectionException {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d9857f2..93f0e41 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.ArrayField;
@@ -91,15 +92,20 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private BulkRequestBuilder bulkRequest;
 
+    private final int autoFlushSize;
 
-    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope,
-                                   final Client client, final IndexFig config, final Set<String> knownTypes ) {
+    private int count;
+
+
+    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client, final IndexFig config,
+                                   final Set<String> knownTypes, final int autoFlushSize ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
         this.knownTypes = knownTypes;
         this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
         this.refresh = config.isForcedRefresh();
+        this.autoFlushSize = autoFlushSize;
         initBatch();
     }
 
@@ -107,6 +113,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     @Override
     public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
 
+
+        IndexValidationUtils.validateScopeMatch( indexScope, applicationScope );
+
         final String indexType = createCollectionScopeTypeName( indexScope );
 
         if ( log.isDebugEnabled() ) {
@@ -118,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         ValidationUtils.verifyEntityWrite( entity );
 
-        initType(indexScope,  indexType );
+        initType( indexScope, indexType );
 
         Map<String, Object> entityAsMap = entityToMap( entity );
 
@@ -135,12 +144,16 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         bulkRequest.add( client.prepareIndex( indexName, indexType, indexId ).setSource( entityAsMap ) );
 
+        maybeFlush();
+
         return this;
     }
 
 
     @Override
-    public EntityIndexBatch deindex(final IndexScope indexScope, final Id id, final UUID version ) {
+    public EntityIndexBatch deindex( final IndexScope indexScope, final Id id, final UUID version ) {
+
+        IndexValidationUtils.validateScopeMatch( indexScope, applicationScope );
 
         final String indexType = createCollectionScopeTypeName( indexScope );
 
@@ -158,6 +171,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "Deindexed Entity with index id " + indexId );
 
+        maybeFlush();
+
         return this;
     }
 
@@ -165,14 +180,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     @Override
     public EntityIndexBatch deindex( final IndexScope indexScope, final Entity entity ) {
 
-       return  deindex(indexScope,  entity.getId(), entity.getVersion() );
+        return deindex( indexScope, entity.getId(), entity.getVersion() );
     }
 
 
     @Override
     public EntityIndexBatch deindex( final IndexScope indexScope, final CandidateResult entity ) {
 
-        return deindex(indexScope,  entity.getId(), entity.getVersion() );
+        return deindex( indexScope, entity.getId(), entity.getVersion() );
     }
 
 
@@ -184,7 +199,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     /**
      * Execute the request, check for errors, then re-init the batch for future use
-     * @param request
      */
     private void execute( final BulkRequestBuilder request ) {
         final BulkResponse response = request.execute().actionGet();
@@ -199,7 +213,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public void executeAndRefresh() {
-        execute(bulkRequest.setRefresh( true ) );
+        execute( bulkRequest.setRefresh( true ) );
+    }
+
+
+    private void maybeFlush() {
+        count++;
+
+        if ( count % autoFlushSize == 0 ) {
+            execute();
+            count = 0;
+        }
     }
 
 
@@ -217,6 +241,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         try {
             XContentBuilder mxcb = EsEntityIndexImpl.createDoubleStringIndexMapping( jsonBuilder(), typeName );
 
+
+            //TODO Dave can this be collapsed into the build as well?
             admin.indices().preparePutMapping( indexName ).setType( typeName ).setSource( mxcb ).execute().actionGet();
 
             admin.indices().prepareGetMappings( indexName ).addTypes( typeName ).execute().actionGet();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 077036a..56316d8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -134,7 +134,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     @Override
     public EntityIndexBatch createBatch() {
-        return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes );
+        return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes, 1000 );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
index 899e7b0..d6080de 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.index.utils;
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.IndexScope;
 
 import com.google.common.base.Preconditions;
@@ -50,4 +51,13 @@ public class IndexValidationUtils {
     }
 
 
+    /**
+     * Validate the scope in the index matches the application scope
+     * @param indexScope
+     * @param scope
+     */
+    public static void validateScopeMatch(final IndexScope indexScope,final ApplicationScope scope){
+        Preconditions.checkArgument( scope.equals( indexScope ) );
+    }
+
 }


Mime
View raw message