usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject usergrid git commit: Cleanup duplicate edges at the end of addToCollection and filter duplicate edges in our graph results.
Date Thu, 21 Jan 2016 20:49:58 GMT
Repository: usergrid
Updated Branches:
  refs/heads/release f9028b24a -> 62ad5a840


Cleanup duplicate edges at the end of addToCollection and filter duplicate edges in our graph
results.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/62ad5a84
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/62ad5a84
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/62ad5a84

Branch: refs/heads/release
Commit: 62ad5a8403e4894215a5c9ff2ff6ad5e96de024f
Parents: f9028b2
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Thu Jan 21 12:49:25 2016 -0800
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Thu Jan 21 12:49:25 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 53 +++++++++++---------
 .../read/traverse/AbstractReadGraphFilter.java  | 37 +++++++++++++-
 .../usergrid/persistence/EntityManagerIT.java   | 33 ++++++++++++
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  | 18 +++----
 4 files changed, 105 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/62ad5a84/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index b4cabc4..415fb5e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -409,7 +409,9 @@ public class CpRelationManager implements RelationManager {
             }
         } ).toBlocking().lastOrDefault( null );
 
-        //check if we need to reverse our edges
+
+        // remove any duplicate edges (keeps the duplicate edge with same timestamp)
+        removeDuplicateEdgesAsync(gm, edge);
 
 
         if ( logger.isDebugEnabled() ) {
@@ -696,28 +698,8 @@ public class CpRelationManager implements RelationManager {
         indexService.queueNewEdge( applicationScope, targetEntity, edge );
 
 
-        //now read all older versions of an edge, and remove them.  Finally calling delete
-        final SearchByEdge searchByEdge =
-            new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
-                SearchByEdgeType.Order.DESCENDING, Optional.absent() );
-
-
-        //load our versions, only retain the most recent one
-        gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
-            if (logger.isDebugEnabled()) {
-                logger.debug("Marking edge {} for deletion", edgeToDelete);
-            }
-            return gm.markEdge(edgeToDelete );
-        }).lastOrDefault(null).doOnNext(lastEdge -> {
-            //no op if we hit our default
-            if (lastEdge == null) {
-                return;
-            }
-
-            //don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
-
-            gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
-        }).toBlocking().lastOrDefault(null);//this should throw an exception
+        // remove any duplicate edges (keeps the duplicate edge with same timestamp)
+        removeDuplicateEdgesAsync(gm, edge);
 
 
         return connection;
@@ -1023,4 +1005,29 @@ public class CpRelationManager implements RelationManager {
         }
         return entity;
     }
+
+    private void removeDuplicateEdgesAsync(GraphManager gm, Edge edge){
+
+        //now read all older versions of an edge, and remove them.  Finally calling delete
+        final SearchByEdge searchByEdge =
+            new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE,
+                SearchByEdgeType.Order.DESCENDING, Optional.absent() );
+
+        //load our versions, only retain the most recent one
+        gm.loadEdgeVersions(searchByEdge).skip(1).flatMap(edgeToDelete -> {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Duplicate edge. Marking edge {} for deletion", edgeToDelete);
+            }
+            return gm.markEdge(edgeToDelete );
+        }).lastOrDefault(null).doOnNext(lastEdge -> {
+            //no op if we hit our default
+            if (lastEdge == null) {
+                return;
+            }
+            //don't queue delete b/c that de-indexes, we need to delete the edges only since we have a version still existing to index.
+            gm.deleteEdge(lastEdge).toBlocking().lastOrDefault(null); // this should throw an exception
+        }).toBlocking().lastOrDefault(null);//this should throw an exception
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62ad5a84/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 862a80e..dd59b77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -44,7 +44,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 
 import rx.Observable;
