usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [2/3] incubator-usergrid git commit: First pass at upgrading to java 8 and latest RX java
Date Fri, 20 Mar 2015 00:31:39 GMT
First pass at upgrading to java 8 and latest RX java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/282e2271
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/282e2271
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/282e2271

Branch: refs/heads/USERGRID-451
Commit: 282e22712890cdda0439a5694810cff632526d7b
Parents: 72ec19d
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Mar 19 18:00:57 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Mar 19 18:00:57 2015 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              |  26 +--
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/CpEntityManagerFactory.java |   7 +-
 .../corepersistence/CpRelationManager.java      |  24 +--
 .../usergrid/corepersistence/CpWalker.java      |  77 +++-----
 .../events/EntityVersionDeletedHandler.java     |  72 +++----
 .../migration/EntityTypeMappingMigration.java   |  41 ++--
 .../migration/EntityTypeMappingMigrationIT.java |   2 +-
 .../impl/EntityCollectionManagerImpl.java       |  10 +-
 .../collection/impl/EntityDeletedTask.java      |  20 +-
 .../impl/EntityVersionCleanupTask.java          |  27 +--
 .../impl/EntityVersionCreatedTask.java          |  26 +--
 .../MvccEntitySerializationStrategyImpl.java    |  89 +++------
 .../MvccEntitySerializationStrategyV3Impl.java  |  91 +++------
 .../migration/MvccEntityDataMigrationImpl.java  | 169 +++++++---------
 .../persistence/collection/rx/ParallelTest.java |  10 +-
 ...ctMvccEntityDataMigrationV1ToV3ImplTest.java |   2 +-
 stack/corepersistence/common/pom.xml            |  15 +-
 .../astyanax/MultiKeyColumnNameIterator.java    |   4 +-
 .../MultiKeyColumnNameIteratorTest.java         | 187 ++++++++----------
 .../astyanax/MultiRowColumnIteratorTest.java    |  50 ++---
 .../graph/impl/GraphManagerImpl.java            |   6 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |   2 +
 .../impl/stage/NodeDeleteListenerImpl.java      |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |  87 ++++-----
 .../graph/GraphManagerShardConsistencyIT.java   |   2 +-
 .../usergrid/persistence/graph/SimpleTest.java  |  12 +-
 .../migration/EdgeDataMigrationImplTest.java    |   2 +-
 stack/corepersistence/pom.xml                   |   8 +-
 .../index/impl/IndexLoadTestsIT.java            | 105 ++++------
 stack/pom.xml                                   |   8 +-
 .../management/importer/ImportServiceImpl.java  |  34 ++--
 .../impl/ApplicationQueueManagerImpl.java       | 195 +++++++++----------
 .../setup/ConcurrentProcessSingleton.java       |  16 +-
 34 files changed, 604 insertions(+), 826 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 971ee62..119a52b 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -130,15 +130,7 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>2.3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
+
 
 <!--            <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -481,17 +473,11 @@
       <version>${metrics.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
+      <dependency>
+          <groupId>io.reactivex</groupId>
+          <artifactId>rxjava</artifactId>
+          <version>${rx.version}</version>
+      </dependency>
 
     <dependency>
       <groupId>com.clearspring.analytics</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 789e640..9cffdaf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1097,7 +1097,7 @@ public class CpEntityManager implements EntityManager {
         } );
 
         //TODO: does this call and others like it need a graphite reporter?
-        cpEntity = ecm.write( cpEntity ).toBlockingObservable().last();
+        cpEntity = ecm.write( cpEntity ).toBlocking().last();
 
         logger.debug( "Wrote {}:{} version {}", new Object[] {
                 cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f76b9fc..83c3d85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -451,7 +451,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 fromEntityId, edgeType, Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, null ));
 
-        Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+        //TODO This is wrong, and will result in OOM if there are too many applications.  This needs to stream properly with a buffer
+        Iterator<Edge> iter = edges.toBlocking().getIterator();
         while ( iter.hasNext() ) {
 
             Edge edge = iter.next();
@@ -469,7 +470,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
             org.apache.usergrid.persistence.model.entity.Entity e =
                     managerCache.getEntityCollectionManager( collScope ).load( targetId )
-                        .toBlockingObservable().lastOrDefault(null);
+                        .toBlocking().lastOrDefault(null);
 
             if ( e == null ) {
                 logger.warn("Applicaion {} in index but not found in collections", targetId );
@@ -624,7 +625,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public long performEntityCount() {
         //TODO, this really needs to be a task that writes this data somewhere since this will get
         //progressively slower as the system expands
-        return (Long) getAllEntitiesObservable().longCount().toBlocking().last();
+        return (Long) getAllEntitiesObservable().countLong().toBlocking().last();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2eeee28..c4e970d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -280,7 +280,7 @@ public class CpRelationManager implements RelationManager {
         Observable<String> types= gm.getEdgeTypesFromSource(
             new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
 
-        Iterator<String> iter = types.toBlockingObservable().getIterator();
+        Iterator<String> iter = types.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             indexes.add( iter.next() );
         }
@@ -346,7 +346,7 @@ public class CpRelationManager implements RelationManager {
             Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
                 cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
 
-            Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
+            Iterator<Edge> iter = edges.toBlocking().getIterator();
             while ( iter.hasNext() ) {
                 Edge edge = iter.next();
 
@@ -383,7 +383,7 @@ public class CpRelationManager implements RelationManager {
         final GraphManager gm = managerCache.getGraphManager( applicationScope );
 
         Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
-            cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+            cpHeadEntity.getId(), null, null) ).toBlocking().getIterator();
 
         logger.debug("updateContainingCollectionsAndCollections(): "
                 + "Searched for edges to target {}:{}\n   in scope {}\n   found: {}",
@@ -484,7 +484,7 @@ public class CpRelationManager implements RelationManager {
             SearchByEdgeType.Order.DESCENDING,
             null ) );
 
-        return edges.toBlockingObservable().firstOrDefault( null ) != null;
+        return edges.toBlocking().firstOrDefault( null ) != null;
     }
 
 
@@ -511,7 +511,7 @@ public class CpRelationManager implements RelationManager {
             SearchByEdgeType.Order.DESCENDING,
             null ) );
 
-        return edges.toBlockingObservable().firstOrDefault( null ) != null;
+        return edges.toBlocking().firstOrDefault( null ) != null;
     }
 
 
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
             SearchByEdgeType.Order.DESCENDING,
             null ) ); // last
 
