usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [10/50] [abbrv] git commit: Merged hystrix into asyncqueue
Date Thu, 27 Mar 2014 21:21:24 GMT
Merged hystrix into asyncqueue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/136edaba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/136edaba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/136edaba

Branch: refs/pull/77/merge
Commit: 136edaba0d9132282a097ca8ae743bf0075da7c2
Parents: 51381a3
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Mar 25 13:18:51 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Mar 25 13:18:51 2014 -0700

----------------------------------------------------------------------
 .../usergrid/persistence/graph/EdgeManager.java |  163 --
 .../persistence/graph/EdgeManagerFactory.java   |   40 -
 .../persistence/graph/GraphManager.java         |  169 ++
 .../persistence/graph/GraphManagerFactory.java  |   40 +
 .../graph/consistency/AsyncProcessorImpl.java   |    2 +-
 .../persistence/graph/guice/GraphModule.java    |   10 +-
 .../graph/impl/CollectionIndexObserver.java     |   14 +-
 .../graph/impl/EdgeDeleteListener.java          |   66 +-
 .../persistence/graph/impl/EdgeManagerImpl.java |  410 -----
 .../graph/impl/EdgeWriteListener.java           |  116 +-
 .../graph/impl/GraphManagerImpl.java            |  393 +++++
 .../graph/impl/NodeDeleteListener.java          |  139 +-
 .../graph/impl/stage/AbstractEdgeRepair.java    |   27 +-
 .../graph/serialization/EdgeSerialization.java  |   12 +-
 .../impl/EdgeSerializationImpl.java             |  221 ++-
 .../persistence/graph/EdgeManagerIT.java        |  110 +-
 .../graph/EdgeManagerStressTest.java            |   54 +-
 .../persistence/graph/EdgeManagerTimeoutIT.java | 1562 ------------------
 .../graph/GraphManagerTimeoutIT.java            | 1562 ++++++++++++++++++
 .../graph/consistency/AsyncProcessorTest.java   |   23 +-
 .../consistency/LocalTimeoutQueueTest.java      |    2 +-
 .../graph/guice/TestGraphModule.java            |    4 +-
 .../graph/impl/NodeDeleteListenerTest.java      |  210 ++-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |    6 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |   83 +-
 .../graph/impl/stage/EdgeWriteRepairTest.java   |   40 +-
 .../EdgeSerializationChopTest.java              |   35 +-
 .../serialization/EdgeSerializationTest.java    |  163 +-
 .../serialization/NodeSerializationTest.java    |   13 +-
 .../serialization/util/EdgeHasherTest.java      |   14 +-
 stack/corepersistence/pom.xml                   |    1 +
 31 files changed, 2986 insertions(+), 2718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
