usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [24/50] usergrid git commit: Fixing https://issues.apache.org/jira/browse/USERGRID-1310. Also fixed an NPE found during the fix. (In abstract connection service, when entity is null it throws NPE. Changed it to throw 404)
Date Mon, 22 Aug 2016 15:00:28 GMT
Fixing https://issues.apache.org/jira/browse/USERGRID-1310.
Also fixed an NPE found during the fix. (In abstract connection service, when entity is null
it throws NPE. Changed it to throw 404.)


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b1157a89
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b1157a89
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b1157a89

Branch: refs/heads/asf-site
Commit: b1157a8924686557e5c26966c5a0b14fb87eb2d6
Parents: 10e8957
Author: Ayesha Dastagiri <ayesha.amrin@gmail.com>
Authored: Thu Aug 11 11:37:23 2016 -0700
Committer: Ayesha Dastagiri <ayesha.amrin@gmail.com>
Committed: Thu Aug 11 11:37:23 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  55 ++--
 .../pipeline/builder/IdBuilder.java             |  29 +-
 .../pipeline/read/FilterFactory.java            |   8 +
 .../AbstractReadReverseGraphFilter.java         | 291 +++++++++++++++++++
 .../ReadGraphReverseConnectionFilter.java       |  53 ++++
 .../service/ConnectionSearch.java               |   8 +-
 .../service/ConnectionServiceImpl.java          |   9 +-
 .../org/apache/usergrid/persistence/Query.java  |  39 ++-
 .../persistence/EntityConnectionsIT.java        |  67 ++++-
 .../services/AbstractConnectionsService.java    |  35 +--
 .../usergrid/services/ConnectionsServiceIT.java |  74 ++++-
 11 files changed, 559 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index b398562..57b1526 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -17,69 +17,48 @@
 package org.apache.usergrid.corepersistence;
 
 
-import java.util.*;
-
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.index.CollectionSettings;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
-import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
 import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
+import org.apache.usergrid.corepersistence.results.IdQueryExecutor;
 import org.apache.usergrid.corepersistence.service.CollectionSearch;
 import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionSearch;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.ConnectedEntityRef;
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.persistence.RelationManager;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.RoleRef;
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.SimpleRoleRef;
 import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
 import org.apache.usergrid.persistence.entities.User;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.*;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.InflectionUtils;
 import org.apache.usergrid.utils.MapUtils;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
 import rx.Observable;
 
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchByEdge;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+import java.util.*;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
 import static org.apache.usergrid.persistence.Schema.*;
 import static org.apache.usergrid.utils.ClassUtils.cast;
 import static org.apache.usergrid.utils.InflectionUtils.singularize;
@@ -954,7 +933,7 @@ public class CpRelationManager implements RelationManager {
         final Id sourceId = headEntity.asId();
 
         final Optional<String> queryString = query.isGraphSearch()? Optional.<String>absent():
query.getQl();
-
+        final boolean isConnecting = query.isConnecting();
 
         if ( query.getResultsLevel() == Level.REFS || query.getResultsLevel() == Level.IDS
) {
 
@@ -968,7 +947,7 @@ public class CpRelationManager implements RelationManager {
 
                     final ConnectionSearch search =
                         new ConnectionSearch( applicationScope, sourceId, entityType, connection,
toExecute.getLimit(),
-                            queryString, cursor );
+                            queryString, cursor, isConnecting );
                     return connectionService.searchConnectionAsRefs( search );
                 }
             }.next();
@@ -983,7 +962,7 @@ public class CpRelationManager implements RelationManager {
                 //we need the callback so as we get a new cursor, we execute a new search
and re-initialize our builders
                 final ConnectionSearch search =
                     new ConnectionSearch( applicationScope, sourceId, entityType, connection,
toExecute.getLimit(),
-                        queryString, cursor );
+                        queryString, cursor, isConnecting );
                 return connectionService.searchConnection( search );
             }
         }.next();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 781d7d5..b7d1f86 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -20,9 +20,10 @@
 package org.apache.usergrid.corepersistence.pipeline.builder;
 
 
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory;
-import org.apache.usergrid.corepersistence.pipeline.Pipeline;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefFilter;
@@ -30,13 +31,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.collect.ConnectionRefRe
 import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
 import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