-        Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator();
+        Iterator<Edge> iterator = edgesToTarget.toBlocking().getIterator();
         int count = 0;
         while ( iterator.hasNext() ) {
             iterator.next();
@@ -569,7 +569,7 @@ public class CpRelationManager implements RelationManager {
         Observable<String> str = gm.getEdgeTypesFromSource(
                 new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
 
-        Iterator<String> iter = str.toBlockingObservable().getIterator();
+        Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             String edgeType = iter.next();
             indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
@@ -692,7 +692,7 @@ public class CpRelationManager implements RelationManager {
         // create graph edge connection from head entity to member entity
         Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.writeEdge( edge ).toBlockingObservable().last();
+        gm.writeEdge( edge ).toBlocking().last();
 
         logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}",
             new Object[] {
@@ -855,7 +855,7 @@ public class CpRelationManager implements RelationManager {
                 cpHeadEntity.getId(),
                 CpNamingUtils.getEdgeTypeFromCollectionName( collName ),
                 memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) );
-        gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last();
+        gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
 
         // remove edge from item to collection
         Edge itemToCollectionEdge = new SimpleEdge(
@@ -865,7 +865,7 @@ public class CpRelationManager implements RelationManager {
                 cpHeadEntity.getId(),
                 UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) );
 
-        gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last();
+        gm.deleteEdge( itemToCollectionEdge ).toBlocking().last();
 
         // special handling for roles collection of a group
         if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
@@ -1058,7 +1058,7 @@ public class CpRelationManager implements RelationManager {
                 cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.writeEdge( edge ).toBlockingObservable().last();
+        gm.writeEdge( edge ).toBlocking().last();
 
         EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         EntityIndexBatch batch = ei.createBatch();
@@ -1290,7 +1290,7 @@ public class CpRelationManager implements RelationManager {
                 System.currentTimeMillis() );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        gm.deleteEdge( edge ).toBlockingObservable().last();
+        gm.deleteEdge( edge ).toBlocking().last();
 
         final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
         final EntityIndexBatch batch = ei.createBatch();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 4b902d8..332d5a8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -104,53 +104,38 @@ public class CpWalker {
         Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
             new SimpleSearchEdgeType( applicationId, edgeType, null ) );
 
-        edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
-
-                logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
-
-                return gm.loadEdgesFromSource(  new SimpleSearchByEdgeType(
-                    applicationId, edgeType, Long.MAX_VALUE, order , null ) );
-
-            }
-
-        } ).parallel( new Func1<Observable<Edge>, Observable<Edge>>() {
-
-            @Override
-            public Observable<Edge> call( final Observable<Edge> edgeObservable ) { // process edges in parallel
-                return edgeObservable.doOnNext( new Action1<Edge>() { // visit and update then entity
-
-                    @Override
-                    public void call( Edge edge ) {
-
-                        logger.info( "Re-indexing edge {}", edge );
-
-                        EntityRef targetNodeEntityRef = new SimpleEntityRef(
-                            edge.getTargetNode().getType(),
-                            edge.getTargetNode().getUuid() );
-
-                        Entity entity;
-                        try {
-                            entity = em.get( targetNodeEntityRef );
-                        }
-                        catch ( Exception ex ) {
-                            logger.error( "Error getting sourceEntity {}:{}, continuing",
-                                targetNodeEntityRef.getType(),
-                                targetNodeEntityRef.getUuid() );
-                            return;
-                        }
-                        if(entity == null){
-                            return;
-                        }
-                        String collName = CpNamingUtils.getCollectionName( edge.getType() );
-                        visitor.visitCollectionEntry( em, collName, entity );
-                    }
-                } );
-            }
-        }, Schedulers.io() )
+        edgeTypes.flatMap( emittedEdgeType -> {
+
+            logger.debug( "Loading edges of type {} from node {}", edgeType, applicationId );
+
+            return gm.loadEdgesFromSource(
+                new SimpleSearchByEdgeType( applicationId, emittedEdgeType, Long.MAX_VALUE, order, null ) );
+        } ).flatMap( edge -> {
+            //run each edge through its own scheduler, up to 100 at a time
+            return Observable.just( edge ).doOnNext( edgeValue -> {
+                logger.info( "Re-indexing edge {}", edgeValue );
+
+                EntityRef targetNodeEntityRef =
+                    new SimpleEntityRef( edgeValue.getTargetNode().getType(), edgeValue.getTargetNode().getUuid() );
+
+                Entity entity;
+                try {
+                    entity = em.get( targetNodeEntityRef );
+                }
+                catch ( Exception ex ) {
+                    logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+                        targetNodeEntityRef.getUuid() );
+                    return;
+                }
+                if ( entity == null ) {
+                    return;
+                }
+                String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+                visitor.visitCollectionEntry( em, collName, entity );
+            } ).subscribeOn( Schedulers.io() );
+        }, 100 );
 
         // wait for it to complete
-        .toBlocking().lastOrDefault( null ); // end foreach on edges
+        edgeTypes.toBlocking().lastOrDefault( null ); // end foreach on edges
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index c45949b..23f5a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -17,53 +17,48 @@
  */
 package org.apache.usergrid.corepersistence.events;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
 import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Action2;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