deleted file mode 100644
index 9f9c510..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManager.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph;
-
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-
-/**
- * Represents operations that can be performed on edges within our graph.  A graph should be within an
- * OrganizationScope
- *
- * An Edge: is defined as the following.
- *
- * The edge is directed It has 2 Identifiers (Id).  1 Id is the source node, 1 Id is the target node It has an edge type
- * (a string name)
- *
- * All edges are directed edges.  By definition, the direction is from Source to Target.
- *
- * I.E Source ---- type -----> Target Ex:
- *
- * Dave (user) ----"follows"---> Alex (user)
- *
- * Alex (user)  ----"likes"---> Guinness (beer)
- *
- * Todd (user) ----"worksfor"-----> Apigee (company)
- *
- * Note that edges are directed.  All implementations always have an implicit inverse of the directed edge. This can be
- * used to search both incoming and outgoing edges within the graph.
- *
- * @author tnine
- * @see Edge
- */
-public interface EdgeManager {
-
-
-    /**
-     * @param edge The edge to write
-     *
-     * Create or update an edge.  Note that the implementation should also create incoming (reversed) edges for this
-     * edge.
-     */
-    Observable<Edge> writeEdge( Edge edge );
-
-
-    /**
-     * @param edge The edge to delete
-     *
-     *
-     * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge.
-     */
-    Observable<Edge> deleteEdge( Edge edge );
-
-    /**
-     * TODO: This needs to mark a node as deleted while consistency processing occurs, our reads would need to check this filter on read
-     *
-     * Remove the node from the graph.
-     *
-     * @param node
-     * @return
-     */
-    Observable<Id> deleteNode(Id node);
-
-    /**
-     * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
-     * search criteria of the edge type
-     *
-     * @param search The search parameters
-     *
-     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
-     */
-    Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
-
-    /**
-     * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
-     * search criteria of the edge type
-     *
-     * @param search The search parameters
-     *
-     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
-     */
-    Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
-
-
-    /**
-     * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
-     * search criteria of the edge type and the target type
-     *
-     * @param search The search parameters
-     *
-     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
-     */
-    Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
-
-
-    /**
-     * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
-     * search criteria of the edge type and the target type
-     *
-     * @param search The search parameters
-     *
-     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
-     */
-    Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
-
-    /**
-     * Get all edge types to this node.  The node provided by search is the target node.
-     *
-     * @param search The search
-     *
-     * @return An observable that emits strings for edge types
-     */
-    Observable<String> getEdgeTypesFromSource( SearchEdgeType search );
-
-
-    /**
-     * Get all id types to this node.  The node provided by search is the target node with the edge type to search.
-     *
-     * @param search The search criteria
-     *
-     * @return An observable of all source id types
-     */
-    Observable<String> getIdTypesFromSource( SearchIdType search );
-
-
-    /**
-     * Get all edge types from this node.  The node provided by search is the source node.
-     *
-     * @param search The search
-     *
-     * @return An observable that emits strings for edge types
-     */
-    Observable<String> getEdgeTypesToTarget( SearchEdgeType search );
-
-
-    /**
-     * Get all id types from this node.  The node provided by search is the source node with the edge type to search.
-     *
-     * @param search The search criteria
-     *
-     * @return An observable of all source id types
-     */
-    Observable<String> getIdTypesToTarget( SearchIdType search );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java
deleted file mode 100644
index f01f876..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/EdgeManagerFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph;
-
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-
-
-/**
- *
- * @author: tnine
- *
- */
-public interface EdgeManagerFactory
-{
-
-    /**
-     * Create an graph manager for the collection context
-     *
-     * @param collectionScope The context to use when creating the graph manager
-     */
-    public EdgeManager createEdgeManager( OrganizationScope collectionScope );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
new file mode 100644
index 0000000..8af77d6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Represents operations that can be performed on edges within our graph.  A graph should be within an
+ * OrganizationScope
+ *
+ * An Edge: is defined as the following.
+ *
+ * The edge is directed It has 2 Identifiers (Id).  1 Id is the source node, 1 Id is the target node It has an edge type
+ * (a string name)
+ *
+ * All edges are directed edges.  By definition, the direction is from Source to Target.
+ *
+ * I.E Source ---- type -----> Target Ex:
+ *
+ * Dave (user) ----"follows"---> Alex (user)
+ *
+ * Alex (user)  ----"likes"---> Guinness (beer)
+ *
+ * Todd (user) ----"worksfor"-----> Apigee (company)
+ *
+ * Note that edges are directed.  All implementations always have an implicit inverse of the directed edge. This can be
+ * used to search both incoming and outgoing edges within the graph.
+ *
+ * @author tnine
+ * @see Edge
+ */
+public interface GraphManager {
+
+
+    /**
+     * @param edge The edge to write
+     *
+     * Create or update an edge.  Note that the implementation should also create incoming (reversed) edges for this
+     * edge.
+     */
+    Observable<Edge> writeEdge( Edge edge );
+
+
+    /**
+     * @param edge The edge to delete
+     *
+     *
+     * EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version
+     */
+    Observable<Edge> deleteEdge( Edge edge );
+
+    /**
+     *
+     * Remove the node from the graph.
+     *
+     * @param node
+     * @return
+     */
+    Observable<Id> deleteNode(Id node);
+
+    /**
+     * Get all versions of this edge where versions <= max version
+     * @param edge
+     * @return
+     */
+    Observable<Edge> loadEdgeVersions( SearchByEdge edge );
+
+    /**
+     * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
+     * search criteria of the edge type
+     *
+     * @param search The search parameters
+     *
+     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+     */
+    Observable<Edge> loadEdgesFromSource( SearchByEdgeType search );
+
+    /**
+     * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
+     * search criteria of the edge type
+     *
+     * @param search The search parameters
+     *
+     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+     */
+    Observable<Edge> loadEdgesToTarget( SearchByEdgeType search );
+
+
+    /**
+     * Returns an observable that emits all edges where the specified node is the source node. The edges will match the
+     * search criteria of the edge type and the target type
+     *
+     * @param search The search parameters
+     *
+     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+     */
+    Observable<Edge> loadEdgesFromSourceByType( SearchByIdType search );
+
+
+    /**
+     * Returns an observable that emits all edges where the specified node is the target node. The edges will match the
+     * search criteria of the edge type and the target type
+     *
+     * @param search The search parameters
+     *
+     * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption.
+     */
+    Observable<Edge> loadEdgesToTargetByType( SearchByIdType search );
+
+    /**
+     * Get all edge types to this node.  The node provided by search is the target node.
+     *
+     * @param search The search
+     *
+     * @return An observable that emits strings for edge types
+     */
+    Observable<String> getEdgeTypesFromSource( SearchEdgeType search );
+
+
+    /**
+     * Get all id types to this node.  The node provided by search is the target node with the edge type to search.
+     *
+     * @param search The search criteria
+     *
+     * @return An observable of all source id types
+     */
+    Observable<String> getIdTypesFromSource( SearchIdType search );
+
+
+    /**
+     * Get all edge types from this node.  The node provided by search is the source node.
+     *
+     * @param search The search
+     *
+     * @return An observable that emits strings for edge types
+     */
+    Observable<String> getEdgeTypesToTarget( SearchEdgeType search );
+
+
+    /**
+     * Get all id types from this node.  The node provided by search is the source node with the edge type to search.
+     *
+     * @param search The search criteria
+     *
+     * @return An observable of all source id types
+     */
+    Observable<String> getIdTypesToTarget( SearchIdType search );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
new file mode 100644
index 0000000..499259b
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph;
+
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+
+
+/**
+ *
+ * @author: tnine
+ *
+ */
+public interface GraphManagerFactory
+{
+
+    /**
+     * Create an graph manager for the collection context
+     *
+     * @param collectionScope The context to use when creating the graph manager
+     */
+    public GraphManager createEdgeManager( OrganizationScope collectionScope );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index 2b00d1a..20411e7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -45,7 +45,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
 
 
     @Inject
-    public AsyncProcessorImpl( final TimeoutQueue<T> queue, final GraphFig graphFig ) {
+    public AsyncProcessorImpl( final TimeoutQueue<T> queue,  final GraphFig graphFig ) {
         this.queue = queue;
         this.graphFig = graphFig;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index b499e4c..d575471 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,15 +24,15 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.collection.migration.Migration;
 import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
 import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
 import org.apache.usergrid.persistence.graph.consistency.TimeoutQueue;
 import org.apache.usergrid.persistence.graph.impl.CollectionIndexObserver;
-import org.apache.usergrid.persistence.graph.impl.EdgeManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.GraphManagerImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
 import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
@@ -74,8 +74,8 @@ public class GraphModule extends AbstractModule {
         bind( CassandraConfig.class).to( CassandraConfigImpl.class );
 
         // create a guice factory for getting our collection manager
-        install( new FactoryModuleBuilder().implement( EdgeManager.class, EdgeManagerImpl.class )
-                                           .build( EdgeManagerFactory.class ) );
+        install( new FactoryModuleBuilder().implement( GraphManager.class, GraphManagerImpl.class )
+                                           .build( GraphManagerFactory.class ) );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
index 6175b0b..a41cd4a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
@@ -24,8 +24,8 @@ import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
@@ -38,13 +38,13 @@ import com.google.inject.Singleton;
 @Singleton
 public class CollectionIndexObserver implements PostProcessObserver {
 
-    private final EdgeManagerFactory edgeManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
 
 
     @Inject
-    public CollectionIndexObserver( final EdgeManagerFactory edgeManagerFactory ) {
-        Preconditions.checkNotNull( edgeManagerFactory, "edgeManagerFactory cannot be null" );
-        this.edgeManagerFactory = edgeManagerFactory;
+    public CollectionIndexObserver( final GraphManagerFactory graphManagerFactory ) {
+        Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory cannot be null" );
+        this.graphManagerFactory = graphManagerFactory;
     }
 
 
@@ -53,7 +53,7 @@ public class CollectionIndexObserver implements PostProcessObserver {
     public void postCommit( final CollectionScope scope, final MvccEntity entity ) {
 
         //get the edge manager for the org scope
-        EdgeManager em = edgeManagerFactory.createEdgeManager( scope );
+        GraphManager em = graphManagerFactory.createEdgeManager( scope );
 
         /**
          * create an edge from owner->entity of the type name in the scope.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
index a5715c9..11c052b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListener.java
@@ -1,21 +1,18 @@
 package org.apache.usergrid.persistence.graph.impl;
 
 
-import java.util.Iterator;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.EdgeManagerFactory;
 import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
 import org.apache.usergrid.persistence.graph.consistency.MessageListener;
 import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -26,7 +23,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
-import rx.functions.Func5;
+import rx.functions.Func4;
 
 
 /**
@@ -38,7 +35,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
 
     private final EdgeSerialization edgeSerialization;
     private final EdgeMetadataSerialization edgeMetadataSerialization;
-    private final EdgeManagerFactory edgeManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
     private final Keyspace keyspace;
     private final GraphFig graphFig;
 
@@ -46,12 +43,11 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
     @Inject
     public EdgeDeleteListener( final EdgeSerialization edgeSerialization,
                                final EdgeMetadataSerialization edgeMetadataSerialization,
-                               final EdgeManagerFactory edgeManagerFactory, final Keyspace keyspace,
-                               final GraphFig graphFig,
-                               @EdgeDelete final AsyncProcessor edgeDelete ) {
+                               final GraphManagerFactory graphManagerFactory, final Keyspace keyspace,
+                               @EdgeDelete final AsyncProcessor edgeDelete, final GraphFig graphFig ) {
         this.edgeSerialization = edgeSerialization;
         this.edgeMetadataSerialization = edgeMetadataSerialization;
-        this.edgeManagerFactory = edgeManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
         this.keyspace = keyspace;
         this.graphFig = graphFig;
 
@@ -65,7 +61,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
         final Edge edge = delete.getData();
         final OrganizationScope scope = delete.getOrganizationScope();
         final UUID maxVersion = edge.getVersion();
-        final EdgeManager edgeManager = edgeManagerFactory.createEdgeManager( scope );
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( scope );
 
 
         return Observable.from( edge ).flatMap( new Func1<Edge, Observable<MutationBatch>>() {
@@ -75,26 +71,26 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                 final MutationBatch batch = keyspace.prepareMutationBatch();
 
 
-                //go through every version of this edge <= the current version and remove it
-                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>( ) {
-                    @Override
-                    protected Iterator<MarkedEdge> getIterator() {
-                        return edgeSerialization.getEdgeToTarget( scope,
-                                new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                        edge.getVersion(), null ) );
-                    }
-                } ).doOnNext( new Action1<MarkedEdge>() {
-                    @Override
-                    public void call( final MarkedEdge markedEdge ) {
-                        final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
-                        batch.mergeShallow( delete );
-                    }
-                } );
-
+//             TODO T.N. no longer needed since each version is explicity deleted
+//                //go through every version of this edge <= the current version and remove it
+//                Observable<MarkedEdge> edges = Observable.create( new ObservableIterator<MarkedEdge>() {
+//                    @Override
+//                    protected Iterator<MarkedEdge> getIterator() {
+//                        return edgeSerialization.getEdgeVersions( scope,
+//                                new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
+//                                        edge.getVersion(), null ) );
+//                    }
+//                } ).doOnNext( new Action1<MarkedEdge>() {
+//                    @Override
+//                    public void call( final MarkedEdge markedEdge ) {
+//                        final MutationBatch delete = edgeSerialization.deleteEdge( scope, markedEdge );
+//                        batch.mergeShallow( delete );
+//                    }
+//                } );
 
                 //search by edge type and target type.  If any other edges with this target type exist,
                 // we can't delete it
-                Observable<Integer> sourceIdType = edgeManager.loadEdgesFromSourceByType(
+                Observable<Integer> sourceIdType = graphManager.loadEdgesFromSourceByType(
                         new SimpleSearchByIdType( edge.getSourceNode(), edge.getType(), maxVersion,
                                 edge.getTargetNode().getType(), null ) ).take( 2 ).count()
                                                               .doOnNext( new Action1<Integer>() {
@@ -115,7 +111,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                                                               } );
 
 
-                Observable<Integer> targetIdType = edgeManager.loadEdgesToTargetByType(
+                Observable<Integer> targetIdType = graphManager.loadEdgesToTargetByType(
                         new SimpleSearchByIdType( edge.getTargetNode(), edge.getType(), maxVersion,
                                 edge.getSourceNode().getType(), null ) ).take( 2 ).count()
                                                               .doOnNext( new Action1<Integer>() {
@@ -138,7 +134,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
 
                 //search by edge type and target type.  If any other edges with this target type exist,
                 // we can't delete it
-                Observable<Integer> sourceType = edgeManager.loadEdgesFromSource(
+                Observable<Integer> sourceType = graphManager.loadEdgesFromSource(
                         new SimpleSearchByEdgeType( edge.getSourceNode(), edge.getType(), maxVersion, null ) ).take( 2 )
                                                             .count().doOnNext( new Action1<Integer>() {
                             @Override
@@ -156,7 +152,7 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
                         } );
 
 
-                Observable<Integer> targetType = edgeManager.loadEdgesToTarget(
+                Observable<Integer> targetType = graphManager.loadEdgesToTarget(
                         new SimpleSearchByEdgeType( edge.getTargetNode(), edge.getType(), maxVersion, null ) ).take( 2 )
                                                             .count().doOnNext( new Action1<Integer>() {
                             @Override
@@ -175,10 +171,10 @@ public class EdgeDeleteListener implements MessageListener<EdgeEvent<Edge>, Edge
 
 
                 //no op, just wait for each observable to populate the mutation before returning it
-                return Observable.zip( edges, sourceIdType, targetIdType, sourceType, targetType,
-                        new Func5<MarkedEdge, Integer, Integer, Integer, Integer, MutationBatch>() {
+                return Observable.zip(sourceIdType, targetIdType, sourceType, targetType,
+                        new Func4<Integer, Integer, Integer, Integer, MutationBatch>() {
                             @Override
-                            public MutationBatch call( final MarkedEdge markedEdge, final Integer integer,
+                            public MutationBatch call( final Integer integer,
                                                        final Integer integer2, final Integer integer3,
                                                        final Integer integer4 ) {
                                 return batch;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
deleted file mode 100644
index 4e5746a..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.EdgeManager;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
-import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.SearchIdType;
-import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
-import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
-import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
-import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
-import org.apache.usergrid.persistence.graph.guice.NodeDelete;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-
-/**
- *
- *
- */
-public class EdgeManagerImpl implements EdgeManager {
-
-
-    private final OrganizationScope scope;
-
-    private final EdgeMetadataSerialization edgeMetadataSerialization;
-
-
-    private final EdgeSerialization edgeSerialization;
-
-    private final NodeSerialization nodeSerialization;
-
-    private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
-    private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
-    private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
-
-    private final GraphFig graphFig;
-
-
-    @Inject
-    public EdgeManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
-                            final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
-                            final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
-                            @EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
-                            @Assisted final OrganizationScope scope ) {
-
-        ValidationUtils.validateOrganizationScope( scope );
-
-
-        this.scope = scope;
-        this.edgeMetadataSerialization = edgeMetadataSerialization;
-        this.edgeSerialization = edgeSerialization;
-        this.nodeSerialization = nodeSerialization;
-        this.graphFig = graphFig;
-
-
-        this.edgeWriteAsyncProcessor = edgeWrite;
-
-
-        this.edgeDeleteAsyncProcessor = edgeDelete;
-
-
-        this.nodeDeleteAsyncProcessor = nodeDelete;
-    }
-
-
-    @Override
-    public Observable<Edge> writeEdge( final Edge edge ) {
-        EdgeUtils.validateEdge( edge );
-
-        return Observable.from( edge ).subscribeOn(  Schedulers.io() ).map( new Func1<Edge, Edge>() {
-            @Override
-            public Edge call( final Edge edge ) {
-                final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
-
-                final MutationBatch edgeMutation = edgeSerialization.writeEdge( scope, edge );
-
-                mutation.mergeShallow( edgeMutation );
-
-                final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
-
-                try {
-                    mutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to cassandra", e );
-                }
-
-                edgeWriteAsyncProcessor.start( event );
-
-                return edge;
-            }
-        } );
-    }
-
-
-    @Override
-    public Observable<Edge> deleteEdge( final Edge edge ) {
-        EdgeUtils.validateEdge( edge );
-
-        return Observable.from( edge ).subscribeOn(  Schedulers.io() ).map( new Func1<Edge, Edge>() {
-            @Override
-            public Edge call( final Edge edge ) {
-                final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
-
-                final AsynchronousMessage<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
-
-
-                try {
-                    edgeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to cassandra", e );
-                }
-
-                edgeDeleteAsyncProcessor.start( event );
-
-
-                return edge;
-            }
-        } );
-    }
-
-
-    @Override
-    public Observable<Id> deleteNode( final Id node ) {
-        return Observable.from( node ).subscribeOn(  Schedulers.io() ).map( new Func1<Id, Id>() {
-            @Override
-            public Id call( final Id id ) {
-
-                //mark the node as deleted
-                final UUID deleteTime = UUIDGenerator.newTimeUUID();
-
-                final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
-
-                final AsynchronousMessage<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
-
-
-                try {
-                    nodeMutation.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to connect to cassandra", e );
-                }
-
-                nodeDeleteAsyncProcessor.start( event );
-
-                return id;
-            }
-        } );
-    }
-
-
-    @Override
-    public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
-
-
-        return
-
-                Observable.create( new ObservableIterator<MarkedEdge>() {
-                    @Override
-                    protected Iterator<MarkedEdge> getIterator() {
-                        return edgeSerialization.getEdgesFromSource( scope, search );
-                    }
-                } )//we intentionally use distinct until changed.  This way we won't store all the keys since this
-                        //would hog far too much memory.
-                        .distinctUntilChanged( new Func1<Edge, Id>() {
-                            @Override
-                            public Id call( final Edge edge ) {
-                                return edge.getTargetNode();
-                            }
-                        } ).buffer( graphFig.getScanPageSize() )
-                        .flatMap( new EdgeBufferFilter( search.getMaxVersion() ) ).cast( Edge.class );
-    }
-
-
-    @Override
-    public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeSerialization.getEdgesToTarget( scope, search );
-            }
-        } )
-                //we intentionally use distinct until changed.  This way we won't store all the keys since this
-                //would hog far too much memory.
-                .distinctUntilChanged( new Func1<Edge, Id>() {
-                    @Override
-                    public Id call( final Edge edge ) {
-                        return edge.getSourceNode();
-                    }
-                } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
-                .cast( Edge.class );
-    }
-
-
-    @Override
-    public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
-            }
-        } ).distinctUntilChanged( new Func1<Edge, Id>() {
-            @Override
-            public Id call( final Edge edge ) {
-                return edge.getTargetNode();
-            }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
-
-                         .cast( Edge.class );
-    }
-
-
-    @Override
-    public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
-        return Observable.create( new ObservableIterator<MarkedEdge>() {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-                return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
-            }
-        } ).distinctUntilChanged( new Func1<Edge, Id>() {
-            @Override
-            public Id call( final Edge edge ) {
-                return edge.getSourceNode();
-            }
-        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
-                         .cast( Edge.class );
-    }
-
-
-    @Override
-    public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
-
-        return Observable.create( new ObservableIterator<String>() {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
-            }
-        } );
-    }
-
-
-    @Override
-    public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>() {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
-            }
-        } );
-    }
-
-
-    @Override
-    public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
-
-        return Observable.create( new ObservableIterator<String>() {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
-            }
-        } );
-    }
-
-
-    @Override
-    public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
-        return Observable.create( new ObservableIterator<String>() {
-            @Override
-            protected Iterator<String> getIterator() {
-                return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
-            }
-        } );
-    }
-
-
-    /**
-     * Get our timeout for write consistency
-     */
-    private long getTimeout() {
-        return graphFig.getRepairTimeout() * 2;
-    }
-
-
-    /**
-     * Helper filter to perform mapping and return an observable of pre-filtered edges
-     */
-    private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
-
-        private final UUID maxVersion;
-
-
-        private EdgeBufferFilter( final UUID maxVersion ) {
-            this.maxVersion = maxVersion;
-        }
-
-
-        /**
-         * Takes a buffered list of marked edges.  It then does a single round trip to fetch marked ids These are then
-         * used in conjunction with the max version filter to filter any edges that should not be returned
-         *
-         * @return An observable that emits only edges that can be consumed.  There could be multiple versions of the
-         *         same edge so those need de-duped.
-         */
-        @Override
-        public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
-
-            final Map<Id, UUID> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
-            return Observable.from( markedEdges ).subscribeOn(  Schedulers.io() )
-                             .filter( new EdgeFilter( this.maxVersion, markedVersions ) );
-        }
-    }
-
-
-    /**
-     * Filter the returned values based on the max uuid and if it's been marked for deletion or not
-     */
-    private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
-
-        private final UUID maxVersion;
-
-        private final Map<Id, UUID> markCache;
-
-
-        private EdgeFilter( final UUID maxVersion, Map<Id, UUID> markCache ) {
-            this.maxVersion = maxVersion;
-            this.markCache = markCache;
-        }
-
-
-        @Override
-        public Boolean call( final MarkedEdge edge ) {
-
-
-            final UUID edgeVersion = edge.getVersion();
-
-            //our edge needs to not be deleted and have a version that's > max Version
-            if ( edge.isDeleted() || UUIDComparator.staticCompare( edgeVersion, maxVersion ) > 0 ) {
-                return false;
-            }
-
-
-            final UUID sourceVersion = markCache.get( edge.getSourceNode() );
-
-            //the source Id has been marked for deletion.  It's version is <= to the marked version for deletion,
-            // so we need to discard it
-            if ( sourceVersion != null && UUIDComparator.staticCompare( edgeVersion, sourceVersion ) < 1 ) {
-                return false;
-            }
-
-            final UUID targetVersion = markCache.get( edge.getTargetNode() );
-
-            //the target Id has been marked for deletion.  It's version is <= to the marked version for deletion,
-            // so we need to discard it
-            if ( targetVersion != null && UUIDComparator.staticCompare( edgeVersion, targetVersion ) < 1 ) {
-                return false;
-            }
-
-
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
index e146ba9..90146dc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.usergrid.persistence.graph.impl;
 
 
@@ -52,52 +71,55 @@ public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeE
         final OrganizationScope scope = write.getOrganizationScope();
         final UUID maxVersion = edge.getVersion();
 
-        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
-                                null );
-
-                return edgeSerialization.getEdgeFromSource( scope, search );
-            }
-        } ).filter( new Func1<MarkedEdge, Boolean>() {
-
-            //TODO, reuse this for delete operation
-
-
-            /**
-             * We only want to return edges < this version so we remove them
-             * @param markedEdge
-             * @return
-             */
-            @Override
-            public Boolean call( final MarkedEdge markedEdge ) {
-                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
-            }
-            //buffer the deletes and issue them in a single mutation
-        } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
-            @Override
-            public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
-
-                final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                for ( MarkedEdge edge : markedEdges ) {
-                    final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-
-                    batch.mergeShallow( delete );
-                }
-
-                try {
-                    batch.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( "Unable to issue write to cassandra", e );
-                }
-
-                return write;
-            }
-        } );
+        return Observable.empty();
+
+//      TODO T.N, some async processing for balancing here
+//  return Observable.create( new ObservableIterator<MarkedEdge>() {
+//            @Override
+//            protected Iterator<MarkedEdge> getIterator() {
+//
+//                final SimpleSearchByEdge search =
+//                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
+//                                null );
+//
+//                return edgeSerialization.getEdgeVersions( scope, search );
+//            }
+//        } ).filter( new Func1<MarkedEdge, Boolean>() {
+//
+//            //TODO, reuse this for delete operation
+//
+//
+//            /**
+//             * We only want to return edges < this version so we remove them
+//             * @param markedEdge
+//             * @return
+//             */
+//            @Override
+//            public Boolean call( final MarkedEdge markedEdge ) {
+//                return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
+//            }
+//            //buffer the deletes and issue them in a single mutation
+//        } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
+//            @Override
+//            public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
+//
+//                final MutationBatch batch = keyspace.prepareMutationBatch();
+//
+//                for ( MarkedEdge edge : markedEdges ) {
+//                    final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+//
+//                    batch.mergeShallow( delete );
+//                }
+//
+//                try {
+//                    batch.execute();
+//                }
+//                catch ( ConnectionException e ) {
+//                    throw new RuntimeException( "Unable to issue write to cassandra", e );
+//                }
+//
+//                return write;
+//            }
+//        } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
new file mode 100644
index 0000000..94d4dc3
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.OrganizationScope;
+import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.SearchEdgeType;
+import org.apache.usergrid.persistence.graph.SearchIdType;
+import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
+import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
+import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
+import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
+import org.apache.usergrid.persistence.graph.guice.NodeDelete;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Implementation of graph edges
+ */
+public class GraphManagerImpl implements GraphManager {
+
+
+    private final OrganizationScope scope;
+
+    private final EdgeMetadataSerialization edgeMetadataSerialization;
+
+
+    private final EdgeSerialization edgeSerialization;
+
+    private final NodeSerialization nodeSerialization;
+
+    private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
+    private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
+    private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
+
+    private final GraphFig graphFig;
+
+
+    @Inject
+    public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
+                            final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
+                            final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
+                            @EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
+                            @Assisted final OrganizationScope scope ) {
+
+        ValidationUtils.validateOrganizationScope( scope );
+
+
+        this.scope = scope;
+        this.edgeMetadataSerialization = edgeMetadataSerialization;
+        this.edgeSerialization = edgeSerialization;
+        this.nodeSerialization = nodeSerialization;
+        this.graphFig = graphFig;
+
+
+        this.edgeWriteAsyncProcessor = edgeWrite;
+
+
+        this.edgeDeleteAsyncProcessor = edgeDelete;
+
+
+        this.nodeDeleteAsyncProcessor = nodeDelete;
+    }
+
+
+    @Override
+    public Observable<Edge> writeEdge( final Edge edge ) {
+        EdgeUtils.validateEdge( edge );
+
+        return Observable.from( edge ).subscribeOn(  Schedulers.io() ).map( new Func1<Edge, Edge>() {
+            @Override
+            public Edge call( final Edge edge ) {
+                final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
+
+                final MutationBatch edgeMutation = edgeSerialization.writeEdge( scope, edge );
+
+                mutation.mergeShallow( edgeMutation );
+
+                final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
+
+                try {
+                    mutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to cassandra", e );
+                }
+
+                edgeWriteAsyncProcessor.start( event );
+
+                return edge;
+            }
+        } );
+    }
+
+
+    @Override
+    public Observable<Edge> deleteEdge( final Edge edge ) {
+        EdgeUtils.validateEdge( edge );
+
+        return Observable.from( edge ).subscribeOn(  Schedulers.io() ).map( new Func1<Edge, Edge>() {
+            @Override
+            public Edge call( final Edge edge ) {
+                final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
+
+                final AsynchronousMessage<Edge> event = edgeDeleteAsyncProcessor.setVerification( edge, getTimeout() );
+
+
+                try {
+                    edgeMutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to cassandra", e );
+                }
+
+                edgeDeleteAsyncProcessor.start( event );
+
+
+                return edge;
+            }
+        } );
+    }
+
+
+    @Override
+    public Observable<Id> deleteNode( final Id node ) {
+        return Observable.from( node ).subscribeOn(  Schedulers.io() ).map( new Func1<Id, Id>() {
+            @Override
+            public Id call( final Id id ) {
+
+                //mark the node as deleted
+                final UUID deleteTime = UUIDGenerator.newTimeUUID();
+
+                final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, deleteTime );
+
+                final AsynchronousMessage<Id> event = nodeDeleteAsyncProcessor.setVerification( node, getTimeout() );
+
+
+                try {
+                    nodeMutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to cassandra", e );
+                }
+
+                nodeDeleteAsyncProcessor.start( event );
+
+                return id;
+            }
+        } );
+    }
+
+
+    @Override
+    public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgeVersions( scope, searchByEdge );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( searchByEdge.getMaxVersion() ) )
+                         .cast( Edge.class );
+    }
+
+
+    @Override
+    public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesFromSource( scope, search );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+                         .cast( Edge.class );
+    }
+
+
+    @Override
+    public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTarget( scope, search );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+                         .cast( Edge.class );
+    }
+
+
+    @Override
+    public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+
+                         .cast( Edge.class );
+    }
+
+
+    @Override
+    public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
+        return Observable.create( new ObservableIterator<MarkedEdge>() {
+            @Override
+            protected Iterator<MarkedEdge> getIterator() {
+                return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
+            }
+        } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
+                         .cast( Edge.class );
+    }
+
+
+    @Override
+    public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
+            }
+        } );
+    }
+
+
+    @Override
+    public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
+            }
+        } );
+    }
+
+
+    @Override
+    public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
+
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
+            }
+        } );
+    }
+
+
+    @Override
+    public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
+        return Observable.create( new ObservableIterator<String>() {
+            @Override
+            protected Iterator<String> getIterator() {
+                return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
+            }
+        } );
+    }
+
+
+    /**
+     * Get our timeout for write consistency
+     */
+    private long getTimeout() {
+        return graphFig.getRepairTimeout() * 2;
+    }
+
+
+    /**
+     * Helper filter to perform mapping and return an observable of pre-filtered edges
+     */
+    private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
+
+        private final UUID maxVersion;
+
+
+        private EdgeBufferFilter( final UUID maxVersion ) {
+            this.maxVersion = maxVersion;
+        }
+
+
+        /**
+         * Takes a buffered list of marked edges.  It then does a single round trip to fetch marked ids These are then
+         * used in conjunction with the max version filter to filter any edges that should not be returned
+         *
+         * @return An observable that emits only edges that can be consumed.  There could be multiple versions of the
+         *         same edge so those need de-duped.
+         */
+        @Override
+        public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+
+            final Map<Id, UUID> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
+            return Observable.from( markedEdges )
+                             .filter( new EdgeFilter( this.maxVersion, markedVersions ) );
+        }
+    }
+
+
+    /**
+     * Filter the returned values based on the max uuid and if it's been marked for deletion or not
+     */
+    private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
+
+        private final UUID maxVersion;
+
+        private final Map<Id, UUID> markCache;
+
+
+        private EdgeFilter( final UUID maxVersion, Map<Id, UUID> markCache ) {
+            this.maxVersion = maxVersion;
+            this.markCache = markCache;
+        }
+
+
+        @Override
+        public Boolean call( final MarkedEdge edge ) {
+
+
+            final UUID edgeVersion = edge.getVersion();
+
+            //our edge needs to not be deleted and have a version that's > max Version
+            if ( edge.isDeleted() || UUIDComparator.staticCompare( edgeVersion, maxVersion ) > 0 ) {
+                return false;
+            }
+
+
+            final UUID sourceVersion = markCache.get( edge.getSourceNode() );
+
+            //the source Id has been marked for deletion.  It's version is <= to the marked version for deletion,
+            // so we need to discard it
+            if ( sourceVersion != null && UUIDComparator.staticCompare( edgeVersion, sourceVersion ) < 1 ) {
+                return false;
+            }
+
+            final UUID targetVersion = markCache.get( edge.getTargetNode() );
+
+            //the target Id has been marked for deletion.  It's version is <= to the marked version for deletion,
+            // so we need to discard it
+            if ( targetVersion != null && UUIDComparator.staticCompare( edgeVersion, targetVersion ) < 1 ) {
+                return false;
+            }
+
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 190bbff..4ba0142 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -2,11 +2,14 @@ package org.apache.usergrid.persistence.graph.impl;
 
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.thrift.Mutation;
+
 import org.apache.usergrid.persistence.collection.OrganizationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -26,11 +29,13 @@ import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.Scheduler;
 import rx.functions.Action0;
+import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -48,16 +53,30 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
     private final EdgeMetadataSerialization edgeMetadataSerialization;
     private final EdgeDeleteRepair edgeDeleteRepair;
     private final EdgeMetaRepair edgeMetaRepair;
+<<<<<<< Updated upstream
+=======
+    private final GraphFig graphFig;
+    protected final Keyspace keyspace;
+>>>>>>> Stashed changes
 
 
     /**
      * Wire the serialization dependencies
      */
     @Inject
+<<<<<<< Updated upstream
     public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization, @NodeDelete final AsyncProcessor nodeDelete,
                                final EdgeMetadataSerialization edgeMetadataSerialization,
                                final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair
                                 ) {
+=======
+    public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
+
+                               final EdgeMetadataSerialization edgeMetadataSerialization,
+                               final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
+                               final GraphFig graphFig, @NodeDelete final AsyncProcessor nodeDelete,
+                               final Keyspace keyspace ) {
+>>>>>>> Stashed changes
 
 
         this.nodeSerialization = nodeSerialization;
@@ -65,6 +84,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
         this.edgeMetadataSerialization = edgeMetadataSerialization;
         this.edgeDeleteRepair = edgeDeleteRepair;
         this.edgeMetaRepair = edgeMetaRepair;
+        this.graphFig = graphFig;
+        this.keyspace = keyspace;
 
         nodeDelete.addListener( this );
     }
