usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject usergrid git commit: Added test to prove issue in graph. Need to fix version load.
Date Tue, 15 Sep 2015 19:35:30 GMT
Repository: usergrid
Updated Branches:
  refs/heads/USERGRID-1018 fe5d88eef -> 13d259419 (forced update)


Added test to prove issue in graph.  Need to fix version load.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/13d25941
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/13d25941
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/13d25941

Branch: refs/heads/USERGRID-1018
Commit: 13d2594196bd6315af4305e95e8314cbb600ee3e
Parents: 85e1e7c
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Sep 15 13:34:34 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Sep 15 13:35:21 2015 -0600

----------------------------------------------------------------------
 .../service/ConnectionServiceImpl.java          | 26 ++++---
 .../service/ConnectionServiceImplTest.java      | 36 ++++++++--
 stack/core/src/test/resources/log4j.properties  |  3 +-
 .../migration/schema/MigrationManagerFig.java   |  1 +
 .../impl/stage/EdgeDeleteListenerImpl.java      |  4 +-
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  | 61 +++++-----------
 .../persistence/graph/GraphManagerIT.java       | 73 +++++++++++++++++++-
 7 files changed, 136 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index 41e6f80..d60426c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -147,18 +147,18 @@ public class ConnectionServiceImpl implements ConnectionService {
         final Observable<ApplicationScope> applicationScopeObservable ) {
 
 
-        final Observable<EntityIdScope> entityIds =allEntityIdsObservable.getEntities(
applicationScopeObservable );
+        final Observable<EntityIdScope> entityIds = allEntityIdsObservable.getEntities(
applicationScopeObservable );
         //now we have an observable of entityIds.  Walk each connection type
 
         //get all edge types for connections
-       return  entityIds.flatMap( entityIdScope -> {
+        return entityIds.flatMap( entityIdScope -> {
 
             final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
             final Id entityId = entityIdScope.getId();
 
-            final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope
);
+            final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope
);
 
-           logger.debug( "Checking connections of id {} in application {}", entityId, applicationScope
);
+            logger.debug( "Checking connections of id {} in application {}", entityId, applicationScope
);
 
             return gm.getEdgeTypesFromSource(
                 new SimpleSearchEdgeType( entityId, CpNamingUtils.EDGE_CONN_PREFIX, Optional.absent()
) )
@@ -176,7 +176,7 @@ public class ConnectionServiceImpl implements ConnectionService {
                     return gm.loadEdgesFromSource( searchByEdge );
                 } )
 
-                //now that we have a stream of edges, stream all versions
+                    //now that we have a stream of edges, stream all versions
                 .flatMap( edge -> {
 
                     logger.debug( "Found edge {}, searching for multiple versions of edge",
edge );
@@ -187,19 +187,17 @@ public class ConnectionServiceImpl implements ConnectionService {
                     return gm.loadEdgeVersions( searchByEdge );
                 } )
 
-            //skip the first version since it's the one we want to retain
-            // validate there is only 1 version of it, delete anything > than the min
-                .skip( 1 )
-                .flatMap( edgeToDelete -> {
+                    //skip the first version since it's the one we want to retain
+                    // validate there is only 1 version of it, delete anything > than
the min
+                .skip( 1 ).flatMap( edgeToDelete -> {
 
                     logger.debug( "Deleting edge {}", edgeToDelete );
 
                     //mark the edge and ignore the cleanup result
                     return gm.markEdge( edgeToDelete )
-                             //delete the edge
-                             .flatMap( edge -> gm.deleteEdge( edgeToDelete ));
-                } )
-                .map( deletedEdge -> new ConnectionScope( applicationScope, deletedEdge
) ) ;
-        });
+                        //delete the edge
+                        .flatMap( edge -> gm.deleteEdge( edgeToDelete ) );
+                } ).map( deletedEdge -> new ConnectionScope( applicationScope, deletedEdge
) );
+        } );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index e03a9b3..152721a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -23,6 +23,8 @@ import java.util.List;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.TestCoreModule;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -53,6 +55,9 @@ import static org.junit.Assert.assertNotNull;
 @UseModules( TestCoreModule.class )
 public class ConnectionServiceImplTest {
 
+
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionServiceImplTest.class);
+
     @Inject
     private GraphManagerFactory graphManagerFactory;
 