+
+import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
 
 
 /**
- * Remove Entity index when specific version of Entity is deleted.
- * TODO: do we need this? Don't our version-created and entity-deleted handlers take care of this?
- * If we do need it then it should be wired in via GuiceModule in the corepersistence package.
+ * Remove Entity index when specific version of Entity is deleted. TODO: do we need this? Don't our version-created and
+ * entity-deleted handlers take care of this? If we do need it then it should be wired in via GuiceModule in the
+ * corepersistence package.
  */
 @Singleton
 public class EntityVersionDeletedHandler implements EntityVersionDeleted {
-    private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
-
-
+    private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class );
 
 
     private final EntityManagerFactory emf;
 
+
     @Inject
     public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
 
 
-
     @Override
     public void versionDeleted( final CollectionScope scope, final Id entityId,
                                 final List<MvccLogEntry> entityVersions ) {
@@ -71,40 +66,33 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
 
         // This check exists for testing purposes: it lets a test dynamically turn deletion of
         // previous versions off and on so that it can test clean-up on read.
-        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+        if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" ) ) {
             return;
         }
 
-        if(logger.isDebugEnabled()) {
-            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + "scope\n   name: {}\n   owner: {}\n   app: {}",
-                new Object[] {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
+                    + "scope\n   name: {}\n   owner: {}\n   app: {}", new Object[] {
                     entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getName(), scope.getOwner(),
                     scope.getApplication()
                 } );
         }
 
-        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
 
         final EntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
 
-        final IndexScope indexScope = new IndexScopeImpl(
-                new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
-                scope.getName()
-        );
-
-        Observable.from( entityVersions )
-            .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccLogEntry>() {
-                @Override
-                public void call( final EntityIndexBatch entityIndexBatch, final MvccLogEntry mvccLogEntry ) {
-                    entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-                }
-            } ).doOnNext( new Action1<EntityIndexBatch>() {
-            @Override
-            public void call( final EntityIndexBatch entityIndexBatch ) {
+        final IndexScope indexScope =
+            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
+                scope.getName() );
+
+        //create our batch, and then collect all of them into a single batch
+        Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
+            entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
+        } )
+            //after our batch is collected, execute it
+            .doOnNext( entityIndexBatch -> {
                 entityIndexBatch.execute();
-            }
-        } ).toBlocking().last();
+            } ).toBlocking().last();
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 40ad236..6531d16 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -37,6 +37,7 @@ import com.google.inject.Inject;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -63,36 +64,26 @@ public class EntityTypeMappingMigration implements DataMigration<EntityIdScope>
         final AtomicLong atomicLong = new AtomicLong();
 
 
-        allEntitiesInSystemObservable.getData()
-                                            //process the entities in parallel
-         .parallel( new Func1<Observable<EntityIdScope>, Observable<EntityIdScope>>() {
+        //migrate up to 100 entities simultaneously
+        allEntitiesInSystemObservable.getData().flatMap( entityIdScope -> {
+            return Observable.just( entityIdScope ).doOnNext( entityIdScopeObservable -> {
+                final MapScope ms = CpNamingUtils
+                                                 .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
 
+                                             final MapManager mapManager = managerCache.getMapManager( ms );
 
-                 @Override
-                 public Observable<EntityIdScope> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
+                                             final UUID entityUuid = entityIdScope.getId().getUuid();
+                                             final String entityType = entityIdScope.getId().getType();
 
-                     //for each entity observable, get the map scope and write it to the map
-                     return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() {
-                         @Override
-                         public void call( final EntityIdScope entityIdScope ) {
-                             final MapScope ms = CpNamingUtils
-                                 .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
+                                             mapManager.putString( entityUuid.toString(), entityType );
 
-                             final MapManager mapManager = managerCache.getMapManager( ms );
+                                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+                                                 observer.update( getMaxVersion(),
+                                                     String.format( "Updated %d entities", atomicLong.get() ) );
+                                             }
 
-                             final UUID entityUuid = entityIdScope.getId().getUuid();
-                             final String entityType = entityIdScope.getId().getType();
-
-                             mapManager.putString( entityUuid.toString(), entityType );
-
-                             if ( atomicLong.incrementAndGet() % 100 == 0 ) {
-                                 observer.update( getMaxVersion(),
-                                     String.format( "Updated %d entities", atomicLong.get() ) );
-                             }
-                         }
-                     } );
-                 }
-             } ).count().toBlocking().last();
+            } ).subscribeOn( Schedulers.io() );
+        }, 100 ).count().toBlocking().last();
 
 
         return getMaxVersion();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index be7cee4..88f56c8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -77,7 +77,7 @@ public class EntityTypeMappingMigrationIT  {
         final MapScope mapScope2 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP );
 
 
-        final Observable<EntityIdScope> scopes = Observable.from(idScope1, idScope2);
+        final Observable<EntityIdScope> scopes = Observable.just(idScope1, idScope2);
 
         final TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index f565fab..70b5a3a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -235,7 +235,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
         final Timer.Context timer = deleteTimer.time();
-        Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
+        Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( collectionScope, entityId ) )
             .map(markStart)
             .doOnNext( markCommit )
             .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
@@ -284,7 +284,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     return Observable.empty();
                 }
 
-                return Observable.from(entity.getEntity().get());
+                return Observable.just( entity.getEntity().get() );
             }
         })
             .doOnNext( new Action1<Entity>() {
@@ -449,19 +449,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {
 
-        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
+        return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
 
                     @Override
                     public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 
                         Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
                                           .doOnNext( writeVerifyUnique );
 
 
                         // optimistic verification
                         Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+                                Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
                                           .doOnNext( writeOptimisticVerify );
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 5472645..7620907 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -127,22 +127,10 @@ public class EntityDeletedTask implements Task<Void> {
 
         LOG.debug( "Started firing {} listeners", listenerSize );
 
-        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-        Observable.from(listeners)
-                .parallel( new Func1<Observable<EntityDeleted>, Observable<EntityDeleted>>() {
-
-                    @Override
-                    public Observable<EntityDeleted> call(
-                            final Observable<EntityDeleted> entityVersionDeletedObservable ) {
-
-                        return entityVersionDeletedObservable.doOnNext( new Action1<EntityDeleted>() {
-                            @Override
-                            public void call( final EntityDeleted listener ) {
-                                listener.deleted(collectionScope, entityId, version);
-                            }
-                        } );
-                    }
-                }, Schedulers.io() ).toBlocking().last();
+        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
+        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+            listener.deleted( collectionScope, entityId, version );
+        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
 
         LOG.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index b245528..1a7b86b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                             throw new RuntimeException( "Unable to execute batch mutation", e );
                         }
                     }
-                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
 
 
         //start calling the listeners for remove log entries
@@ -201,7 +201,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
                             throw new RuntimeException( "Unable to execute batch mutation", e );
                         }
                     }