@@ -72,8 +93,11 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
 
     /**
      * Removes this node from the graph.
+     *
      * @param edgeEvent The edge event that was fired.
-     * @return An observable that emits the total number of edges that have been removed with this node both as the target and source
+     *
+     * @return An observable that emits the total number of edges that have been removed with this node both as the
+     *         target and source
      */
     @Override
     public Observable<Integer> receive( final EdgeEvent<Id> edgeEvent ) {
@@ -112,17 +136,9 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                                             @Override
                                             public Observable<MarkedEdge> call( final String edgeType ) {
                                                 return loadEdgesToTarget( scope,
-                                                        new SimpleSearchByEdgeType( node, edgeType, version,
-                                                                null ) );
+                                                        new SimpleSearchByEdgeType( node, edgeType, version, null ) );
                                             }
-                                        } )
-                                        //filter "old" edges, since we'll be deleting them in 1 shot
-                                        .distinctUntilChanged( new Func1<Edge, Id>() {
-                                    @Override
-                                    public Id call( final Edge edge ) {
-                                        return edge.getSourceNode();
-                                    }
-                                } );
+                                        } );
 
 
                         //get all edges pointing to the source node and buffer them into groups for deletion
@@ -132,60 +148,71 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                                             @Override
                                             public Observable<MarkedEdge> call( final String edgeType ) {
                                                 return loadEdgesFromSource( scope,
-                                                        new SimpleSearchByEdgeType( node, edgeType, version,
-                                                                null ) );
+                                                        new SimpleSearchByEdgeType( node, edgeType, version, null ) );
                                             }