-
+import rx.functions.Func1;
 
 
 /**
@@ -175,7 +175,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                 return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;
 
 
-            })
+            })  // any non-deleted edges should be de-duped here so the results are unique
+                .distinct( new EdgeDistinctKey() )
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );
@@ -253,4 +254,36 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
 
     }
 
+    /**
+     *  Return a key that Rx can use for determining a distinct edge.  Build a string containing the hash code
+     *  of the source, target, and type to ensure uniqueness rather than the int sum of the hash codes.  Edge
+     *  timestamp is specifically left out as edges with the same source,target,type but different timestamps
+     *  are considered duplicates.
+     */
+    private class EdgeDistinctKey implements Func1<Edge,String> {
+
+        @Override
+        public String call(Edge edge) {
+
+            return buildDistinctKey(edge.getSourceNode().hashCode(), edge.getTargetNode().hashCode(),
+                edge.getType().hashCode());
+        }
+    }
+
+    protected static String buildDistinctKey(final int sourceHash, final int targetHash, final int typeHash){
+
+        final String DISTINCT_KEY_SEPARATOR = ":";
+        StringBuilder stringBuilder = new StringBuilder();
+
+        stringBuilder
+            .append(sourceHash)
+            .append(DISTINCT_KEY_SEPARATOR)
+            .append(targetHash)
+            .append(DISTINCT_KEY_SEPARATOR)
+            .append(typeHash);
+
+        return stringBuilder.toString();
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62ad5a84/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
index d81cad2..bc693ce 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java
@@ -583,4 +583,37 @@ public class EntityManagerIT extends AbstractCoreIT {
 
         assertNotNull( em.get( user.getUuid() ) );
     }
+
+
+    @Test
+    public void testFilteringOfDuplicateEdges() throws Exception {
+        LOG.info( "EntityManagerIT.testFilteringOfDuplicateEdges" );
+
+        EntityManager em = app.getEntityManager();
+
+        Map<String, Object> properties = new LinkedHashMap<String, Object>();
+        properties.put( "name", "fluffy1" );
+
+        Entity entity = em.create( "fluffy", properties );
+
+
+        EntityRef appRef =  new SimpleEntityRef("application", app.getId());
+        EntityRef entityRef = new SimpleEntityRef(entity.getType(), entity.getUuid());
+
+        assertNotNull( entity );
+
+
+        // create duplicate edges
+        em.addToCollection(appRef, "fluffies", entityRef);
+        em.addToCollection(appRef, "fluffies", entityRef);
+
+        //app.refreshIndex();
+
+        Results results = em.getCollection(appRef,
+            "fluffies", null, 10, Level.ALL_PROPERTIES, true);
+        
+        // we should be filtering duplicate edges so only assert 1 result back and not the # of edges
+        assertEquals(1, results.getEntities().size());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/62ad5a84/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index a87a079..147eea9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -20,20 +20,14 @@
 package org.apache.usergrid.persistence.graph.impl.stage;
 
 
-import java.util.Iterator;
 import java.util.UUID;
 
-import com.google.common.base.Optional;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 
 import com.google.common.base.Preconditions;
@@ -42,8 +36,6 @@ import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
 
 
 /**
@@ -53,7 +45,7 @@ import rx.functions.Func1;
 public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
 
-    private static final Logger LOG = LoggerFactory.getLogger( EdgeDeleteRepairImpl.class );
+    private static final Logger logger = LoggerFactory.getLogger( EdgeDeleteRepairImpl.class );
 
     protected final EdgeSerialization storageSerialization;
     protected final GraphFig graphFig;
@@ -81,8 +73,12 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
         //merge source and target then deal with the distinct values
         return Observable.just( edge ).filter( markedEdge-> markedEdge.isDeleted() )
                 .doOnNext( markedEdge -> {
-                    //it's still in the same state as it was when we queued it. Remove it
-                        LOG.info( "Removing edge {} ", markedEdge );
+
+                        //it's still in the same state as it was when we queued it. Remove it
+                        if(logger.isDebugEnabled()){
+                            logger.debug( "Removing edge {} ", markedEdge );
+                        }
+
+
 
                         //remove from the commit log
 


Mime
View raw message