-import org.apache.usergrid.corepersistence.pipeline.read.traverse.IdFilter;
 import org.apache.usergrid.persistence.ConnectionRef;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
 import rx.Observable;
 
 
@@ -69,6 +66,28 @@ public class IdBuilder {
 
 
     /**
+     * Traverse all connection edges to our input Id
+     * @param connectionName The name of the connection
+     * @param entityType The optional type of the entity
+     * @return
+     */
+    public IdBuilder traverseReverseConnection( final String connectionName, final Optional<String>
entityType ) {
+
+        final PipelineOperation<FilterResult<Id>, FilterResult<Id>> filter;
+
+        if(entityType.isPresent()){
+            //todo: change this too.
+            filter = filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType.get()
);
+        }else{
+            filter = filterFactory.readGraphReverseConnectionFilter( connectionName );
+        }
+
+
+        return new IdBuilder( pipeline.withFilter(filter ), filterFactory );
+    }
+
+
+    /**
      * Traverse all the collection edges from our input Id
      * @param collectionName
      * @return

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
index 883fdc8..4b615d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java
@@ -62,6 +62,14 @@ public interface FilterFactory {
      */
     ReadGraphConnectionFilter readGraphConnectionFilter( final String connectionName );
 
+
+    /**
+     * Generate a new instance of the command with the specified parameters
+     *
+     * @param connectionName The connection name to use when reverse traversing the graph
+     */
+    ReadGraphReverseConnectionFilter readGraphReverseConnectionFilter( final String connectionName
);
+
     /**
      * Generate a new instance of the command with the specified parameters
      *

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
new file mode 100644
index 0000000..dcda98f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * Command for reading graph edges in reverse order.
+ */
+public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<Id, Id,
MarkedEdge> {
+
+    private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class
);
+
+    private final GraphManagerFactory graphManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
+    private final EventBuilder eventBuilder;
+    private final AsyncEventService asyncEventService;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    public AbstractReadReverseGraphFilter( final GraphManagerFactory graphManagerFactory,
+                                    final RxTaskScheduler rxTaskScheduler,
+                                    final EventBuilder eventBuilder,
+                                    final AsyncEventService asyncEventService ) {
+        this.graphManagerFactory = graphManagerFactory;
+        this.rxTaskScheduler = rxTaskScheduler;
+        this.eventBuilder = eventBuilder;
+        this.asyncEventService = asyncEventService;
+    }
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>>
previousIds ) {
+
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        //get the graph manager
+        final GraphManager graphManager =
+            graphManagerFactory.createEdgeManager( applicationScope );
+
+
+        final String edgeName = getEdgeTypeName();
+        final EdgeState edgeCursorState = new EdgeState();
+
+
+        //return all ids that are emitted from this edge
+        return previousIds.flatMap( previousFilterValue -> {
+
+            //set our constant state
+            final Optional<MarkedEdge> startFromCursor = getSeekValue();
+            final Id id = previousFilterValue.getValue();
+
+
+            final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());
+
+            /**
+             * We do not want to filter.  This is intentional DO NOT REMOVE!!!
+             *
+             * We want to fire events on these edges if they exist, the delete was missed.
+             */
+            final SimpleSearchByEdgeType search =
+                new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    typeWrapper, false );
+
+            /**
+             * TODO, pass a message with pointers to our cursor values to be generated later
+             */
+            return graphManager.loadEdgesToTarget( search ).filter(markedEdge -> {
+
+                final boolean isDeleted = markedEdge.isDeleted();
+                final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete();
+                final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();
+
+
+                if (isDeleted) {
+
+                    logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
+                    final Observable<IndexOperationMessage> indexMessageObservable
= eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+
+                    indexMessageObservable
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
+
+                }
+
+                if (isSourceNodeDeleted) {
+
+                    final Id sourceNodeId = markedEdge.getSourceNode();
+                    logger.info("Edge {} has a deleted source node, deleting the entity for
id {}", markedEdge, sourceNodeId);
+
+                    final EventBuilderImpl.EntityDeleteResults
+                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope,
sourceNodeId);
+
+                    entityDeleteResults.getIndexObservable()
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
+
+                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+                        entityDeleteResults.getCompactedNode())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+                        subscribe();
+
+                }
+
+                if (isTargetNodeDelete) {
+
+                    final Id targetNodeId = markedEdge.getTargetNode();
+                    logger.info("Edge {} has a deleted target node, deleting the entity for
id {}", markedEdge, targetNodeId);
+
+                    final EventBuilderImpl.EntityDeleteResults
+                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope,
targetNodeId);
+
+                    entityDeleteResults.getIndexObservable()
+                        .compose(applyCollector())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
+                        .subscribe();
+
+                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
+                        entityDeleteResults.getCompactedNode())
+                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
+                        subscribe();
+
+                }
+
+
+                //filter if any of them are marked
+                return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;
+
+
+            })  // any non-deleted edges should be de-duped here so the results are unique
+                .distinct( new EdgeDistinctKey() )
+                //set the edge state for cursors
+                .doOnNext( edge -> {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Seeking over edge {}", edge);
+                    }
+                    edgeCursorState.update( edge );
+                } )
+
+                //map our id from the edge's source node (reverse traversal) and set our cursor for every edge we traverse
+                .map( edge -> createFilterResult( edge.getSourceNode(), edgeCursorState.getCursorEdge(),
+                    previousFilterValue.getPath() ) );
+        } );
+    }
+
+
+    @Override
+    protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge
cursorValue,
+                                                   final Optional<EdgePath> parent
) {
+
+        //if it's our first pass, there's no cursor to generate
+        if(cursorValue == null){
+            return new FilterResult<>( emit, parent );
+        }
+
+        return super.createFilterResult( emit, cursorValue, parent );
+    }
+
+
+    @Override
+    protected CursorSerializer<MarkedEdge> getCursorSerializer() {
+        return EdgeCursorSerializer.INSTANCE;
+    }
+
+
+    /**
+     * Get the edge type name we should use when traversing
+     */
+    protected abstract String getEdgeTypeName();
+
+
+    /**
+     * Wrapper class. Because edges seek > the last returned, we need to keep our n-1
value. This will be our cursor We
+     * always try to seek to the same position as we ended.  Since we don't deal with a persistent
read result, if we
+     * seek to a value = to our last, we may skip data.
+     */
+    private final class EdgeState {
+
+        private MarkedEdge cursorEdge = null;
+        private MarkedEdge currentEdge = null;
+
+
+        /**
+         * Update the pointers
+         */
+        private void update( final MarkedEdge newEdge ) {
+            cursorEdge = currentEdge;
+            currentEdge = newEdge;
+        }
+
+
+        /**
+         * Get the edge to use in cursors for resume
+         */
+        private MarkedEdge getCursorEdge() {
+            return cursorEdge;
+        }
+    }
+
+    private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector()
{
+
+        return observable -> observable
+            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
+            .filter(msg -> !msg.isEmpty())
+            .doOnNext(indexOperation -> {
+                asyncEventService.queueIndexOperationMessage(indexOperation);
+            });
+
+    }
+
+    /**
+     *  Return a key that Rx can use for determining a distinct edge.  Build a string containing
the UUID
+     *  of the source and target nodes, with the type to ensure uniqueness rather than the
int sum of the hash codes.
+     *  Edge timestamp is specifically left out as edges with the same source,target,type
but different timestamps
+     *  are considered duplicates.
+     */
+    private class EdgeDistinctKey implements Func1<Edge,String> {
+
+        @Override
+        public String call(Edge edge) {
+
+            return buildDistinctKey(edge.getSourceNode().getUuid().toString(), edge.getTargetNode().getUuid().toString(),
+                edge.getType().toLowerCase());
+        }
+    }
+
+    protected static String buildDistinctKey(final String sourceNode, final String targetNode,
final String type){
+
+        final String DISTINCT_KEY_SEPARATOR = ":";
+        StringBuilder stringBuilder = new StringBuilder();
+
+        stringBuilder
+            .append(sourceNode)
+            .append(DISTINCT_KEY_SEPARATOR)
+            .append(targetNode)
+            .append(DISTINCT_KEY_SEPARATOR)
+            .append(type);
+
+        return stringBuilder.toString();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
new file mode 100644
index 0000000..aa369c2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphReverseConnectionFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
+
+/**
+ * Created by ayeshadastagiri on 8/9/16.
+ */
+public class ReadGraphReverseConnectionFilter extends AbstractReadReverseGraphFilter{
+    private final String connectionName;
+
+    /**
+     * Create a new instance of our command
+     */
+    @Inject
+    public ReadGraphReverseConnectionFilter( final GraphManagerFactory graphManagerFactory,
+                                      @AsyncRepair final RxTaskScheduler rxTaskScheduler,
+                                      final EventBuilder eventBuilder,
+                                      final AsyncEventService asyncEventService,
+                                      @Assisted final String connectionName ) {
+        super( graphManagerFactory, rxTaskScheduler, eventBuilder, asyncEventService );
+        this.connectionName = connectionName;
+    }
+    @Override
+    protected String getEdgeTypeName() {
+        return getEdgeTypeFromConnectionType( connectionName );    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
index 51f6768..8ad57fb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionSearch.java
@@ -36,11 +36,12 @@ public class ConnectionSearch {
     private final int limit;
     private final Optional<String> query;
     private final Optional<String> cursor;
+    private final boolean isConnecting;
 
 
     public ConnectionSearch( final ApplicationScope applicationScope, final Id sourceNodeId,
final Optional<String> entityType,
                              final String connectionName, final int limit, final Optional<String>
query, final
-                             Optional<String> cursor ) {
+                             Optional<String> cursor, boolean isConnecting ) {
         this.applicationScope = applicationScope;
         this.sourceNodeId = sourceNodeId;
         this.entityType = entityType;
@@ -48,6 +49,7 @@ public class ConnectionSearch {
         this.limit = limit;
         this.query = query;
         this.cursor = cursor;
+        this.isConnecting = isConnecting;
     }
 
 
@@ -84,4 +86,8 @@ public class ConnectionSearch {
     public Optional<String> getEntityType() {
         return entityType;
     }
+
+    public boolean getIsConnecting(){
+        return isConnecting;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 4b7e66c..926c676 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -94,8 +94,13 @@ public class ConnectionServiceImpl implements ConnectionService {
 
 
         if ( !query.isPresent() ) {
-            results =
-                pipelineBuilder.traverseConnection( search.getConnectionName(), search.getEntityType()
).loadEntities();
+            if(search.getIsConnecting()){
+                results = pipelineBuilder.traverseReverseConnection(search.getConnectionName(),
search.getEntityType()).loadEntities();
+            }
+            else {
+                results =
+                    pipelineBuilder.traverseConnection(search.getConnectionName(), search.getEntityType()).loadEntities();
+            }
         }
 
         else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 150a1b0..d68c085 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -19,36 +19,25 @@
 package org.apache.usergrid.persistence;
 
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.codec.binary.Base64;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import org.apache.commons.lang.StringUtils;
-
 import org.apache.usergrid.persistence.index.SelectFieldMapping;
 import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.tree.Operand;
 import org.apache.usergrid.persistence.index.utils.ClassUtils;
-import org.apache.usergrid.persistence.index.utils.ConversionUtils;
 import org.apache.usergrid.persistence.index.utils.ListUtils;
 import org.apache.usergrid.persistence.index.utils.MapUtils;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.*;
+import java.util.Map.Entry;
 
 
 public class Query {
@@ -82,6 +71,7 @@ public class Query {
     private Long startTime;
     private Long finishTime;
     private boolean pad;
+    private boolean connecting = false;
     private CounterResolution resolution = CounterResolution.ALL;
     private List<Identifier> identifiers;
     private List<CounterFilterPredicate> counterFilters;
@@ -611,6 +601,15 @@ public class Query {
         this.pad = pad;
     }
 
+    //set the flag to retrieve the edges in the reverse direction.
+    public void setConnecting( boolean connecting ) {
+        this.connecting = connecting;
+    }
+
+    public boolean isConnecting() {
+        return connecting;
+    }
+
 
     public void setResolution( CounterResolution resolution ) {
         this.resolution = resolution;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
index be2f06e..3d4e53c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityConnectionsIT.java
@@ -17,24 +17,17 @@
 package org.apache.usergrid.persistence;
 
 
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.persistence.Query.Level;
+import org.apache.usergrid.persistence.entities.User;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.persistence.entities.User;
-import org.apache.usergrid.persistence.Query.Level;
+import java.util.*;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class EntityConnectionsIT extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger( EntityConnectionsIT.class );
@@ -335,6 +328,54 @@ public class EntityConnectionsIT extends AbstractCoreIT {
         assertEquals( "user", res.getEntity().getType() );
     }
 
+    //not required . addd tests at service layer.
+    @Ignore
+    @Test
+    public void testGetConnectingEntitiesCursor() throws Exception {
+
+        UUID applicationId = app.getId( );
+        assertNotNull( applicationId );
+
+        EntityManager em = app.getEntityManager();
+        assertNotNull( em );
+
+        User fred = new User();
+        fred.setUsername( "fred" );
+        fred.setEmail( "fred@flintstones.com" );
+        Entity fredEntity = em.create( fred );
+        assertNotNull( fredEntity );
+
+        User wilma = new User();
+        wilma.setUsername( "wilma" );
+        wilma.setEmail( "wilma@flintstones.com" );
+        Entity wilmaEntity = em.create( wilma );
+        assertNotNull( wilmaEntity );
+
+        User John = new User();
+        John.setUsername( "John" );
+        John.setEmail( "John@flintstones.com" );
+        Entity JohnEntity = em.create( John );
+        assertNotNull( JohnEntity );
+
+        em.createConnection( fredEntity, "likes", wilmaEntity );
+        em.createConnection( fredEntity, "likes", JohnEntity );
+
+
+        app.refreshIndex();
+
+        // now query via the testConnection, this should work
+
+        Query query = Query.fromQLNullSafe("" );
+        query.setConnectionType( "likes" );
+//        query.setConnecting(true);
+        query.setEntityType( "user" );
+
+        // goes through "traverseReverseConnection"
+        Results r = em.searchTargetEntities(fredEntity, query);
+
+        assertEquals( 2, r.size() );
+    }
+
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
index 83549dd..0a9f6a7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
@@ -17,21 +17,8 @@
 package org.apache.usergrid.services;
 
 
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.services.ServiceParameter.IdParameter;
 import org.apache.usergrid.services.ServiceParameter.NameParameter;
@@ -39,10 +26,15 @@ import org.apache.usergrid.services.ServiceParameter.QueryParameter;
 import org.apache.usergrid.services.ServiceResults.Type;
 import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.schedulers.Schedulers;
 
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
 import static org.apache.usergrid.services.ServiceParameter.filter;
 import static org.apache.usergrid.services.ServiceParameter.firstParameterIsName;
 import static org.apache.usergrid.utils.ClassUtils.cast;
@@ -307,6 +299,7 @@ public class AbstractConnectionsService extends AbstractService {
         Results r = null;
 
         if ( connecting() ) {
+            query.setConnecting(true);
             if ( query.hasQueryPredicates() ) {
                 if (logger.isTraceEnabled()) {
                     logger.trace("Attempted query of backwards connections");
@@ -314,13 +307,7 @@ public class AbstractConnectionsService extends AbstractService {
                 return null;
             }
             else {
-//            	r = em.getSourceEntities( context.getOwner().getUuid(), query.getConnectionType(),
-//            			query.getEntityType(), level );
-                // usergrid-2389: User defined limit in the query is ignored. Fixed it by adding
-                // the limit to the method parameter downstream.
-            	r = em.getSourceEntities(
-                    new SimpleEntityRef(context.getOwner().getType(), context.getOwner().getUuid()),
-                    query.getConnectionType(), query.getEntityType(), level, query.getLimit());
+                r = em.searchTargetEntities(context.getOwner(),query);
             }
         }
         else {
@@ -381,6 +368,10 @@ public class AbstractConnectionsService extends AbstractService {
             }
             else {
                 entity = em.create( query.getEntityType(), context.getProperties() );
+                //if entity is null here it throws NPE. Fixing it to throw 404.
+                if ( entity == null ) {
+                    throw new ServiceResourceNotFoundException( context );
+                }
             }
             entity = importEntity( context, entity );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1157a89/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
index a1f19d4..4e65f54 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ConnectionsServiceIT.java
@@ -17,19 +17,17 @@
 package org.apache.usergrid.services;
 
 
-import java.util.Map;
-
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.Query;
 import org.junit.Assert;
 import org.junit.Test;
-
-import org.apache.usergrid.persistence.Entity;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
 
 
 public class ConnectionsServiceIT extends AbstractServiceIT {
@@ -86,6 +84,66 @@ public class ConnectionsServiceIT extends AbstractServiceIT {
         app.testRequest( ServiceAction.POST, 1, "users", "conn-user1", "manages", "user" );
     }
 
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testUserConnectionsCursor() throws Exception {
+        app.put("username", "conn-user1");
+        app.put("email", "conn-user1@apigee.com");
+
+        Entity user1 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+        assertNotNull(user1);
+
+        app.testRequest(ServiceAction.GET, 1, "users", "conn-user1");
+
+        app.put("username", "conn-user2");
+        app.put("email", "conn-user2@apigee.com");
+
+        Entity user2 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+        assertNotNull(user2);
+
+
+        app.put("username", "conn-user3");
+        app.put("email", "conn-user3@apigee.com");
+
+        Entity user3 = app.testRequest(ServiceAction.POST, 1, "users").getEntity();
+        assertNotNull(user3);
+
+
+        //POST users/conn-user2/manages/user2/conn-user1
+        app.testRequest(ServiceAction.POST, 1, "users", "conn-user2", "likes", "users", "conn-user1");
+        //POST users/conn-user3/reports/users/conn-user1
+        app.testRequest(ServiceAction.POST, 1, "users", "conn-user3", "likes", "users", "conn-user1");
+
+        Query query = new Query().fromQLNullSafe("");
+        query.setLimit(1);
+
+        //the result should return a valid cursor.
+        ServiceResults result = app.testRequest(ServiceAction.GET, 1, "users", "conn-user1", "connecting", "likes",query);
+        assertNotNull(result.getCursor());
+        String enityName1 = result.getEntity().getProperty("email").toString();
+
+        Query newquery = new Query().fromQLNullSafe("");
+        query.setCursor(result.getCursor());
+        result = app.testRequest(ServiceAction.GET,1,"users","conn-user1","connecting","likes",query);
+        String enityName2 = result.getEntity().getProperty("email").toString();
+
+        //ensure the two entities returned in above requests are different.
+        assertNotEquals(enityName1,enityName2);
+
+        newquery = new Query().fromQLNullSafe("");
+        query.setCursor(result.getCursor());
+        result = app.testRequest(ServiceAction.GET,0,"users","conn-user1","connecting","likes",query);
+        //return empty cursor when no more entitites found.
+        assertNull(result.getCursor());
+
+        //DELETE users/conn-user1/manages/user2/conn-user2 (qualified by collection type on second entity)
+        app.testRequest(ServiceAction.DELETE, 1, "users", "conn-user2", "likes", "users", "conn-user1");
+
+        app.testRequest(ServiceAction.GET,1,"users","conn-user1","connecting","likes");
+
+
+    }
+
     @Test
     public void testNonExistentEntity() throws Exception {
 


Mime
View raw message