-                                        } )
-                                        //filter "old" edges, since we'll be deleting them in 1 shot
-                                        .distinctUntilChanged( new Func1<Edge, Id>() {
-                                    @Override
-                                    public Id call( final Edge edge ) {
-                                        return edge.getTargetNode();
-                                    }
-                                } );
+                                        } );
 
 
                         //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
-                        return Observable.concat( targetEdges, sourceEdges );
+                        return Observable.merge( targetEdges, sourceEdges );
                     }
-                } ).flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
-                    @Override
-                    public Observable<MarkedEdge> call( final MarkedEdge edge ) {
-
-                        //delete the newest edge <= the version on the node delete
-                        LOG.debug( "Deleting edge {}", edge );
-                        return edgeDeleteRepair.repair( scope, edge );
-
-
-                    }
-                } ).flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
-                    @Override
-                    public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+                } )
+                //buffer and delete marked edges in our buffer size
+                .buffer( graphFig.getScanPageSize() ).flatMap(
+                        new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+                            @Override
+                            public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
 
+                                final MutationBatch batch = keyspace.prepareMutationBatch();
 
+                                for(MarkedEdge edge: markedEdges){
 
-                        //delete both the source and target meta data in parallel for the edge we deleted in the previous step
-                        //if nothing else is using them
-                        Observable<Integer> sourceMetaRepaired =
-                                edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
+                                //delete the newest edge <= the version on the node delete
+                                    LOG.debug( "Deleting edge {}", edge );
+                                    final MutationBatch delete = edgeSerialization.deleteEdge( scope,  edge );
 
-                        Observable<Integer>  targetMetaRepaired = edgeMetaRepair.repairTargets( scope,
-                                edge.getTargetNode(), edge.getType(), version );
+                                    batch.mergeShallow( delete );
+                                }
 
-                        //sum up the number of subtypes we retain
-                        return Observable.concat(sourceMetaRepaired, targetMetaRepaired ).last().map( new Func1
-                                <Integer, MarkedEdge>() {
-                            @Override
-                            public MarkedEdge call( final Integer integer ) {
+                                try {
+                                    batch.execute();
+                                }
+                                catch ( ConnectionException e ) {
+                                    throw new RuntimeException( "Unable to delete edges", e );
+                                }
 
-                                LOG.debug( "Retained {} subtypes for edge {}", integer, edge );
-
-                                return edge;
+                                return Observable.from(markedEdges);
                             }
-                        } );
-                    }
-                } ).count()
-                //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the target node in the mark
+                        } )
+        .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
+            @Override
+            public Observable<MarkedEdge> call( final MarkedEdge edge ) {
+
+
+                return Observable.just( edge );
+//                //delete both the source and target meta data in parallel for the edge we deleted in the previous step
+//                //if nothing else is using them
+//                Observable<Integer> sourceMetaRepaired =
+//                        edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
+//
+//                Observable<Integer> targetMetaRepaired =
+//                        edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), version );
+//
+//                //sum up the number of subtypes we retain
+//                return Observable.concat( sourceMetaRepaired, targetMetaRepaired ).last()
+//                                 .map( new Func1<Integer, MarkedEdge>() {
+//                                     @Override
+//                                     public MarkedEdge call( final Integer integer ) {
+//
+//                                         LOG.debug( "Retained {} subtypes for edge {}", integer, edge );
+//
+//                                         return edge;
+//                                     }
+//                                 } );
+            }
+        } ).count()
+                //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the
+                // target node in the mark
                 .defaultIfEmpty( 0 ).doOnCompleted( new Action0() {
                     @Override
                     public void call() {
@@ -193,7 +220,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
                             nodeSerialization.delete( scope, node, version ).execute();
                         }
                         catch ( ConnectionException e ) {
-                            throw new RuntimeException("Unable to delete marked graph node " + node, e);
+                            throw new RuntimeException( "Unable to delete marked graph node " + node, e );
                         }
                     }
                 } );