@@ -158,19 +163,23 @@ public class ConnectionServiceImplTest {
 
         final Edge written1 = gm.writeEdge( connection1 ).toBlocking().last();
 
+        logger.info( "Wrote edge 1 with edge {}", written1 );
+
 
         //write the second
         final Edge connection2 = CpNamingUtils.createConnectionEdge( source, connectionType,
target );
 
         final Edge written2 = gm.writeEdge( connection2 ).toBlocking().last();
 
+        logger.info( "Wrote edge 2 with edge {}", written2 );
+
 
         //write the 3rd
         final Edge connection3 = CpNamingUtils.createConnectionEdge( source, connectionType,
target );
 
         final Edge written3 = gm.writeEdge( connection3 ).toBlocking().last();
 
-
+        logger.info( "Wrote edge 3 with edge {}", written3 );
 
 
         //now run the cleanup
@@ -178,13 +187,16 @@ public class ConnectionServiceImplTest {
         final List<ConnectionScope> deletedConnections =
             connectionService.deDupeConnections( Observable.just( applicationScope ) ).toList().toBlocking().last();
 
-        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
+//        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
 
         //check our oldest was deleted first
 
-        assertEquals(written2, deletedConnections.get( 0 ));
 
-        assertEquals(written3, deletedConnections.get( 1 ));
+        assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() );
+
+        assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() );
+
+        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
 
 
 
@@ -201,4 +213,20 @@ public class ConnectionServiceImplTest {
 
         assertEquals( written1, edges.get( 0 ) );
     }
