usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [16/50] [abbrv] incubator-usergrid git commit: First pass at upgrading to java 8 and latest RX java
Date Mon, 23 Mar 2015 18:30:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 26d06ad..ef258f4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -188,7 +188,7 @@ public class GraphManagerImpl implements GraphManager {
         final Timer.Context timer = writeEdgeTimer.time();
         final Meter meter = writeEdgeMeter;
 
-        return Observable.from( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
+        return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
             @Override
             public Edge call( final MarkedEdge edge ) {
 
@@ -234,7 +234,7 @@ public class GraphManagerImpl implements GraphManager {
 
         final Timer.Context timer = deleteEdgeTimer.time();
         final Meter meter = deleteEdgeMeter;
-        return Observable.from(markedEdge).map(new Func1<MarkedEdge, Edge>() {
+        return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() {
             @Override
             public Edge call(final MarkedEdge edge) {
 
@@ -281,7 +281,7 @@ public class GraphManagerImpl implements GraphManager {
     public Observable<Id> deleteNode( final Id node, final long timestamp ) {
         final Timer.Context timer = deleteNodeTimer.time();
         final Meter meter = deleteNodeMeter;
-        return Observable.from( node ).map( new Func1<Id, Id>() {
+        return Observable.just( node ).map( new Func1<Id, Id>() {
             @Override
             public Id call( final Id id ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index ab141f7..bfaeaaa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -176,6 +176,8 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                          * Sum up the total number of edges we had, then execute the mutation
if we have
                          * anything to do
                          */
+
+
                         return MathObservable.sumInteger( Observable.merge( checks ) )
                                              .doOnNext( new Action1<Integer>() {
                                                             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index e8c224e..6236a16 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -103,7 +103,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
     public Observable<Integer> receive( final ApplicationScope scope, final Id node,
final UUID timestamp ) {
 
 
-        return Observable.from( node )
+        return Observable.just( node )
 
                 //delete source and targets in parallel and merge them into a single observable
                 .flatMap( new Func1<Id, Observable<Integer>>() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 2d9b47f..ecb9a9b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.functions.Action1;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -75,61 +76,49 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode>
{
     }
 
 
-
-
     @Override
-       public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode>
migrationDataProvider,
-                           final ProgressObserver observer ) {
+    public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode>
migrationDataProvider,
+                        final ProgressObserver observer ) {
 
         final AtomicLong counter = new AtomicLong();
 
-        final MigrationRelationship<EdgeMetadataSerialization>
-                migration = allVersions.getMigrationRelationship( currentVersion );
-
-       final Observable<List<Edge>> observable =  migrationDataProvider.getData().flatMap(
new Func1<GraphNode,
-           Observable<List<Edge>>>() {
-            @Override
-            public Observable<List<Edge>> call( final GraphNode graphNode ) {
-                final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope
);
-
-                //get edges from the source
-                return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode
).buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>()
{
-                                                  @Override
-                                                  public Observable<List<Edge>>
call( final Observable<List<Edge>> listObservable ) {
-                          return listObservable.doOnNext( new Action1<List<Edge>>()
{
-                              @Override
-                              public void call( List<Edge> edges ) {
-                                  final MutationBatch batch = keyspace.prepareMutationBatch();
-
-                                  for ( Edge edge : edges ) {
-                                      logger.info( "Migrating meta for edge {}", edge );
-                                      final MutationBatch edgeBatch =
-                                              migration.to.writeEdge(  graphNode.applicationScope,
edge );
-                                      batch.mergeShallow( edgeBatch );
-                                  }
-
-                                  try {
-                                      batch.execute();
-                                  }
-                                  catch ( ConnectionException e ) {
-                                      throw new RuntimeException( "Unable to perform migration",
e );
-                                  }
-
-                                  //update the observer so the admin can see it
-                                  final long newCount = counter.addAndGet( edges.size() );
-
-                                  observer.update( migration.to.getImplementationVersion(),
-                                          String.format( "Currently running.  Rewritten %d
edge types",
-                                                  newCount ) );
-                              }
-                          } );
-                  } } );
-            }} );
-
-        observable.longCount().toBlocking().last();
+        final MigrationRelationship<EdgeMetadataSerialization> migration =
+            allVersions.getMigrationRelationship( currentVersion );
 
-        return migration.to.getImplementationVersion();
+        final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap(
graphNode -> {
+            final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope
);
+
+            //get edges from the source
+            return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer(
1000 )
+                                            .doOnNext( edges -> {
+                                                    final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                    for ( Edge edge : edges ) {
+                                                        logger.info( "Migrating meta for
edge {}", edge );
+                                                        final MutationBatch edgeBatch =
+                                                            migration.to.writeEdge( graphNode.applicationScope,
edge );
+                                                        batch.mergeShallow( edgeBatch );
+                                                    }
 
+                                                    try {
+                                                        batch.execute();
+                                                    }
+                                                    catch ( ConnectionException e ) {
+                                                        throw new RuntimeException( "Unable
to perform migration", e );
+                                                    }
+
+                                                    //update the observer so the admin can
see it
+                                                    final long newCount = counter.addAndGet(
edges.size() );
+
+                                                    observer.update( migration.to.getImplementationVersion(),
String
+                                                        .format( "Currently running.  Rewritten
%d edge types",
+                                                            newCount ) );
+                                                } ).subscribeOn( Schedulers.io() );
+        }, 10 );
+
+        observable.countLong().toBlocking().last();
+
+        return migration.to.getImplementationVersion();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 6d30d22..3bbf3e4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -516,7 +516,7 @@ public class GraphManagerShardConsistencyIT {
                                                             }
                                                         } )
 
-                                                        .longCount().toBlocking().last();
+                                                        .countLong().toBlocking().last();
 
 
 //                if(returnedEdgeCount != count[0]-duplicate[0]){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
index 0a27a6b..7b3fafd 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/SimpleTest.java
@@ -63,23 +63,23 @@ public class SimpleTest {
 
 
         Edge testTargetEdge = createEdge( sourceId1, "test", targetId1, System.currentTimeMillis()
);
-        gm.writeEdge( testTargetEdge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( testTargetEdge ).toBlocking().singleOrDefault( null );
 
         Edge testTarget2Edge = createEdge( sourceId2, "edgeType1", targetId1, System.currentTimeMillis()
);
-        gm.writeEdge( testTarget2Edge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( testTarget2Edge ).toBlocking().singleOrDefault( null );
 
         Edge test2TargetEdge = createEdge( sourceId1, "edgeType1", targetId1, System.currentTimeMillis()
);
-        gm.writeEdge( test2TargetEdge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( test2TargetEdge ).toBlocking().singleOrDefault( null );
 
         Edge test3TargetEdge = createEdge( sourceId1, "edgeType2", targetId1, System.currentTimeMillis()
);
-        gm.writeEdge( test3TargetEdge ).toBlockingObservable().singleOrDefault( null );
+        gm.writeEdge( test3TargetEdge ).toBlocking().singleOrDefault( null );
 
         int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, null, null)
)
-                .count().toBlockingObservable().last();
+                .count().toBlocking().last();
         assertEquals( 3, count );
 
         count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(targetId1, "edgeType",
null) )
-                .count().toBlockingObservable().last();
+                .count().toBlocking().last();
         assertEquals( 2, count );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
index a269c15..049c3d2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImplTest.java
@@ -121,7 +121,7 @@ public class EdgeDataMigrationImplTest implements DataMigrationResetRule.DataMig
 
 
         //walk from s1 and s2
-        final Observable<GraphNode> graphNodes = Observable.from( new GraphNode( applicationScope,
sourceId1), new GraphNode(applicationScope, sourceId2 ) );
+        final Observable<GraphNode> graphNodes = Observable.just( new GraphNode( applicationScope,
sourceId1), new GraphNode(applicationScope, sourceId2 ) );
 
         final MigrationDataProvider<GraphNode> testMigrationProvider = new MigrationDataProvider<GraphNode>()
{
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9656e2d..3ec7852 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -47,8 +47,8 @@ limitations under the License.
 
     <properties>
 
-        <maven.compiler.source>1.7</maven.compiler.source>
-        <maven.compiler.target>1.7</maven.compiler.target>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
 
         <antlr.version>3.4</antlr.version>
         <archaius.version>0.5.12</archaius.version>
@@ -64,14 +64,14 @@ limitations under the License.
         <guava.version>18.0</guava.version>
         <guice.version>4.0-beta5</guice.version>
         <guicyfig.version>3.2</guicyfig.version>
-        <hystrix.version>1.3.16</hystrix.version>
+        <hystrix.version>1.4.0</hystrix.version>
         <jackson-2-version>2.4.1</jackson-2-version>
         <jackson-smile.verson>2.4.3</jackson-smile.verson>
         <mockito.version>1.10.8</mockito.version>
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>0.19.6</rx.version>
+        <rx.version>1.0.8</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.9.0</aws.version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index c962d6b..82af950 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -32,7 +33,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.guice.IndexTestFig;
@@ -41,13 +41,11 @@ import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 
 import rx.Observable;
 import rx.functions.Action1;
-import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -57,6 +55,7 @@ import rx.schedulers.Schedulers;
  */
 @RunWith( EsRunner.class )
 @UseModules( { TestIndexModule.class } )
+@Ignore( "Should only be run during load tests of elasticsearch" )
 public class IndexLoadTestsIT extends BaseIT {
     private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class );
 
@@ -70,13 +69,14 @@ public class IndexLoadTestsIT extends BaseIT {
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
+
     @Test
-    public void testHeavyLoad(){
+    public void testHeavyLoad() {
 
         final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() );
 
-        final Id applicationId = new SimpleId(applicationUUID, "application");
-        final ApplicationScope scope = new ApplicationScopeImpl( applicationId  );
+        final Id applicationId = new SimpleId( applicationUUID, "application" );
+        final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
 
         final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
 
@@ -87,83 +87,52 @@ public class IndexLoadTestsIT extends BaseIT {
 
         //run them all
         createEntities.toBlocking().last();
-
-
-
-
     }
 
-    public Observable<Entity> createStreamFromWorkers(final EntityIndex entityIndex,
final Id ownerId){
-
-        //create a sequence of observables.  Each index will be it's own worker thread using
the Schedulers.newthread()
-     return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1<Observable<Integer>,
Observable<Entity>>() {
-
 
-          @Override
-          public Observable<Entity> call( final Observable<Integer> integerObservable
) {
-             return integerObservable.flatMap( new Func1<Integer, Observable<Entity>>()
{
-                  @Override
-                  public Observable<Entity> call( final Integer integer ) {
-                      return createWriteObservable( entityIndex, ownerId, integer );
-                  }
-              } );
+    public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex,
final Id ownerId ) {
 
-          }
-      }, Schedulers.newThread() );
+        //create a sequence of observables.  Each index will be it's own worker thread using
the Schedulers.newthread()
+        return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
+            integer -> createWriteObservable( entityIndex, ownerId, integer ).subscribeOn(
Schedulers.newThread() ) );
     }
 
 
-    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex,
final Id ownerId, final int workerIndex){
+    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex,
final Id ownerId,
+                                                      final int workerIndex ) {
 
 
         final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
 
 
-
-       return  Observable.range( 0, indexTestFig.getNumberOfRecords() )
+        return Observable.range( 0, indexTestFig.getNumberOfRecords() )
 
             //create our entity
-                  .map( new Func1<Integer, Entity>() {
-            @Override
-            public Entity call( final Integer integer ) {
-                final Entity entity = new Entity("test");
-
-                entity.setField( new IntegerField("workerIndex", workerIndex));
-                entity.setField( new IntegerField( "ordinal", integer ) );
-
-                return entity;
-            }
-        } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>()
{
-            @Override
-            public void call( final List<Entity> entities ) {
-                //take our entities and roll them into a batch
-                  Observable.from( entities ).collect( entityIndex.createBatch(), new Action2<EntityIndexBatch,
Entity>() {
-
-
-                    @Override
-                    public void call( final EntityIndexBatch entityIndexBatch, final Entity
entity ) {
-                        entityIndexBatch.index(scope, entity  );
-                    }
-                } ).doOnNext( new Action1<EntityIndexBatch>() {
-                    @Override
-                    public void call( final EntityIndexBatch entityIndexBatch ) {
+            .map( new Func1<Integer, Entity>() {
+                @Override
+                public Entity call( final Integer integer ) {
+                    final Entity entity = new Entity( "test" );
+
+                    entity.setField( new IntegerField( "workerIndex", workerIndex ) );
+                    entity.setField( new IntegerField( "ordinal", integer ) );
+
+                    return entity;
+                }
+            } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>()
{
+                @Override
+                public void call( final List<Entity> entities ) {
+                    //take our entities and roll them into a batch
+                    Observable.from( entities )
+                              .collect( () -> entityIndex.createBatch(), ( entityIndexBatch,
entity ) -> {
+
+                                  entityIndexBatch.index( scope, entity );
+                              } ).doOnNext( entityIndexBatch -> {
                         entityIndexBatch.execute();
-                    }
-                } ).toBlocking().last();
-            }
-        } )
-
-            //translate back into a stream of entities for the caller to use
-           .flatMap( new Func1<List<Entity>, Observable<Entity>>() {
-            @Override
-            public Observable<Entity> call( final List<Entity> entities ) {
-                return Observable.from( entities );
-            }
-        } );
+                    } ).toBlocking().last();
+                }
+            } )
 
+                //translate back into a stream of entities for the caller to use
+            .flatMap(entities -> Observable.from( entities ) );
     }
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index f24917a..efbda2d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -125,7 +125,7 @@
       <usergrid.it.threads>8</usergrid.it.threads>
 
       <metrics.version>3.0.0</metrics.version>
-      <rx.version>0.19.6</rx.version>
+      <rx.version>1.0.8</rx.version>
         <surefire.plugin.artifactName>surefire-junit47</surefire.plugin.artifactName>
       <surefire.plugin.version>2.18.1</surefire.plugin.version>
       <powermock.version>1.6.1</powermock.version>
@@ -1560,8 +1560,8 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.8</source>
+          <target>1.8</target>
           <optimize>true</optimize>
           <showDeprecation>true</showDeprecation>
           <debug>true</debug>
@@ -1583,7 +1583,7 @@
             <configuration>
               <rules>
                 <requireJavaVersion>
-                  <version>1.7.0</version>
+                  <version>1.8.0</version>
                 </requireJavaVersion>
                 <requireMavenVersion>
                   <version>[3.0,)</version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 4f849e0..bebd557 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -937,18 +937,12 @@ public class ImportServiceImpl implements ImportService {
         // potentially skip the first n if this is a resume operation
         final int entityNumSkip = (int)tracker.getTotalEntityCount();
 
-        // with this code we get asynchronous behavior and testImportWithMultipleFiles will
fail
-       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent,
Boolean>() {
-            @Override
-            public Boolean call( final WriteEvent writeEvent ) {
-                return !tracker.shouldStopProcessingEntities();
-            }
-        } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>()
{
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable)
{
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+        entityEventObservable.takeWhile( writeEvent -> !tracker.shouldStopProcessingEntities()
).skip( entityNumSkip )
+            .flatMap( writeEvent -> {
+                return Observable.just( writeEvent ).doOnNext( doWork );
+            }, 10 ).reduce( 0, heartbeatReducer ).toBlocking().last();
+
 
         jp.close();
 
@@ -979,17 +973,11 @@ public class ImportServiceImpl implements ImportService {
         final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
 
         // with this code we get asynchronous behavior and testImportWithMultipleFiles will
fail
-        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent,
Boolean>() {
-            @Override
-            public Boolean call( final WriteEvent writeEvent ) {
-                return !tracker.shouldStopProcessingConnections();
-            }
-        } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>()
{
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable)
{
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+        final int connectionCount = otherEventObservable.takeWhile(
+            writeEvent -> !tracker.shouldStopProcessingConnections() ).skip(connectionNumSkip).flatMap(
entityWrapper ->{
+                return Observable.just(entityWrapper).doOnNext( doWork ).subscribeOn( Schedulers.io()
);
+
+        }, 10 ).reduce(0, heartbeatReducer).toBlocking().last();
 
         jp.close();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5b1a6b3..b183daa 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -110,84 +110,81 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
 
-            final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>()
{
-                @Override
-                public Entity call(Entity entity) {
+            final Func1<Entity,Entity> entityListFunct = entity -> {
 
-                    try {
+                try {
 
-                        long now = System.currentTimeMillis();
-                        List<EntityRef> devicesRef = getDevices(entity); // resolve
group
+                    long now = System.currentTimeMillis();
+                    List<EntityRef> devicesRef = getDevices(entity); // resolve group
 
-                        LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+"
ms", notification.getUuid(), devicesRef.size());
+                    LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+"
ms", notification.getUuid(), devicesRef.size());
 
-                        for (EntityRef deviceRef : devicesRef) {
-                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(),
deviceRef.getUuid());
-                            long hash = MurmurHash.hash(deviceRef.getUuid());
-                            if (sketch.estimateCount(hash) > 0) { //look for duplicates
-                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
-                                continue;
-                            } else {
-                                sketch.add(hash, 1);
-                            }
-                            String notifierId = null;
-                            String notifierKey = null;
-
-                            //find the device notifier info, match it to the payload
-                            for (Map.Entry<String, Object> entry : payloads.entrySet())
{
-                                ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
-                                now = System.currentTimeMillis();
-                                String providerId = getProviderId(deviceRef, adapter.getNotifier());
-                                if (providerId != null) {
-                                    notifierId = providerId;
-                                    notifierKey = entry.getKey().toLowerCase();
-                                    break;
-                                }
-                                LOG.info("Provider query for notification {} device {} took
"+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
-                            }
+                    for (EntityRef deviceRef : devicesRef) {
+                        LOG.info("notification {} starting to queue device {} ", notification.getUuid(),
deviceRef.getUuid());
+                        long hash = MurmurHash.hash(deviceRef.getUuid());
+                        if (sketch.estimateCount(hash) > 0) { //look for duplicates
+                            LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
+                            continue;
+                        } else {
+                            sketch.add(hash, 1);
+                        }
+                        String notifierId = null;
+                        String notifierKey = null;
 
-                            if (notifierId == null) {
-                                LOG.info("Notifier did not match for device {} ", deviceRef);
-                                continue;
+                        //find the device notifier info, match it to the payload
+                        for (Map.Entry<String, Object> entry : payloads.entrySet())
{
+                            ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+                            now = System.currentTimeMillis();
+                            String providerId = getProviderId(deviceRef, adapter.getNotifier());
+                            if (providerId != null) {
+                                notifierId = providerId;
+                                notifierKey = entry.getKey().toLowerCase();
+                                break;
                             }
+                            LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+"
ms",notification.getUuid(),deviceRef.getUuid());
+                        }
 
-                            ApplicationQueueMessage message = new ApplicationQueueMessage(appId,
notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
-                            if (notification.getQueued() == null) {
-                                // update queued time
-                                now = System.currentTimeMillis();
-                                notification.setQueued(System.currentTimeMillis());
-                                LOG.info("notification {} device {} queue time set. duration
"+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
-                            }
+                        if (notifierId == null) {
+                            LOG.info("Notifier did not match for device {} ", deviceRef);
+                            continue;
+                        }
+
+                        ApplicationQueueMessage message = new ApplicationQueueMessage(appId,
notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                        if (notification.getQueued() == null) {
+                            // update queued time
                             now = System.currentTimeMillis();
-                            qm.sendMessage(message);
-                            LOG.info("notification {} post-queue to device {} duration "
+ (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(),
deviceRef.getUuid());
-                            deviceCount.incrementAndGet();
-                            queueMeter.mark();
+                            notification.setQueued(System.currentTimeMillis());
+                            LOG.info("notification {} device {} queue time set. duration
"+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                         }
-                    } catch (Exception deviceLoopException) {
-                        LOG.error("Failed to add devices", deviceLoopException);
-                        errorMessages.add("Failed to add devices for entity: " + entity.getUuid()
+ " error:" + deviceLoopException);
+                        now = System.currentTimeMillis();
+                        qm.sendMessage(message);
+                        LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis()
- now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                        deviceCount.incrementAndGet();
+                        queueMeter.mark();
                     }
-                    return entity;
+                } catch (Exception deviceLoopException) {
+                    LOG.error("Failed to add devices", deviceLoopException);
+                    errorMessages.add("Failed to add devices for entity: " + entity.getUuid()
+ " error:" + deviceLoopException);
                 }
+                return entity;
             };
 
             long now = System.currentTimeMillis();
-            Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
-                    .parallel(new Func1<Observable<Entity>, Observable<Entity>>()
{
-                        @Override
-                        public rx.Observable<Entity> call(rx.Observable<Entity>
deviceObservable) {
-                            return deviceObservable.map(entityListFunct);
-                        }
-                    }, Schedulers.io())
-                    .doOnError(new Action1<Throwable>() {
-                        @Override
-                        public void call(Throwable throwable) {
-                            LOG.error("Failed while writing", throwable);
-                        }
-                    });
-            o.toBlocking().lastOrDefault(null);
-            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(),
System.currentTimeMillis() - now);
+
+
+            //process up to 10 concurrently
+            Observable o = rx.Observable.create( new IteratorObservable<Entity>( iterator
) )
+
+                                        .flatMap( entity -> Observable.just( entity ).map(
entityListFunct )
+                                                                      .doOnError( throwable
-> {
+                                                                          LOG.error( "Failed
while writing",
+                                                                              throwable );
+                                                                      } ).subscribeOn( Schedulers.io()
)
+
+                                            , 10 );
+
+            o.toBlocking().lastOrDefault( null );
+            LOG.info( "notification {} done queueing duration {} ms", notification.getUuid(),
System.currentTimeMillis() - now);
         }
 
         // update queued time
@@ -338,48 +335,39 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
                 return message;
             }
         };
-        Observable o = rx.Observable.from(messages)
-                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>()
{
-                    @Override
-                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage>
messageObservable) {
-                        return messageObservable.map(func);
+
+        //from each queue message, process them in parallel up to 10 at a time
+        Observable o = rx.Observable.from( messages ).flatMap( queueMessage -> {
+
+
+            return Observable.just( queueMessage ).map( func ).buffer( messages.size() ).map(
queueMessages -> {
+                //for gcm this will actually send notification
+                for ( ProviderAdapter providerAdapter : notifierMap.values() ) {
+                    try {
+                        providerAdapter.doneSendingNotifications();
                     }
-                }, Schedulers.io())
-                .buffer(messages.size())
-                .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>()
{
-                    @Override
-                    public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage>
queueMessages) {
-                        //for gcm this will actually send notification
-                        for (ProviderAdapter providerAdapter : notifierMap.values()) {
-                            try {
-                                providerAdapter.doneSendingNotifications();
-                            } catch (Exception e) {
-                                LOG.error("providerAdapter.doneSendingNotifications: ", e);
-                            }
+                    catch ( Exception e ) {
+                        LOG.error( "providerAdapter.doneSendingNotifications: ", e );
+                    }
+                }
+                //TODO: check if a notification is done and mark it
+                HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>();
+                for ( ApplicationQueueMessage message : queueMessages ) {
+                    if ( notifications.get( message.getNotificationId() ) == null ) {
+                        try {
+                            TaskManager taskManager = taskMap.get( message.getNotificationId()
);
+                            notifications.put( message.getNotificationId(), message );
+                            taskManager.finishedBatch();
                         }
-                        //TODO: check if a notification is done and mark it
-                        HashMap<UUID, ApplicationQueueMessage> notifications = new
HashMap<UUID, ApplicationQueueMessage>();
-                        for (ApplicationQueueMessage message : queueMessages) {
-                            if (notifications.get(message.getNotificationId()) == null) {
-                                try {
-                                    TaskManager taskManager = taskMap.get(message.getNotificationId());
-                                    notifications.put(message.getNotificationId(), message);
-                                    taskManager.finishedBatch();
-                                } catch (Exception e) {
-                                    LOG.error("Failed to finish batch", e);
-                                }
-                            }
-
+                        catch ( Exception e ) {
+                            LOG.error( "Failed to finish batch", e );
                         }
-                        return notifications;
-                    }
-                })
-                .doOnError(new Action1<Throwable>() {
-                    @Override
-                    public void call(Throwable throwable) {
-                        LOG.error("Failed while sending",throwable);
                     }
-                });
+                }
+                return notifications;
+            } ).doOnError( throwable -> LOG.error( "Failed while sending", throwable )
);
+        }, 10 );
+
         return o;
     }
 
@@ -400,7 +388,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager
{
      * {"winphone":"mymessage","apple":"mymessage"}
      * TODO: document this method better
      */
-    private Map<String, Object> translatePayloads(Map<String, Object> payloads,
Map<Object, ProviderAdapter> notifierMap) throws Exception {
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads,
Map<Object, ProviderAdapter>
+        notifierMap) throws Exception {
         Map<String, Object> translatedPayloads = new HashMap<String, Object>(
 payloads.size());
         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
             String payloadKey = entry.getKey().toLowerCase();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/282e2271/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
index e8c5ace..6d0419a 100644
--- a/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
+++ b/stack/test-utils/src/main/java/org/apache/usergrid/setup/ConcurrentProcessSingleton.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.setup;
 
 
+import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +108,19 @@ public class ConcurrentProcessSingleton {
             barrier.await( ONE_MINUTE );
             logger.info( "Setup to complete" );
 
-            lock.maybeReleaseLock();
+
+            Runtime.getRuntime().addShutdownHook( new Thread(  ){
+                @Override
+                public void run() {
+                    try {
+                        lock.maybeReleaseLock();
+                    }
+                    catch ( IOException e ) {
+                        throw new RuntimeException( "Unable to release lock" );
+                    }
+                }
+            });
+
         }
         catch ( Exception e ) {
             throw new RuntimeException( "Unable to initialize system", e );


Mime
View raw message