@@ -254,6 +281,4 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
             }
         } ).subscribeOn( Schedulers.io() );
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/136edaba/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index 3ff7178..310fa1f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -77,15 +77,13 @@ public abstract class AbstractEdgeRepair  {
         final UUID maxVersion = edge.getVersion();
 
         //get source edges
-        Observable<MarkedEdge> sourceEdges = getEdgeVersionsFromSource( scope, edge );
+        Observable<MarkedEdge> edgeVersions = getEdgeVersions( scope, edge );
 
-        //get target edges
-        Observable<MarkedEdge> targetEdges = getEdgeVersionsToTarget( scope, edge );
 
 
 
         //merge source and target then deal with the distinct values
-        return Observable.merge( sourceEdges, targetEdges ).filter( getFilter( maxVersion ) ).distinctUntilChanged().buffer( graphFig.getScanPageSize() )
+        return edgeVersions.filter( getFilter( maxVersion ) ).buffer( graphFig.getScanPageSize() )
                          .flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
                              @Override
                              public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -122,7 +120,7 @@ public abstract class AbstractEdgeRepair  {
     /**
      * Get all edge versions <= the specified max from the source
      */
-    private Observable<MarkedEdge> getEdgeVersionsFromSource( final OrganizationScope scope, final Edge edge ) {
+    private Observable<MarkedEdge> getEdgeVersions( final OrganizationScope scope, final Edge edge ) {
 
         return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
             @Override
@@ -130,24 +128,7 @@ public abstract class AbstractEdgeRepair  {
 
                 final SimpleSearchByEdge search = getSearchByEdge(edge);
 
-                return edgeSerialization.getEdgeFromSource( scope, search );
-            }
-        } ).subscribeOn( Schedulers.io() );
-    }
-
-
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersionsToTarget( final OrganizationScope scope, final Edge edge ) {
-
-        return Observable.create( new ObservableIterator<MarkedEdge>(  ) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search = getSearchByEdge(edge);
-
-                return edgeSerialization.getEdgeToTarget( scope, search );
+                return edgeSerialization.getEdgeVersions( scope, search );
             }
         } ).subscribeOn( Schedulers.io() );
     }


Mime
View raw message