+
+
+    /**
+     * Asserts two edges carry the same sourceId, targetId, type and timestamp. It ignores the deleted flag.
+     * @param expected the reference edge whose source, target, type and timestamp must match
+     * @param asserted the edge under test, compared field by field against {@code expected}
+     */
+    private void assertEdgeData(final Edge expected, final Edge asserted){
+        assertEquals("SourceId the same", expected.getSourceNode(), asserted.getSourceNode());
+        assertEquals("TargetId the same", expected.getTargetNode(), asserted.getTargetNode());
+        //each check reads the same field from both edges; comparing expected to itself would always pass
+        assertEquals("Type the same", expected.getType(), asserted.getType());
+
+        assertEquals("Timestamp the same", expected.getTimestamp(), asserted.getTimestamp());
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties
index dd29671..93951fe 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -47,7 +47,8 @@ log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=D
 log4j.logger.org.apache.usergrid.corepersistence.migration=WARN
 
 #Debug our queries
-log4j.logger.org.apache.usergrid.persistence.index.impl.EsApplicationEntityIndexImpl=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.index.impl.EsApplicationEntityIndexImpl=DEBUG
+log4j.logger.org.apache.usergrid.corepersistence.service.ConnectionServiceImpl=DEBUG
 
 #log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
index b95338a..d8f3d1f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerFig.java
@@ -35,6 +35,7 @@ public interface MigrationManagerFig extends GuicyFig {
     String getStrategyClass();
 
     @Key( "collections.keyspace.strategy.options" )
+    @Default("replication_factor:1")
     String getStrategyOptions();
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
index 22a5ad0..fa289bd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteListenerImpl.java
@@ -30,6 +30,7 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.functions.Func1;
 import rx.functions.Func2;
+import rx.observables.MathObservable;
 
 
 /**
@@ -69,8 +70,7 @@ public class EdgeDeleteListenerImpl implements EdgeDeleteListener {
                                    Observable<Integer> targetDelete = edgeMetaRepair
                                            .repairTargets( scope, edge.getTargetNode(), edge.getType(),
maxTimestamp );
 
-                                   return Observable.zip( sourceDelete, targetDelete,
-                                       ( sourceCount, targetCount ) -> sourceCount + targetCount
);
+                                   return MathObservable.sumInteger( Observable.merge( sourceDelete,
targetDelete ) );
                                } );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 9b84978..643dcc1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -79,52 +79,25 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
 
         //merge source and target then deal with the distinct values
-        return Observable.just( edge ).flatMap( new Func1<MarkedEdge, Observable<?
extends MarkedEdge>>() {
-            @Override
-            public Observable<? extends MarkedEdge> call( final MarkedEdge edge ) {
-
-                return getEdgeVersions( scope, edge, storageSerialization ).take( 1 )
-                        .doOnNext( new Action1<MarkedEdge>() {
-                            @Override
-                            public void call( final MarkedEdge markedEdge ) {
-                                //it's still in the same state as it was when we queued it.
Remove it
-                                if ( edge.equals( markedEdge ) ) {
-                                    LOG.info( "Removing edge {} ", edge );
-
-                                    //remove from the commit log
-
-
-                                    //remove from storage
-                                    try {
-                                        storageSerialization.deleteEdge( scope, edge, timestamp
).execute();
-                                    }
-                                    catch ( ConnectionException e ) {
-                                        throw new RuntimeException( "Unable to connect to
casandra", e );
-                                    }
-                                }
-                            }
-                        } );
-            }
-        } );
+        return Observable.just( edge )
+                .doOnNext( markedEdge -> {
+                    //it's still in the same state as it was when we queued it. Remove it
+                        LOG.info( "Removing edge {} ", markedEdge );
+
+                        //remove from the commit log
+
+
+                        //remove from storage
+                        try {
+                            storageSerialization.deleteEdge( scope, markedEdge, timestamp
).execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to connect to casandra",
e );
+                        }
+                    }
+              );
     }
 
 
-    /**
-     * Get all edge versions <= the specified max from the source
-     */
-    private Observable<MarkedEdge> getEdgeVersions( final ApplicationScope scope, final
Edge edge,
-                                                    final EdgeSerialization serialization
) {
 
-        return Observable.create( new ObservableIterator<MarkedEdge>( "edgeVersions"
) {
-            @Override
-            protected Iterator<MarkedEdge> getIterator() {
-
-                final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getTimestamp(), SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()
);
-
-                return serialization.getEdgeVersions( scope, search );
-            }
-        } );
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/13d25941/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index d416b1d..4b2ffc8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -20,9 +20,9 @@ package org.apache.usergrid.persistence.graph;
 
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.base.Optional;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,11 +35,14 @@ import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.google.common.base.Optional;
 import com.google.inject.Inject;
 
 import rx.Observable;
@@ -836,7 +839,7 @@ public class GraphManagerIT {
         //now load the next page
 
         //tests that even if a prefix is specified, the last takes precedence
-        edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, Optional.fromNullable("test")
);
+        edgeTypes = new SimpleSearchEdgeType( testTargetEdge.getSourceNode(), null, Optional.fromNullable(
"test" ) );
 
         edges = gm.getEdgeTypesFromSource( edgeTypes );
 
@@ -914,7 +917,7 @@ public class GraphManagerIT {
 
         //now load the next page
 
-        edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), null,  Optional.fromNullable("test")
);
+        edgeTypes = new SimpleSearchEdgeType( testTargetEdge2.getTargetNode(), null, Optional.fromNullable(
"test" ) );
 
         edges = gm.getEdgeTypesToTarget( edgeTypes );
 
@@ -2321,6 +2324,70 @@ public class GraphManagerIT {
 
         em.markEdge( null );
     }
+
+
+    @Test
+    public void testReadMultipleVersionOrder() {
+        // Writes three versions of one source/type/target edge, then checks the order loadEdgeVersions returns them in.
+        GraphManager gm = emf.createEdgeManager( scope );
+
+        final Id sourceId = createId( "source" );
+
+        final Id target = createId( "target" );
+
+
+        //write 3 edges between the same nodes with 3 different timestamps (1, 2 and 3)
+        final Edge edge1 = createEdge( sourceId, "test", target, 1, false );
+
+        gm.writeEdge( edge1 ).toBlocking().last();
+
+        final Edge edge2 = createEdge( sourceId, "test", target, 2, false );
+
+        gm.writeEdge( edge2 ).toBlocking().last();
+
+        final Edge edge3 = createEdge( sourceId, "test", target, 3, false );
+
+        gm.writeEdge( edge3 ).toBlocking().last();
+
+        //now test retrieving all written versions
+        // NOTE(review): Long.MAX_VALUE is presumably the max-timestamp bound so no version is excluded - confirm
+
+        final SearchByEdge searchDescending =
+            new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE,
+                SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()   );
+
+        final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending
);
+
+        //search descending: expect edge3, edge2, edge1 (highest timestamp first)
+        final List<Edge> descending = edgesDescending.toList().toBlocking().single();
+
+        assertEquals( "Correct size returned", 3, descending.size() );
+
+        assertEquals( "Correct edges returned", edge3, descending.get( 0 ) );
+
+        assertEquals( "Correct edges returned", edge2, descending.get( 1 ) );
+
+        assertEquals( "Correct edges returned", edge1, descending.get( 2 ) );
+
+
+        //now search ascending: expect edge1, edge2, edge3 (lowest timestamp first)
+
+        final SearchByEdge searchAscending =
+                    new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE,
+                        SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent() 
 );
+
+        Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending );
+
+        List<Edge> ascending = edgesAscending.toList().toBlocking().single();
+
+        assertEquals( "Correct size returned", 3, ascending.size() );
+
+        assertEquals( "Correct edges returned", edge1, ascending.get( 0 ) );
+
+        assertEquals( "Correct edges returned", edge2, ascending.get( 1 ) );
+
+        assertEquals( "Correct edges returned", edge3, ascending.get( 2 ) );
+    }
 }
 
 


Mime
View raw message