usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject incubator-usergrid git commit: Refactored iterator to use a cursor and invoke service with each page to perform seeking. Otherwise the subscription will run infinitely.
Date Tue, 14 Jul 2015 00:09:09 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/observable-query-fix cc80ba926 -> 208332da9 (forced update)


Refactored iterator to use a cursor and invoke service with each page to perform seeking.  Otherwise the subscription will run infinitely.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/208332da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/208332da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/208332da

Branch: refs/heads/observable-query-fix
Commit: 208332da978e01d04cef501f906e4b575ccdd2ef
Parents: 48689eb
Author: Todd Nine <tnine@apigee.com>
Authored: Mon Jul 13 17:13:27 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon Jul 13 18:08:58 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  15 +-
 .../corepersistence/CpEntityManager.java        |  19 +-
 .../corepersistence/CpEntityManagerFactory.java |  13 +-
 .../corepersistence/CpRelationManager.java      | 217 +++++++++----------
 .../results/ConnectionRefQueryExecutor.java     |   8 +-
 .../results/EntityQueryExecutor.java            |  11 +-
 .../results/ObservableQueryExecutor.java        |  71 ++++--
 .../service/CollectionSearch.java               |  87 ++++++++
 .../service/CollectionService.java              |  38 ++++
 .../service/CollectionServiceImpl.java          |  76 +++++++
 .../service/ConnectionSearch.java               |  87 ++++++++
 .../service/ConnectionService.java              |  49 +++++
 .../service/ConnectionServiceImpl.java          | 120 ++++++++++
 .../persistence/core/astyanax/CassandraFig.java |   2 +-
 .../rx/ObservableToBlockingIteratorFactory.java | 125 -----------
 .../persistence/core/rx/OrderedMergeTest.java   |  68 ++----
 16 files changed, 673 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index dd0e1ab..d31099b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,10 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.CollectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
@@ -65,9 +69,6 @@ public class CoreModule  extends AbstractModule {
     protected void configure() {
 
 
-//        //See TODO, this is fugly
-//        bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
-
         install( new CommonModule());
         install( new CollectionModule() {
             /**
@@ -156,6 +157,14 @@ public class CoreModule  extends AbstractModule {
         //install our pipeline modules
         install(new PipelineModule());
 
+        /**
+         * Install our service operations
+         */
+
+        bind( CollectionService.class).to( CollectionServiceImpl.class );
+
+        bind( ConnectionService.class).to( ConnectionServiceImpl.class);
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6184616..b530715 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -36,6 +36,8 @@ import java.util.UUID;
 
 import com.google.common.base.Optional;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.IndexRefreshCommand;
@@ -194,7 +196,9 @@ public class CpEntityManager implements EntityManager {
 
     private final AsyncEventService indexService;
 
-    private final PipelineBuilderFactory pipelineBuilderFactory;
+    private final CollectionService collectionService;
+    private final ConnectionService connectionService;
+
 
     private final GraphManagerFactory graphManagerFactory;
 
@@ -239,8 +243,9 @@ public class CpEntityManager implements EntityManager {
     public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache,
                             final MetricsFactory metricsFactory,
                             final EntityManagerFig entityManagerFig,
-                            final PipelineBuilderFactory pipelineBuilderFactory ,
                             final GraphManagerFactory graphManagerFactory,
+                            final CollectionService collectionService,
+                            final ConnectionService connectionService,
                             final UUID applicationId ) {
 
         this.entityManagerFig = entityManagerFig;
@@ -250,10 +255,14 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( managerCache, "managerCache must not be null" );
         Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
         Preconditions.checkNotNull( indexService, "indexService must not be null" );
-        Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" );
+
         Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" );
-        this.pipelineBuilderFactory = pipelineBuilderFactory;
+        Preconditions.checkNotNull( connectionService, "connectionService must not be null" );
+        Preconditions.checkNotNull( collectionService, "collectionService must not be null" );
+
         this.graphManagerFactory = graphManagerFactory;
+        this.connectionService = connectionService;
+        this.collectionService = collectionService;
 
 
 
@@ -778,7 +787,7 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( entityRef, "entityRef cannot be null" );
 
         CpRelationManager relationManager =
-            new CpRelationManager( managerCache, pipelineBuilderFactory, indexService, this, entityManagerFig, applicationId, entityRef );
+            new CpRelationManager( managerCache, indexService, collectionService, connectionService,  this, entityManagerFig, applicationId, entityRef );
         return relationManager;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 048c558..a679407 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.exception.ConflictException;
 import org.apache.usergrid.persistence.AbstractEntity;
@@ -118,7 +120,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private final ReIndexService reIndexService;
     private final MetricsFactory metricsFactory;
     private final AsyncEventService indexService;
-    private final PipelineBuilderFactory pipelineBuilderFactory;
+    private final CollectionService collectionService;
+    private final ConnectionService connectionService;
     private final GraphManagerFactory graphManagerFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils,
@@ -132,11 +135,15 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncEventService.class );
-        this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class );
         this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class );
+        this.collectionService = injector.getInstance( CollectionService.class );
+        this.connectionService = injector.getInstance( ConnectionService.class );
+
+        //this line always needs to be last due to the temporary circular dependency until spring is removed
         this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 
+
     }
 
 
@@ -192,7 +199,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private EntityManager _getEntityManager( UUID applicationId ) {
         EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache,
-            metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, applicationId );
+            metricsFactory, entityManagerFig, graphManagerFactory,  collectionService, connectionService, applicationId );
 
         return em;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3df09e6..d92b97f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -26,18 +26,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.index.EntityIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
-import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
 import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
 import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
+import org.apache.usergrid.corepersistence.service.CollectionSearch;
+import org.apache.usergrid.corepersistence.service.CollectionService;
+import org.apache.usergrid.corepersistence.service.ConnectionSearch;
+import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.ConnectedEntityRef;
@@ -62,11 +63,10 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -85,8 +85,6 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createColle
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
 import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
@@ -123,13 +121,14 @@ public class CpRelationManager implements RelationManager {
     private final AsyncEventService indexService;
 
 
-    private final PipelineBuilderFactory pipelineBuilderFactory;
-
+    private final CollectionService collectionService;
+    private final ConnectionService connectionService;
 
 
-    public CpRelationManager(  final ManagerCache managerCache,
-                              final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService,
-                              final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId,
+    public CpRelationManager( final ManagerCache managerCache,
+                              final AsyncEventService indexService, final CollectionService collectionService,
+                              final ConnectionService connectionService, final EntityManager em,
+                              final EntityManagerFig entityManagerFig, final UUID applicationId,
                               final EntityRef headEntity ) {
 
 
@@ -138,6 +137,9 @@ public class CpRelationManager implements RelationManager {
         Assert.notNull( headEntity, "Head entity cannot be null" );
         Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
         Assert.notNull( indexService, "indexService cannot be null" );
+        Assert.notNull( collectionService, "collectionService cannot be null" );
+        Assert.notNull( connectionService, "connectionService cannot be null" );
+
         this.entityManagerFig = entityManagerFig;
 
         // TODO: this assert should not be failing
@@ -148,7 +150,8 @@ public class CpRelationManager implements RelationManager {
         this.managerCache = managerCache;
         this.applicationScope = CpNamingUtils.getApplicationScope( applicationId );
 
-        this.pipelineBuilderFactory = pipelineBuilderFactory;
+        this.collectionService = collectionService;
+        this.connectionService = connectionService;
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Loading head entity {}:{} from app {}", new Object[] {
@@ -266,9 +269,9 @@ public class CpRelationManager implements RelationManager {
         } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Observable<Edge> edges = gm.loadEdgeVersions(
-           CpNamingUtils.createEdgeFromConnectionType(new SimpleId(headEntity.getUuid(), headEntity.getType()), connectionType, entityId)
-        );
+        Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+            .createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType,
+                entityId ) );
 
         return edges.toBlocking().firstOrDefault( null ) != null;
     }
@@ -286,9 +289,9 @@ public class CpRelationManager implements RelationManager {
         } );
 
         GraphManager gm = managerCache.getGraphManager( applicationScope );
-        Observable<Edge> edges = gm.loadEdgeVersions(
-            CpNamingUtils.createEdgeFromCollectionName(new SimpleId(headEntity.getUuid(), headEntity.getType()), collectionName, entityId)
-         );
+        Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
+            .createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName,
+                entityId ) );
 
         return edges.toBlocking().firstOrDefault( null ) != null;
     }
@@ -346,15 +349,16 @@ public class CpRelationManager implements RelationManager {
     @Override
     public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
 
-        Preconditions.checkNotNull(itemRef,"itemref is null");
+        Preconditions.checkNotNull( itemRef, "itemref is null" );
         CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
-        if ( ( collection != null && collection.getType()!=null ) && !collection.getType().equals( itemRef.getType() ) ) {
+        if ( ( collection != null && collection.getType() != null ) && !collection.getType()
+                                                                                  .equals( itemRef.getType() ) ) {
             return null;
         }
 
 
         Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
-        org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load(entityId);
+        org.apache.usergrid.persistence.model.entity.Entity memberEntity = ( ( CpEntityManager ) em ).load( entityId );
 
 
         // don't fetch entity if we've already got one
@@ -371,7 +375,6 @@ public class CpRelationManager implements RelationManager {
         }
 
 
-
         if ( memberEntity == null ) {
             throw new RuntimeException(
                 "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
@@ -388,33 +391,28 @@ public class CpRelationManager implements RelationManager {
         final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() );
         final String linkedCollection = collection.getLinkedCollection();
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
+        GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        gm.writeEdge( edge )
-            .doOnNext( writtenEdge -> {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Wrote edge {}", writtenEdge);
-                }
-            })
-            .filter(writtenEdge -> linkedCollection != null )
-            .flatMap(writtenEdge -> {
-                final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
-                final Edge reverseEdge = createCollectionEdge( memberEntity.getId(), pluralType, cpHeadEntity.getId() );
-
-                //reverse
-                return gm.writeEdge(reverseEdge).doOnNext(reverseEdgeWritten -> {
-                    indexService.queueNewEdge(applicationScope, cpHeadEntity, reverseEdge);
-                });
-            })
-            .doOnCompleted(() -> {
-                indexService.queueNewEdge(applicationScope, memberEntity, edge);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Added entity {}:{} to collection {}", new Object[]{
-                        itemRef.getUuid().toString(), itemRef.getType(), collectionName
-                    });
-                }
-            })
-            .toBlocking().lastOrDefault( null );
+        gm.writeEdge( edge ).doOnNext( writtenEdge -> {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Wrote edge {}", writtenEdge );
+            }
+        } ).filter( writtenEdge -> linkedCollection != null ).flatMap( writtenEdge -> {
+            final String pluralType = InflectionUtils.pluralize( cpHeadEntity.getId().getType() );
+            final Edge reverseEdge = createCollectionEdge( memberEntity.getId(), pluralType, cpHeadEntity.getId() );
+
+            //reverse
+            return gm.writeEdge( reverseEdge ).doOnNext( reverseEdgeWritten -> {
+                indexService.queueNewEdge( applicationScope, cpHeadEntity, reverseEdge );
+            } );
+        } ).doOnCompleted( () -> {
+            indexService.queueNewEdge( applicationScope, memberEntity, edge );
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Added entity {}:{} to collection {}", new Object[] {
+                    itemRef.getUuid().toString(), itemRef.getType(), collectionName
+                } );
+            }
+        } ).toBlocking().lastOrDefault( null );
 
         //check if we need to reverse our edges
 
@@ -430,7 +428,6 @@ public class CpRelationManager implements RelationManager {
     }
 
 
-
     @Override
     public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
         throws Exception {
@@ -521,11 +518,9 @@ public class CpRelationManager implements RelationManager {
 
         //run our delete
         gm.loadEdgeVersions(
-               CpNamingUtils.createEdgeFromCollectionName(cpHeadEntity.getId(), collectionName, memberEntity.getId())
-            )
-            .flatMap(edge -> gm.markEdge(edge))
-            .flatMap(edge -> gm.deleteEdge(edge))
-            .toBlocking().lastOrDefault(null);
+            CpNamingUtils.createEdgeFromCollectionName( cpHeadEntity.getId(), collectionName, memberEntity.getId() ) )
+          .flatMap( edge -> gm.markEdge( edge ) ).flatMap( edge -> gm.deleteEdge( edge ) ).toBlocking()
+          .lastOrDefault( null );
 
 
         /**
@@ -620,26 +615,23 @@ public class CpRelationManager implements RelationManager {
 
 
         query.setEntityType( collection.getType() );
-        query = adjustQuery( query );
-
-
-        final IdBuilder pipelineBuilder =
-            pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() )
-                                  .withLimit( query.getLimit() ).fromId( cpHeadEntity.getId() );
+        final Query toExecute = adjustQuery( query );
+        final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
+        final Id ownerId = headEntity.asId();
 
+        //wire the callback so we can get each page
+        return new EntityQueryExecutor( toExecute.getCursor() ) {
+            @Override
+            protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
+                final Optional<String> cursor ) {
 
-        final EntityBuilder results;
-
-        if ( query.isGraphSearch() ) {
-            results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
-        }
-        else {
-            final String entityType = collection.getType();
-            results = pipelineBuilder.searchCollection( collectionName, query.getQl().get() ,  entityType).loadEntities();
-        }
-
+                final CollectionSearch search =
+                    new CollectionSearch( applicationScope, ownerId, collectionName, collection.getType(), toExecute.getLimit(),
+                        queryString, cursor );
 
-        return new EntityQueryExecutor( results.build() ).next();
+                return collectionService.searchCollection( search );
+            }
+        }.next();
     }
 
 
@@ -658,10 +650,13 @@ public class CpRelationManager implements RelationManager {
             if ( found ) {
                 break;
             }
-            Thread.sleep(sleepTime);
-        }while (!found && length <= maxLength);
-        if(logger.isInfoEnabled()){
-            logger.info(String.format("Consistent Search finished in %s,  results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
+            Thread.sleep( sleepTime );
+        }
+        while ( !found && length <= maxLength );
+        if ( logger.isInfoEnabled() ) {
+            logger.info( String
+                .format( "Consistent Search finished in %s,  results=%s, expected=%s...dumping stack", length,
+                    results.size(), expectedResults ) );
         }
         return results;
     }
@@ -692,7 +687,8 @@ public class CpRelationManager implements RelationManager {
         }
 
         final Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() );
-        final org.apache.usergrid.persistence.model.entity.Entity targetEntity = ( ( CpEntityManager ) em ).load( entityId );
+        final org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+            ( ( CpEntityManager ) em ).load( entityId );
 
         // create graph edge connection from head entity to member entity
         final Edge edge = createConnectionEdge( cpHeadEntity.getId(), connectionType, targetEntity.getId() );
@@ -778,9 +774,9 @@ public class CpRelationManager implements RelationManager {
         final SearchByEdge search = createConnectionSearchByEdge( sourceId, connectionType, targetEntity.getId() );
 
         //delete all the edges and queue their processing
-        gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).doOnNext(
-                returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking()
-              .lastOrDefault( null );
+        gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) )
+          .doOnNext( returnedEdge -> indexService.queueDeleteEdge( applicationScope, returnedEdge ) ).toBlocking()
+          .lastOrDefault( null );
     }
 
 
@@ -880,9 +876,9 @@ public class CpRelationManager implements RelationManager {
         headEntity = em.validate( headEntity );
 
 
-        query = adjustQuery( query );
+        final Query toExecute = adjustQuery( query );
 
-        final Optional<String> entityType = Optional.fromNullable(query.getEntityType()) ;
+        final Optional<String> entityType = Optional.fromNullable( query.getEntityType() );
         //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
         // -> 1.0 results
 
@@ -893,53 +889,42 @@ public class CpRelationManager implements RelationManager {
         //startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref
         // collector
 
+        final Id sourceId = headEntity.asId();
 
-        final IdBuilder
-            pipelineBuilder = pipelineBuilderFactory.create( applicationScope ).withCursor( query.getCursor() ).withLimit( query.getLimit() ).fromId(
-            cpHeadEntity.getId() );
+        final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent(): query.getQl();
 
 
         if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS ) {
 
-            final IdBuilder traversedIds;
-
-            if ( query.isGraphSearch() ) {
-                traversedIds = pipelineBuilder.traverseConnection( connection, entityType );
-            }
-            else {
-                traversedIds =
-                    pipelineBuilder.searchConnection( connection, query.getQl().get(), entityType ).loadIds();
-            }
-
-            //create connection refs
-
-            final Observable<ResultsPage<ConnectionRef>> results =
-                traversedIds.loadConnectionRefs( cpHeadEntity.getId(), connection ).build();
-
-            return new ConnectionRefQueryExecutor( results ).next();
-        }
-
-
-
-
-        //we want to load all entities
-
-        final Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> results;
 
+            return new ConnectionRefQueryExecutor( toExecute.getCursor() ) {
+                @Override
+                protected Observable<ResultsPage<ConnectionRef>> buildNewResultsPage( final Optional<String> cursor ) {
 
-        if ( query.isGraphSearch() ) {
-            results = pipelineBuilder.traverseConnection( connection, entityType ).loadEntities().build();
-        }
 
-        else {
+                //we need the callback so that each time we get a new cursor, we execute a new search and re-initialize our builders
 
-            results = pipelineBuilder.searchConnection( connection,  query.getQl().get() , entityType).loadEntities().build();
+                    final ConnectionSearch search =
+                        new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
+                            queryString, cursor );
+                    return connectionService.searchConnectionAsRefs( search );
+                }
+            }.next();
         }
 
 
+        return new EntityQueryExecutor( toExecute.getCursor() ) {
+            @Override
+            protected Observable<ResultsPage<org.apache.usergrid.persistence.model.entity.Entity>> buildNewResultsPage(
+                final Optional<String> cursor ) {
 
-
-        return new EntityQueryExecutor( results ).next();
+                //we need the callback so that each time we get a new cursor, we execute a new search and re-initialize our builders
+                final ConnectionSearch search =
+                    new ConnectionSearch( applicationScope, sourceId, entityType, connection, toExecute.getLimit(),
+                        queryString, cursor );
+                return connectionService.searchConnection( search );
+            }
+        }.next();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
index 3dfd98a..cd66dad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefQueryExecutor.java
@@ -32,6 +32,8 @@ import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 
 
@@ -39,11 +41,11 @@ import rx.Observable;
  * Processes our results of connection refs
  */
 @Deprecated//Required for 1.0 compatibility
-public class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
+public abstract class ConnectionRefQueryExecutor extends ObservableQueryExecutor<ConnectionRef> {
 
 
-    public ConnectionRefQueryExecutor( final Observable<ResultsPage<ConnectionRef>> resultsObservable ) {
-        super( resultsObservable );
+    protected ConnectionRefQueryExecutor( final Optional<String> startCursor ) {
+        super( startCursor );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
index 0e18e31..5e80d24 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityQueryExecutor.java
@@ -31,6 +31,8 @@ import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.google.common.base.Optional;
+
 import rx.Observable;
 
 
@@ -38,11 +40,11 @@ import rx.Observable;
  * Processes our results of entities
  */
 @Deprecated//Required for 1.0 compatibility
-public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
+public abstract class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
 
 
-    public EntityQueryExecutor( final Observable<ResultsPage<Entity>> resultsObservable ) {
-        super( resultsObservable );
+    protected EntityQueryExecutor( final Optional<String> startCursor ) {
+        super( startCursor );
     }
 
 
@@ -81,4 +83,7 @@ public class EntityQueryExecutor extends ObservableQueryExecutor<Entity> {
 
         return entity;
     }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 548e584..7b31d19 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -21,21 +21,14 @@ package org.apache.usergrid.corepersistence.results;
 
 
 import java.util.Iterator;
-import java.util.Map;
 import java.util.NoSuchElementException;
 
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.EntityFactory;
 import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.rx.ObservableToBlockingIteratorFactory;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
 
 import rx.Observable;
-import rx.schedulers.Schedulers;
 
 
 /**
@@ -45,19 +38,19 @@ import rx.schedulers.Schedulers;
 @Deprecated//Required for 1.0 compatibility
 public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
 
-    private final Observable<Results> resultsObservable;
 
-    public Iterator<Results> iterator;
 
+    private Results results;
+    private Optional<String> cursor;
+    private boolean complete;
 
-    public ObservableQueryExecutor( final Observable<ResultsPage<T>> resultsObservable ) {
-        //map to our old results objects, return a default empty if required
-        this.resultsObservable = resultsObservable.map( resultsPage -> createResultsInternal( resultsPage ) )
-                                                  .defaultIfEmpty(new Results())
-            .subscribeOn(Schedulers.io());
+    protected ObservableQueryExecutor(final Optional<String> startCursor){
+        this.cursor = startCursor;
     }
 
 
+
+
     /**
      * Transform the results
      * @param resultsPage
@@ -66,6 +59,14 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
     protected abstract Results createResults( final ResultsPage<T> resultsPage );
 
 
+    /**
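+     * Build a new results page starting from the given cursor
+     * @param cursor The cursor to resume the search from, if present
+     * @return An observable that emits the results page for that cursor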
+     */
+    protected abstract Observable<ResultsPage<T>> buildNewResultsPage(final Optional<String>  cursor);
+
+
 
     /**
      * Legacy to transform our results page to a new results
@@ -90,6 +91,8 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
 
 
 
+
+
     @Override
     public Iterator<Results> iterator() {
         return this;
@@ -99,13 +102,11 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
     @Override
     public boolean hasNext() {
 
-        if ( iterator == null ) {
-            iterator = ObservableToBlockingIteratorFactory.toIterator( resultsObservable );
+        if ( !complete && results == null) {
+            advance();
         }
 
-        boolean hasNext = iterator.hasNext();
-
-        return hasNext;
+        return results != null;
     }
 
 
@@ -114,16 +115,38 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
         if ( !hasNext() ) {
             throw new NoSuchElementException( "No more results present" );
         }
-        final Results next = iterator.next();
+
+        final Results next = results;
+
+        results = null;
 
         next.setQueryExecutor( this );
 
         return next;
     }
 
-    @Override
-    protected void finalize() throws Throwable {
-        resultsObservable.unsubscribeOn(Schedulers.io());
-        super.finalize();
+    private void advance(){
+         //map to our old results objects, return a default empty if required
+        final Observable<Results>
+            observable = buildNewResultsPage( cursor ).map( resultsPage -> createResultsInternal( resultsPage ) ).defaultIfEmpty(
+            new Results() );
+
+        //take the first from our observable
+        final Results resultsPage = observable.take(1).toBlocking().first();
+
+        //set the results for the iterator
+        this.results = resultsPage;
+
+        //set the complete flag
+        this.complete =  !resultsPage.hasCursor();
+
+        //if not complete, set our cursor for the next iteration
+        if(!complete){
+            this.cursor = Optional.of( results.getCursor());
+        }else{
+            this.cursor = Optional.absent();
+        }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
new file mode 100644
index 0000000..ab8a8bc
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Bean for input on searching a collection
+ */
+public class CollectionSearch {
+
+    private final ApplicationScope applicationScope;
+    private final Id collectionOwnerId;
+    private final String collectionName;
+    private final String entityType;
+    private final int limit;
+    private final Optional<String> query;
+    private final Optional<String> cursor;
+
+
+    public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String
+        collectionName,
+                             final String entityType, final int limit, final Optional<String> query, final Optional<String> cursor ) {
+        this.applicationScope = applicationScope;
+        this.collectionOwnerId = collectionOwnerId;
+        this.collectionName = collectionName;
+        this.entityType = entityType;
+        this.limit = limit;
+        this.query = query;
+        this.cursor = cursor;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    public Optional<String> getQuery() {
+        return query;
+    }
+
+
+    public int getLimit() {
+        return limit;
+    }
+
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+
+    public Id getCollectionOwnerId() {
+        return collectionOwnerId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
new file mode 100644
index 0000000..eef741a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import rx.Observable;
+
+
+/**
+ * Interface for operating on collections
+ */
+public interface CollectionService {
+
+    /**
+     * Search a collection and return an observable of results pages
+     * @param search The search to perform
+     * @return An observable with results page entries for the stream
+     */
+    Observable<ResultsPage<Entity>> searchCollection(final CollectionSearch search);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
new file mode 100644
index 0000000..fa79d09
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Implementation of the collection service
+ */
+@Singleton
+public class CollectionServiceImpl implements CollectionService {
+
+
+    private final PipelineBuilderFactory pipelineBuilderFactory;
+
+
+    @Inject
+    public CollectionServiceImpl( final PipelineBuilderFactory pipelineBuilderFactory ) {
+        this.pipelineBuilderFactory = pipelineBuilderFactory;
+    }
+
+
+    @Override
+    public Observable<ResultsPage<Entity>> searchCollection( final CollectionSearch search ) {
+
+
+        final ApplicationScope applicationScope = search.getApplicationScope();
+        final String collectionName = search.getCollectionName();
+        final Optional<String> query = search.getQuery();
+
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
+                                  .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+
+
+        final EntityBuilder results;
+
+        if ( !query.isPresent()) {
+            results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
+        }
+        else {
+            results = pipelineBuilder.searchCollection( collectionName, query.get(),search.getEntityType()).loadEntities();
+        }
+
+
+        return results.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
new file mode 100644
index 0000000..51f6768
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Bean for input on searching a connection
+ */
+public class ConnectionSearch {
+
+    private final ApplicationScope applicationScope;
+    private final Id sourceNodeId;
+    private final Optional<String> entityType;
+    private final String connectionName;
+    private final int limit;
+    private final Optional<String> query;
+    private final Optional<String> cursor;
+
+
+    public ConnectionSearch( final ApplicationScope applicationScope, final Id sourceNodeId, final Optional<String> entityType,
+                             final String connectionName, final int limit, final Optional<String> query, final
+                             Optional<String> cursor ) {
+        this.applicationScope = applicationScope;
+        this.sourceNodeId = sourceNodeId;
+        this.entityType = entityType;
+        this.connectionName = connectionName;
+        this.limit = limit;
+        this.query = query;
+        this.cursor = cursor;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+    public String getConnectionName() {
+        return connectionName;
+    }
+
+
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    public int getLimit() {
+        return limit;
+    }
+
+
+    public Optional<String> getQuery() {
+        return query;
+    }
+
+
+    public Id getSourceNodeId() {
+        return sourceNodeId;
+    }
+
+
+    public Optional<String> getEntityType() {
+        return entityType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
new file mode 100644
index 0000000..71a25c9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Interface for operating on connections
+ */
+public interface ConnectionService {
+
+
+    /**
+     * Search a connection and return an observable of results pages
+     * @param search The search to perform
+     * @return An observable with results page entries for the stream
+     */
+    Observable<ResultsPage<Entity>> searchConnection(final ConnectionSearch search);
+
+
+    /**
+     * Search the connections and return connection refs instead of entities in results pages
+     * @param search The search to perform
+     * @return An observable with results pages of connection refs
+     */
+    Observable<ResultsPage<ConnectionRef>> searchConnectionAsRefs( final ConnectionSearch search );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
new file mode 100644
index 0000000..c7e0fee
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
+import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
+import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+@Singleton
+public class ConnectionServiceImpl implements ConnectionService {
+
+    private final PipelineBuilderFactory pipelineBuilderFactory;
+
+
+    @Inject
+    public ConnectionServiceImpl( final PipelineBuilderFactory pipelineBuilderFactory ) {
+        this.pipelineBuilderFactory = pipelineBuilderFactory;
+    }
+
+
+    @Override
+    public Observable<ResultsPage<Entity>> searchConnection( final ConnectionSearch search ) {
+        //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector
+        // -> 1.0 results
+
+        //  startid -- graph edge load -- entity load (verify) from ids -> results page collector
+        // startid -- eq query candidate -- entity load (verify) from candidates -> results page collector
+
+        //startid -- graph edge load -- entity id verify --> filter to connection ref --> connection ref collector
+        //startid -- eq query candidate -- candidate id verify --> filter to connection ref --> connection ref
+        // collector
+
+
+        final Optional<String> query = search.getQuery();
+
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( search.getApplicationScope() ).withCursor( search.getCursor() )
+                                  .withLimit( search.getLimit() ).fromId( search.getSourceNodeId() );
+
+
+        //we want to load all entities
+
+        final EntityBuilder results;
+
+
+        if ( !query.isPresent() ) {
+            results =
+                pipelineBuilder.traverseConnection( search.getConnectionName(), search.getEntityType() ).loadEntities();
+        }
+
+        else {
+
+            results =
+                pipelineBuilder.searchConnection( search.getConnectionName(), query.get(), search.getEntityType() )
+                               .loadEntities();
+        }
+
+
+        return results.build();
+    }
+
+
+    @Override
+    public Observable<ResultsPage<ConnectionRef>> searchConnectionAsRefs( final ConnectionSearch search ) {
+
+        final Optional<String> query = search.getQuery();
+
+        final Id sourceNodeId = search.getSourceNodeId();
+
+        final IdBuilder pipelineBuilder =
+            pipelineBuilderFactory.create( search.getApplicationScope() ).withCursor( search.getCursor() )
+                                  .withLimit( search.getLimit() ).fromId( sourceNodeId );
+
+
+        final IdBuilder traversedIds;
+        final String connectionName = search.getConnectionName();
+
+        if ( !query.isPresent() ) {
+            traversedIds = pipelineBuilder.traverseConnection( connectionName, search.getEntityType() );
+        }
+        else {
+            traversedIds =
+                pipelineBuilder.searchConnection( connectionName, query.get(), search.getEntityType() ).loadIds();
+        }
+
+        //create connection refs
+
+        final Observable<ResultsPage<ConnectionRef>> results =
+            traversedIds.loadConnectionRefs( sourceNodeId, connectionName ).build();
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 7cdc996..0426e37 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -48,7 +48,7 @@ public interface CassandraFig extends GuicyFig {
     String getHosts();
 
     @Key( "cassandra.version" )
-    @Default( "1.2" )
+    @Default( "2.1" )
     String getVersion();
 
     @Key( "cassandra.cluster_name" )

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
deleted file mode 100644
index 9807749..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.rx;
-
-
-
-import rx.Notification;
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.exceptions.Exceptions;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Returns an Iterator that iterates over all items emitted by a specified Observable.
- * This blocks with an array blocking queue of 1
- * <p>
- * <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterator.png" alt="">
- * <p>
- *
- * @see <a href="https://github.com/ReactiveX/RxJava/issues/50">Issue #50</a>
- */
-public final class ObservableToBlockingIteratorFactory {
-    private ObservableToBlockingIteratorFactory() {
-        throw new IllegalStateException("No instances!");
-    }
-
-    /**
-     * Returns an iterator that iterates all values of the observable.
-     *
-     * @param <T>
-     *            the type of source.
-     * @return the iterator that could be used to iterate over the elements of the observable.
-     */
-    public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
-        final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
-
-        // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
-        final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
-            @Override
-            public void onCompleted() {
-                // ignore
-            }
-
-            @Override
-            public void onError(Throwable e) {
-                try{
-                    notifications.put(Notification.<T>createOnError(e));
-                }catch (Exception t){
-
-                }
-            }
-
-            @Override
-            public void onNext(Notification<? extends T> args) {
-                try{
-                    notifications.put(args);
-                }catch (Exception t){
-
-                }
-            }
-        });
-
-        return new Iterator<T>() {
-            private Notification<? extends T> buf;
-
-            @Override
-            public boolean hasNext() {
-                if (buf == null) {
-                    buf = take();
-                }
-                if (buf.isOnError()) {
-                    throw Exceptions.propagate(buf.getThrowable());
-                }
-                return !buf.isOnCompleted();
-            }
-
-            @Override
-            public T next() {
-                if (hasNext()) {
-                    T result = buf.getValue();
-                    buf = null;
-                    return result;
-                }
-                throw new NoSuchElementException();
-            }
-
-            private Notification<? extends T> take() {
-                try {
-                    return notifications.take();
-                } catch (InterruptedException e) {
-                    subscription.unsubscribe();
-                    throw Exceptions.propagate(e);
-                }
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("Read-only iterator");
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208332da/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 649ac7a..6d8a466 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -19,23 +19,19 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import rx.Notification;
 import rx.Observable;
-import rx.Observable.Transformer;
 import rx.Subscriber;
-import rx.Subscription;
-import rx.exceptions.Exceptions;
 import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
@@ -156,20 +152,20 @@ public class OrderedMergeTest {
 
         List<Integer> expected1List = Arrays.asList( 5, 3, 2, 0 );
 
-        Observable<Integer> expected1 = Observable.from(expected1List);
+        Observable<Integer> expected1 = Observable.from( expected1List );
 
-        List<Integer> expected2List = Arrays.asList(10, 7, 6, 4);
+        List<Integer> expected2List = Arrays.asList( 10, 7, 6, 4 );
 
-        Observable<Integer> expected2 = Observable.from(expected2List);
+        Observable<Integer> expected2 = Observable.from( expected2List );
 
-        List<Integer> expected3List = Arrays.asList(9, 8, 1);
+        List<Integer> expected3List = Arrays.asList( 9, 8, 1 );
 
-        Observable<Integer> expected3 = Observable.from(expected3List);
+        Observable<Integer> expected3 = Observable.from( expected3List );
 
         //set our buffer size to 2.  We should easily exceed this since every observable has more than 2 elements
 
         Observable<Integer> ordered =
-                OrderedMerge.orderedMerge(new ReverseIntegerComparator(), 2, expected1, expected2, expected3);
+                OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
 
         final CountDownLatch latch = new CountDownLatch( 1 );
         final List<Integer> results = new ArrayList();
@@ -208,9 +204,9 @@ public class OrderedMergeTest {
         /**
          * Since we're on the same thread, we should blow up before we begin producing elements our size
          */
-        assertEquals(0, results.size());
+        assertEquals( 0, results.size() );
 
-        assertTrue("An exception was thrown", errorThrown[0]);
+        assertTrue( "An exception was thrown", errorThrown[0] );
     }
 
 
@@ -247,14 +243,14 @@ public class OrderedMergeTest {
             @Override
             public void onError(final Throwable e) {
                 e.printStackTrace();
-                fail("An error was thrown ");
+                fail( "An error was thrown " );
             }
 
 
             @Override
             public void onNext(final Integer integer) {
                 log.info("onNext invoked with {}", integer);
-                results.add(integer);
+                results.add( integer );
             }
         });
 
@@ -262,7 +258,7 @@ public class OrderedMergeTest {
 
         List<Integer> expected = Arrays.asList( 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 );
 
-        assertEquals(expected.size(), results.size());
+        assertEquals( expected.size(), results.size() );
 
 
         for ( int i = 0; i < expected.size(); i++ ) {
@@ -310,7 +306,7 @@ public class OrderedMergeTest {
 
             @Override
             public void onError( final Throwable e ) {
-                log.error("Expected error thrown", e);
+                log.error( "Expected error thrown", e );
 
                 if ( e.getMessage().contains( "The maximum queue size of 2 has been reached" ) ) {
                     errorThrown[0] = true;
@@ -322,14 +318,14 @@ public class OrderedMergeTest {
 
             @Override
             public void onNext( final Integer integer ) {
-                log.info("onNext invoked with {}", integer);
+                log.info( "onNext invoked with {}", integer );
             }
         } );
 
         latch.await();
 
 
-        assertTrue("An exception was thrown", errorThrown[0]);
+        assertTrue( "An exception was thrown", errorThrown[0] );
     }
 
 
@@ -539,34 +535,6 @@ public class OrderedMergeTest {
           }
       }
 
-    @Test
-    public void obsIterator() {
-        Iterator<Object> iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
-            int count = 0;
-            while (!subscriber.isUnsubscribed()) {
-                //pull from source
-                for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
-                    //emit
-                    log.info("loop " + count);
-                    subscriber.onNext(count++);
-                }
-            }
-
-            subscriber.onCompleted();
-        })
-            .onBackpressureBlock(1)
-            .doOnNext(o -> {
-                log.info("iteration " + o);
-            }).subscribeOn(Schedulers.io()));
-        //never
-        Object it =iterator.next();
-        it = iterator.next();
-        log.info("iterate");
-        it = iterator.next();
-        log.info("iterate");
-
-        Object size = it;
-    }
 
 
 


Mime
View raw message