-                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+                } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
 
         //wait or this to complete
         final Long removedCount = uniqueValueCleanup.last();
@@ -232,21 +232,14 @@ public class EntityVersionCleanupTask implements Task<Void> {
         logger.debug( "Started firing {} listeners", listenerSize );
 
         //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-        Observable.from( listeners )
-                  .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
-
-                      @Override
-                      public Observable<EntityVersionDeleted> call(
-                              final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
-
-                          return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
-                              @Override
-                              public void call( final EntityVersionDeleted listener ) {
-                                  listener.versionDeleted( scope, entityId, versions );
-                              }
-                          } );
-                      }
-                  }, Schedulers.io() ).toBlocking().last();
+
+
+        //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
+        Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+            listener.versionDeleted( scope, entityId, versions );
+        } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
+
+
 
         logger.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
index 7d3beb1..16a6e77 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
@@ -64,7 +64,7 @@ public class EntityVersionCreatedTask implements Task<Void> {
     @Override
     public Void rejected() {
 
-        // Our task was rejected meaning our queue was full.  
+        // Our task was rejected meaning our queue was full.
         // We need this operation to run, so we'll run it in our current thread
         try {
             call();
@@ -76,7 +76,7 @@ public class EntityVersionCreatedTask implements Task<Void> {
         return null;
     }
 
-    
+
     @Override
     public Void call() throws Exception {
 
@@ -100,22 +100,12 @@ public class EntityVersionCreatedTask implements Task<Void> {
 
         logger.debug( "Started firing {} listeners", listenerSize );
 
-        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-        Observable.from(listeners).parallel( 
-            new Func1<Observable<EntityVersionCreated>, Observable<EntityVersionCreated>>() {
-
-                @Override
-                public Observable<EntityVersionCreated> call(
-                    final Observable<EntityVersionCreated> entityVersionCreatedObservable ) {
-
-                    return entityVersionCreatedObservable.doOnNext( new Action1<EntityVersionCreated>() {
-                        @Override
-                        public void call( final EntityVersionCreated listener ) {
-                            listener.versionCreated(collectionScope,entity);
-                        }
-                    } );
-                }
-            }, Schedulers.io() ).toBlocking().last();
+
+        Observable.from( listeners )
+                  .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
+                      listener.versionCreated( collectionScope, entity );
+                  } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
+
 
         logger.debug( "Finished firing {} listeners", listenerSize );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index e1445e3..ad1d91a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -176,77 +176,52 @@ public abstract class MvccEntitySerializationStrategyImpl implements MvccEntityS
         }
 
 
-       final EntitySetImpl entitySetResults =  Observable.from( rowKeys )
-               //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
-                                                         .buffer(entitiesPerRequest )
-                                                         .parallel( new Func1<Observable<List<ScopedRowKey
-                <CollectionPrefixedKey<Id>>>>, Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>>() {
+        final EntitySetImpl entitySetResults = Observable.from( rowKeys )
+            //buffer our entities per request, then for that buffer, execute the query in parallel (if necessary)
+            .buffer( entitiesPerRequest ).flatMap( listObservable -> {
 
 
-            @Override
-            public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> call(
-                    final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
-
-
-                 //here, we execute our query then emit the items either in parallel, or on the current thread if we have more than 1 request
-                return listObservable.map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
-                        Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>>() {
-
+                //here, we execute our query then emit the items either in parallel, or on the current thread
+                // if we have more than 1 request
+                return Observable.just( listObservable ).map( scopedRowKeys -> {
 
-                    @Override
-                    public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> call(
-                            final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys ) {
-
-                            try {
-                                return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
-                                                              .withColumnRange( maxVersion, null, false,
-                                                                      1 ).execute().getResult();
-                            }
-                            catch ( ConnectionException e ) {
-                                throw new CollectionRuntimeException( null, collectionScope, "An error occurred connecting to cassandra",
-                                        e );
-                            }
+                    try {
+                        return keyspace.prepareQuery( columnFamily ).getKeySlice( rowKeys )
+                                       .withColumnRange( maxVersion, null, false, 1 ).execute().getResult();
                     }
-                } );
-
-
-
-            }
-        }, scheduler )
-
-               //reduce all the output into a single Entity set
-               .reduce( new EntitySetImpl( entityIds.size() ),
-                new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>, EntitySetImpl>() {
-                    @Override
-                    public EntitySetImpl call( final EntitySetImpl entitySet,
-                                               final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> rows ) {
-
-                        final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns = rows.iterator();
+                    catch ( ConnectionException e ) {
+                        throw new CollectionRuntimeException( null, collectionScope,
+                            "An error occurred connecting to cassandra", e );
+                    }
+                } ).subscribeOn( scheduler );
+            }, 10 )
 
-                        while ( latestEntityColumns.hasNext() ) {
-                                   final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
+            .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
+                final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID>> latestEntityColumns =
+                    rows.iterator();
 
-                                   final ColumnList<UUID> columns = row.getColumns();
+                while ( latestEntityColumns.hasNext() ) {
+                    final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, UUID> row = latestEntityColumns.next();
 
-                                   if ( columns.size() == 0 ) {
-                                       continue;
-                                   }
+                    final ColumnList<UUID> columns = row.getColumns();
 
-                                   final Id entityId = row.getKey().getKey().getSubKey();
+                    if ( columns.size() == 0 ) {
+                        continue;
+                    }
 
-                                   final Column<UUID> column = columns.getColumnByIndex( 0 );
+                    final Id entityId = row.getKey().getKey().getSubKey();
 
-                                   final MvccEntity parsedEntity =
-                                           new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
+                    final Column<UUID> column = columns.getColumnByIndex( 0 );
 
-                                    entitySet.addEntity( parsedEntity );
-                               }
+                    final MvccEntity parsedEntity =
+                        new MvccColumnParser( entityId, getEntitySerializer() ).parseColumn( column );
 
+                    entitySet.addEntity( parsedEntity );
+                }
 
 
-                        return entitySet;
-                    }
-                } ).toBlocking().last();
+                return entitySet;
+            } ).toBlocking().last();
 
         return entitySetResults;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
index a5046f6..de959b5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV3Impl.java
@@ -185,82 +185,55 @@ public class MvccEntitySerializationStrategyV3Impl implements MvccEntitySerializ
 
 
         final EntitySetImpl entitySetResults = Observable.from( rowKeys )
-                //buffer our entities per request, then for that buffer, execute the query in parallel (if neccessary)
-                .buffer( entitiesPerRequest ).parallel(
-                        new Func1<Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>>,
-                                Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>>() {
+            //buffer our entities per request, then for that buffer, execute the query in parallel (if necessary)
+            .buffer( entitiesPerRequest ).flatMap( listObservable -> {
 
 
-                            @Override
-                            public Observable<Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> call(
-                                    final Observable<List<ScopedRowKey<CollectionPrefixedKey<Id>>>> listObservable ) {
+                //here, we execute our query then emit the items either in parallel, or on the current thread
+                // if we have more than 1 request
+                return Observable.just( listObservable ).map( scopedRowKeys -> {
 
 
-                                //here, we execute our query then emit
-                                // the items either in parallel, or on
-                                // the current thread if we have more
-                                // than 1 request
-                                return listObservable
-                                        .map( new Func1<List<ScopedRowKey<CollectionPrefixedKey<Id>>>,
-                                                Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>>() {
+                    try {
+                        return keyspace.prepareQuery( CF_ENTITY_DATA ).getKeySlice( rowKeys )
+                                       .withColumnSlice( COL_VALUE ).execute().getResult();
+                    }
+                    catch ( ConnectionException e ) {
+                        throw new CollectionRuntimeException( null, collectionScope,
+                            "An error occurred connecting to cassandra", e );
+                    }
+                } ).subscribeOn( scheduler );
+            }, 10 )
 
+            .reduce( new EntitySetImpl( entityIds.size() ), ( entitySet, rows ) -> {
+                final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns =
+                    rows.iterator();
 
-                                            @Override
-                                            public Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> call(
-                                                    final List<ScopedRowKey<CollectionPrefixedKey<Id>>> scopedRowKeys
-                                                                                                           ) {
+                while ( latestEntityColumns.hasNext() ) {
+                    final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row = latestEntityColumns.next();
 
-                                                try {
-                                                    return keyspace.prepareQuery( CF_ENTITY_DATA )
-                                                                   .getKeySlice( rowKeys )
-                                                                    .withColumnSlice( COL_VALUE )
-                                                                   .execute().getResult();
-                                                }
-                                                catch ( ConnectionException e ) {
-                                                    throw new CollectionRuntimeException( null, collectionScope,
-                                                            "An error occurred connecting to cassandra", e );
-                                                }
-                                            }
-                                        } );
-                            }
-                        }, scheduler )
+                    final ColumnList<Boolean> columns = row.getColumns();
 
-                        //reduce all the output into a single Entity set
-                .reduce( new EntitySetImpl( entityIds.size() ),
-                        new Func2<EntitySetImpl, Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>, EntitySetImpl>() {
-                            @Override
-                            public EntitySetImpl call( final EntitySetImpl entitySet,
-                                                       final Rows<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> rows
-                                                     ) {
+                    if ( columns.size() == 0 ) {
+                        continue;
+                    }
 
-                                final Iterator<Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean>> latestEntityColumns =
-                                        rows.iterator();
+                    final Id entityId = row.getKey().getKey().getSubKey();
 
-                                while ( latestEntityColumns.hasNext() ) {
-                                    final Row<ScopedRowKey<CollectionPrefixedKey<Id>>, Boolean> row =
-                                            latestEntityColumns.next();
+                    final Column<Boolean> column = columns.getColumnByIndex( 0 );
 
-                                    final ColumnList<Boolean> columns = row.getColumns();
+                    final MvccEntity parsedEntity =
+                        new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
 
-                                    if ( columns.size() == 0 ) {
-                                        continue;
-                                    }
 
-                                    final Id entityId = row.getKey().getKey().getSubKey();
+                    entitySet.addEntity( parsedEntity );
+                }
 
-                                    final Column<Boolean> column = columns.getColumnByIndex( 0 );
 
-                                    final MvccEntity parsedEntity =
-                                            new MvccColumnParser( entityId, entitySerializer ).parseColumn( column );
+                return entitySet;
+            } ).toBlocking().last();
 
 
-                                    entitySet.addEntity( parsedEntity );
-                                }
-
-
-                                return entitySet;
-                            }
-                        } ).toBlocking().last();
 
         return entitySetResults;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index f87b5fd..6982857 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -124,130 +124,107 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
 
         final Observable<List<EntityToSaveMessage>> migrated =
-            migrationDataProvider.getData().subscribeOn( Schedulers.io() ).parallel(
-                new Func1<Observable<EntityIdScope>, Observable<List<EntityToSaveMessage>>>() {
+            migrationDataProvider.getData().subscribeOn( Schedulers.io() ).flatMap( entityToSaveList -> Observable.just( entityToSaveList ).flatMap( entityIdScope -> {
 
+                //load the entity
+                final CollectionScope currentScope = entityIdScope.getCollectionScope();
 
-                    //process the ids in parallel
-                    @Override
-                    public Observable<List<EntityToSaveMessage>> call(
-                        final Observable<EntityIdScope> entityIdScopeObservable ) {
-
-
-                        return entityIdScopeObservable.flatMap(
-                            new Func1<EntityIdScope, Observable<EntityToSaveMessage>>() {
-
-
-                                @Override
-                                public Observable<EntityToSaveMessage> call( final EntityIdScope entityIdScope ) {
 
-                                    //load the entity
-                                    final CollectionScope currentScope = entityIdScope.getCollectionScope();
+                //for each element in our
+                // history, we need to copy it
+                // to v2.
+                // Note that
+                // this migration
+                //won't support anything beyond V2
 
+                final Iterator<MvccEntity> allVersions =
+                    migration.from.loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
 
-                                    //for each element in our
-                                    // history, we need to copy it
-                                    // to v2.
-                                    // Note that
-                                    // this migration
-                                    //won't support anything beyond V2
-
-                                    final Iterator<MvccEntity> allVersions = migration.from
-                                        .loadAscendingHistory( currentScope, entityIdScope.getId(), startTime, 100 );
+                //emit all the entity versions
+                return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
+                    @Override
+                    public void call( final Subscriber<? super
+                        EntityToSaveMessage> subscriber ) {
 
-                                    //emit all the entity versions
-                                    return Observable.create( new Observable.OnSubscribe<EntityToSaveMessage>() {
-                                            @Override
-                                            public void call( final Subscriber<? super
-                                                EntityToSaveMessage> subscriber ) {
+                        while ( allVersions.hasNext() ) {
+                            final EntityToSaveMessage message =
+                                new EntityToSaveMessage( currentScope, allVersions.next() );
+                            subscriber.onNext( message );
+                        }
 
-                                                while ( allVersions.hasNext() ) {
-                                                    final EntityToSaveMessage message =  new EntityToSaveMessage( currentScope, allVersions.next() );
-                                                    subscriber.onNext( message );
-                                                }
+                        subscriber.onCompleted();
+                    }
+                } ).buffer( 100 ).doOnNext( entities -> {
 
-                                                subscriber.onCompleted();
-                                            }
-                                        } );
-                                }
-                            } )
-                            //buffer 10 versions
-                            .buffer( 100 ).doOnNext( new Action1<List<EntityToSaveMessage>>() {
-                                @Override
-                                public void call( final List<EntityToSaveMessage> entities ) {
+                        final MutationBatch totalBatch = keyspace.prepareMutationBatch();
 
-                                    final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+                        atomicLong.addAndGet( entities.size() );
 
-                                    atomicLong.addAndGet( entities.size() );
+                        List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList( entities.size() );
 
-                                    List<EntityVersionCleanupTask> entityVersionCleanupTasks = new ArrayList(entities.size());
+                        for ( EntityToSaveMessage message : entities ) {
+                            final MutationBatch entityRewrite = migration.to.write( message.scope, message.entity );
 
-                                    for ( EntityToSaveMessage message : entities ) {
-                                        final MutationBatch entityRewrite =
-                                            migration.to.write( message.scope, message.entity );
+                            //add to
+                            // the
+                            // total
+                            // batch
+                            totalBatch.mergeShallow( entityRewrite );
 
-                                        //add to
-                                        // the
-                                        // total
-                                        // batch
-                                        totalBatch.mergeShallow( entityRewrite );
+                            //write
+                            // the
+                            // unique values
 
-                                        //write
-                                        // the
-                                        // unique values
+                            if ( !message.entity.getEntity().isPresent() ) {
+                                return;
+                            }
 
-                                        if ( !message.entity.getEntity().isPresent() ) {
-                                            return;
-                                        }
+                            final Entity entity = message.entity.getEntity().get();
 
-                                        final Entity entity = message.entity.getEntity().get();
+                            final Id entityId = entity.getId();
 
-                                        final Id entityId = entity.getId();
+                            final UUID version = message.entity.getVersion();
 
-                                        final UUID version = message.entity.getVersion();
+                            // re-write the unique
+                            // values
+                            // but this
+                            // time with
+                            // no TTL so that cleanup can clean up
+                            // older values
+                            for ( Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) {
 
-                                        // re-write the unique
-                                        // values
-                                        // but this
-                                        // time with
-                                        // no TTL so that cleanup can clean up
-                                        // older values
-                                        for ( Field field : EntityUtils
-                                            .getUniqueFields( message.entity.getEntity().get() ) ) {
+                                UniqueValue written = new UniqueValueImpl( field, entityId, version );
 
-                                            UniqueValue written = new UniqueValueImpl( field, entityId, version );
+                                MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written );
 
-                                            MutationBatch mb =
-                                                uniqueValueSerializationStrategy.write( message.scope, written );
 
+                                // merge into our
+                                // existing mutation
+                                // batch
+                                totalBatch.mergeShallow( mb );
+                            }
 
-                                            // merge into our
-                                            // existing mutation
-                                            // batch
-                                            totalBatch.mergeShallow( mb );
-                                        }
+                            final EntityVersionCleanupTask task = entityVersionCleanupFactory
+                                .getCleanupTask( message.scope, message.entity.getId(), version, false );
 
-                                        final EntityVersionCleanupTask task = entityVersionCleanupFactory.getCleanupTask( message.scope, message.entity.getId(), version, false );
+                            entityVersionCleanupTasks.add( task );
+                        }
 
-                                        entityVersionCleanupTasks.add( task );
-                                    }
+                        executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
 
-                                    executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+                        //now run our cleanup task
 
-                                    //now run our cleanup task
+                        for ( EntityVersionCleanupTask entityVersionCleanupTask : entityVersionCleanupTasks ) {
+                            try {
+                                entityVersionCleanupTask.call();
+                            }
+                            catch ( Exception e ) {
+                                LOGGER.error( "Unable to run cleanup task", e );
+                            }
+                        }
+                    } ).subscribeOn( Schedulers.io() );
 
-                                    for(EntityVersionCleanupTask entityVersionCleanupTask: entityVersionCleanupTasks){
-                                        try {
-                                            entityVersionCleanupTask.call();
-                                        }
-                                        catch ( Exception e ) {
-                                            LOGGER.error( "Unable to run cleanup task", e );
-                                        }
-                                    }
-                                }
-                            } );
-                    }
-                } );
+            }, 10) );
 
         migrated.toBlocking().lastOrDefault(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index 2d416a4..a49e533 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -64,7 +64,7 @@ public class ParallelTest {
         final int expected = size - 1;
 
 
-        // QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix command 
+        // QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix command
          // happens on the computation Thread if this is used
 
         //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
@@ -90,7 +90,7 @@ public class ParallelTest {
          *  non blocking?
          */
 
-        final Observable<String> observable = Observable.from( input ).observeOn( Schedulers.io() );
+        final Observable<String> observable = Observable.just( input ).observeOn( Schedulers.io() );
 
 
         Observable<Integer> thing = observable.flatMap( new Func1<String, Observable<Integer>>() {
@@ -99,7 +99,7 @@ public class ParallelTest {
             public Observable<Integer> call( final String s ) {
                 List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
 
-                logger.info( "Creating new set of observables in thread {}", 
+                logger.info( "Creating new set of observables in thread {}",
                         Thread.currentThread().getName() );
 
                 for ( int i = 0; i < size; i++ ) {
@@ -107,13 +107,13 @@ public class ParallelTest {
 
                     final int index = i;
 
-                    // create a new observable and execute the function on it.  
+                    // create a new observable and execute the function on it.
                     // These should happen in parallel when a subscription occurs
 
                     /**
                      * QUESTION: Should this again be the process thread, not the I/O
                      */
-                    Observable<String> newObservable = Observable.from( input ).subscribeOn( Schedulers.io() );
+                    Observable<String> newObservable = Observable.just( input ).subscribeOn( Schedulers.io() );
 
                     Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
index 747ea7b..9938caf 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/AbstractMvccEntityDataMigrationV1ToV3ImplTest.java
@@ -119,7 +119,7 @@ public abstract class AbstractMvccEntityDataMigrationV1ToV3ImplTest implements D
         assertEquals( "Same entity", entity2, returned2 );
 
         final Observable<EntityIdScope> entityIdScope =
-            Observable.from( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
+            Observable.just( new EntityIdScope( scope, entity1.getId() ), new EntityIdScope( scope, entity2.getId() ) );
 
 
         final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/pom.xml b/stack/corepersistence/common/pom.xml
index 82df1d8..2be1c1a 100644
--- a/stack/corepersistence/common/pom.xml
+++ b/stack/corepersistence/common/pom.xml
@@ -101,15 +101,16 @@
 
     <!-- RX java -->
 
+      <dependency>
+          <groupId>io.reactivex</groupId>
+          <artifactId>rxjava</artifactId>
+          <version>${rx.version}</version>
+      </dependency>
+
     <dependency>
-      <groupId>com.netflix.rxjava</groupId>
-      <artifactId>rxjava-core</artifactId>
-      <version>${rx.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.netflix.rxjava</groupId>
+      <groupId>io.reactivex</groupId>
       <artifactId>rxjava-math</artifactId>
-      <version>${rx.version}</version>
+      <version>1.0.0</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
index 15f9aab..23661ee 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -77,7 +77,9 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
 
         for ( ColumnNameIterator<C, T> columnNameIterator : columnNameIterators ) {
 
-            observables[i] = Observable.from( columnNameIterator, Schedulers.io() );
+
+
+            observables[i] = Observable.from( columnNameIterator ).subscribeOn( Schedulers.io() );
 
             i++;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index f4f6f9c..3c56763 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -52,6 +52,7 @@ import com.netflix.astyanax.util.RangeBuilder;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 
@@ -125,100 +126,95 @@ public class MultiKeyColumnNameIteratorTest {
 
         final long maxValue = 10000;
 
+
         /**
          * Write to both rows in parallel
          */
         Observable.from( new String[] { rowKey1, rowKey2, rowKey3 } )
-                  .parallel( new Func1<Observable<String>, Observable<String>>() {
-                      @Override
-                      public Observable<String> call( final Observable<String> stringObservable ) {
-                          return stringObservable.doOnNext( new Action1<String>() {
-                              @Override
-                              public void call( final String key ) {
-
-                                  final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                  for ( long i = 0; i < maxValue; i++ ) {
-                                      batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
-                                      if ( i % 1000 == 0 ) {
-                                          try {
-                                              batch.execute();
-                                          }
-                                          catch ( ConnectionException e ) {
-                                              throw new RuntimeException( e );
-                                          }
-                                      }
-                                  }
-
-                                  try {
-                                      batch.execute();
-                                  }
-                                  catch ( ConnectionException e ) {
-                                      throw new RuntimeException( e );
-                                  }
+            //flatMap each row key onto its own observable subscribed on the io scheduler
+                  .flatMap( stringObservable -> Observable.just( stringObservable ).doOnNext( key -> {
+                      final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                      for ( long i = 0; i < maxValue; i++ ) {
+                          batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                          if ( i % 1000 == 0 ) {
+                              try {
+                                  batch.execute();
+                              }
+                              catch ( ConnectionException e ) {
+                                  throw new RuntimeException( e );
                               }
-                          } );
+                          }
                       }
-                  } ).toBlocking().last();
 
+                      try {
+                          batch.execute();
+                      }
+                      catch ( ConnectionException e ) {
+                          throw new RuntimeException( e );
+                      }
+                  } ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
 
-        //create 3 iterators
 
-        ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
-        ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
-        ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
 
-        final Comparator<Long> ascendingComparator = new Comparator<Long>() {
 
-            @Override
-            public int compare( final Long o1, final Long o2 ) {
-                return Long.compare( o1, o2 );
-            }
-        };
+            //create 3 iterators
 
-        /**
-         * Again, arbitrary buffer size to attempt we buffer at some point
-         */
-        final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
+            ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
+            ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
+            ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
+
+            final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+                @Override
+                public int compare( final Long o1, final Long o2 ) {
+                    return Long.compare( o1, o2 );
+                }
+            };
+
+            /**
+             * Again, an arbitrary buffer size, chosen in an attempt to ensure we buffer at some point
+             */
+            final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
                 new MultiKeyColumnNameIterator<>( Arrays.asList( row1Iterator, row2Iterator, row3Iterator ),
-                        ascendingComparator, 900 );
+                    ascendingComparator, 900 );
 
 
-        //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
-        // trips required
+            //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
+            // trips required
 
 
-        for ( long i = 0; i < maxValue; i++ ) {
-            assertEquals( i, ascendingItr.next().longValue() );
-        }
+            for ( long i = 0; i < maxValue; i++ ) {
+                assertEquals( i, ascendingItr.next().longValue() );
+            }
 
-        //now test it in reverse
+            //now test it in reverse
 
-        ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
-        ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
-        ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
+            ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
+            ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
+            ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
 
-        final Comparator<Long> descendingComparator = new Comparator<Long>() {
+            final Comparator<Long> descendingComparator = new Comparator<Long>() {
 
-            @Override
-            public int compare( final Long o1, final Long o2 ) {
-                return ascendingComparator.compare( o1, o2 ) * -1;
-            }
-        };
+                @Override
+                public int compare( final Long o1, final Long o2 ) {
+                    return ascendingComparator.compare( o1, o2 ) * -1;
+                }
+            };
 
-        /**
-         * Again, arbitrary buffer size to attempt we buffer at some point
-         */
-        final MultiKeyColumnNameIterator<Long, Long> descendingItr =
+            /**
+             * Again, an arbitrary buffer size, chosen in an attempt to ensure we buffer at some point
+             */
+            final MultiKeyColumnNameIterator<Long, Long> descendingItr =
                 new MultiKeyColumnNameIterator<>( Arrays.asList( row1IteratorDesc, row2IteratorDesc, row3IteratorDesc ),
-                        descendingComparator, 900 );
+                    descendingComparator, 900 );
 
 
-        for ( long i = maxValue - 1; i > -1; i-- ) {
-            assertEquals( i, descendingItr.next().longValue() );
+            for ( long i = maxValue - 1; i > -1; i-- ) {
+                assertEquals( i, descendingItr.next().longValue() );
+            }
         }
-    }
 
 
     @Test
@@ -233,39 +229,28 @@ public class MultiKeyColumnNameIteratorTest {
            /**
             * Write to both rows in parallel
             */
-           Observable.just( rowKey1  )
-                     .parallel( new Func1<Observable<String>, Observable<String>>() {
-                         @Override
-                         public Observable<String> call( final Observable<String> stringObservable ) {
-                             return stringObservable.doOnNext( new Action1<String>() {
-                                 @Override
-                                 public void call( final String key ) {
-
-                                     final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                     for ( long i = 0; i < maxValue; i++ ) {
-                                         batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
-                                         if ( i % 1000 == 0 ) {
-                                             try {
-                                                 batch.execute();
-                                             }
-                                             catch ( ConnectionException e ) {
-                                                 throw new RuntimeException( e );
-                                             }
-                                         }
-                                     }
-
-                                     try {
-                                         batch.execute();
-                                     }
-                                     catch ( ConnectionException e ) {
-                                         throw new RuntimeException( e );
-                                     }
-                                 }
-                             } );
-                         }
-                     } ).toBlocking().last();
+           Observable.just( rowKey1  ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> {
+               final MutationBatch batch = keyspace.prepareMutationBatch();
+
+               for ( long i = 0; i < maxValue; i++ ) {
+                   batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                   if ( i % 1000 == 0 ) {
+                       try {
+                           batch.execute();
+                       }
+                       catch ( ConnectionException e ) {
+                           throw new RuntimeException( e );
+                       }
+                   }
+               }
+
+               try {
+                   batch.execute();
+               }
+               catch ( ConnectionException e ) {
+                   throw new RuntimeException( e );
+               }} ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
 
 
            //create 3 iterators

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index c32b820..d88ebe5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -54,6 +54,7 @@ import rx.Observable;
 import rx.Observer;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -373,38 +374,29 @@ public class MultiRowColumnIteratorTest {
         /**
          * Write to both rows in parallel
          */
-        Observable.just( rowKey1 ).parallel( new Func1<Observable<String>, Observable<String>>() {
-            @Override
-            public Observable<String> call( final Observable<String> stringObservable ) {
-                return stringObservable.doOnNext( new Action1<String>() {
-                    @Override
-                    public void call( final String key ) {
-
-                        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                        for ( long i = 0; i < maxValue; i++ ) {
-                            batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
-
-                            if ( i % 1000 == 0 ) {
-                                try {
-                                    batch.execute();
-                                }
-                                catch ( ConnectionException e ) {
-                                    throw new RuntimeException( e );
-                                }
-                            }
-                        }
+        Observable.just( rowKey1 ).flatMap( rowKey -> Observable.just( rowKey ).doOnNext( key -> {
+            final MutationBatch batch = keyspace.prepareMutationBatch();
 
-                        try {
-                            batch.execute();
-                        }
-                        catch ( ConnectionException e ) {
-                            throw new RuntimeException( e );
-                        }
+            for ( long i = 0; i < maxValue; i++ ) {
+                batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                if ( i % 1000 == 0 ) {
+                    try {
+                        batch.execute();
+                    }
+                    catch ( ConnectionException e ) {
+                        throw new RuntimeException( e );
                     }
-                } );
+                }
+            }
+
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( e );
             }
-        } ).toBlocking().last();
+        } ).subscribeOn( Schedulers.io() ) ).toBlocking().last();
 
 
         //create 3 iterators


Mime
View raw message