usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [01/10] incubator-usergrid git commit: WIP Overwrite
Date Fri, 20 Mar 2015 13:41:51 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-405 0a16033c3 -> dcf469378


WIP Overwrite


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9a787238
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9a787238
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9a787238

Branch: refs/heads/USERGRID-405
Commit: 9a7872380b2953b3916197d2aacc65fa78867642
Parents: eb0c689
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Mar 18 14:18:14 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Mar 18 14:18:14 2015 -0600

----------------------------------------------------------------------
 .../persistence/index/guice/IndexTestFig.java   |  57 ++++
 .../index/guice/TestIndexModule.java            |   4 +
 .../index/impl/CorePerformanceIT.java           | 339 -------------------
 .../index/impl/IndexLoadTestsIT.java            | 169 +++++++++
 4 files changed, 230 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
new file mode 100644
index 0000000..ecf3dfa
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.guice;
+
+
+import java.util.UUID;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Test configuration for creating documents
+ */
+@FigSingleton
+public interface IndexTestFig extends GuicyFig {
+
+    @Key( "stresstest.numWorkers" )
+    @Default( "16" )
+    public int getNumberOfWorkers();
+
+    @Key( "stresstest.numberofRecords" )
+    @Default( "10000" )
+    public int getNumberOfRecords();
+
+    @Key( "stresstest.bufferSize" )
+    @Default( "1000" )
+    public int getBufferSize();
+
+    @Key( "stresstest.validate.wait" )
+    @Default( "2000" )
+    public long getValidateWait();
+
+
+    @Key( "stresstest.applicationId" )
+    @Default( "0df46683-cdab-11e4-83c2-d2be4de3081a" )
+    public String getApplicationId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 50b994d..79a021a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -19,9 +19,12 @@
 package org.apache.usergrid.persistence.index.guice;
 
 
+import org.safehaus.guicyfig.GuicyFigModule;
+
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.impl.BufferQueue;
 import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
 import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
@@ -36,5 +39,6 @@ public class TestIndexModule extends TestModule {
         // configure collections and our core astyanax framework
         install( new CollectionModule() );
         install( new IndexModule()  );
+        install( new GuicyFigModule(IndexTestFig.class) );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
deleted file mode 100644
index c1bfe38..0000000
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang3.math.NumberUtils;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.guice.TestIndexModule;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.index.query.EntityResults;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-
-
-/**
- * TODO: make CorePerformanceIT configurable, add CHOP markup.
- */
-public class CorePerformanceIT extends BaseIT {
-    private static final Logger log = LoggerFactory.getLogger(CorePerformanceIT.class);
-
-    @ClassRule
-    public static ElasticSearchResource es = new ElasticSearchResource();
-
-    // max entities we will write and read
-    static int maxEntities = 10; // TODO: make this configurable when you add Chop
-
-    // each app will get all data
-    static int appCount = 10;
-
-    // number of threads = orgCount x appCount
-
-    // total number of records = orgCount x appCount x numRecords
-
-    static EntityCollectionManagerFactory ecmf;
-    static EntityIndexFactory ecif ;
-
-
-    @Ignore("Relies on finefoods.txt which must be downloaded separately")
-    @Test
-    public void loadAndReadData() throws IOException, InterruptedException {
-
-        Injector injector = Guice.createInjector( new TestIndexModule() );
-
-        // only on first run
-        //MigrationManager m = injector.getInstance( MigrationManager.class )
-        //m.migrate()
-
-        ecmf = injector.getInstance( EntityCollectionManagerFactory.class );
-        ecif = injector.getInstance( EntityIndexFactory.class );
-
-        final ApplicationScope scope = new ApplicationScopeImpl( new SimpleId( "application"
) );
-
-        log.info("Start Data Load");
-
-        List<IndexScope> scopes = loadData(scope);
-
-        log.info("Finish Data Load");
-
-        log.info("Start Data Read");
-
-
-        readData( scope, scopes );
-        log.info("Finish Data Read");
-
-        runSelectedQueries( scope, scopes );
-
-    }
-
-
-    private List<IndexScope> loadData(final ApplicationScope applicationScope) throws
InterruptedException {
-
-        long time = new Date().getTime();
-
-        List<IndexScope> scopes = new ArrayList<IndexScope>();
-        List<Thread> threads = new ArrayList<Thread>();
-
-
-        for ( int j = 0; j < appCount; j++ ) {
-
-            String appName = "app-" + j + "-" + time;
-            Id appId = new SimpleId( appName );
-            IndexScope indexScope = new IndexScopeImpl( appId, "reviews");
-            scopes.add( indexScope );
-
-            Thread t = new Thread( new DataLoader( applicationScope, indexScope ) );
-            t.start();
-            threads.add( t );
-        }
-
-        // wait for indexing to end
-        for ( Thread t : threads ) {
-            t.join();
-        }
-
-        return scopes;
-    }
-
-
-    private void readData(final ApplicationScope applicationScope,  List<IndexScope>
scopes ) throws InterruptedException {
-
-        List<Thread> threads = new ArrayList<Thread>();
-        for ( IndexScope scope : scopes ) {
-
-            Thread t = new Thread( new DataReader( applicationScope, scope ));
-            t.start();
-            threads.add(t);
-        }
-
-        // wait for reading to end
-        for ( Thread t : threads ) {
-            t.join();
-        }
-    }
-
-
-    static class DataReader implements Runnable {
-        final ApplicationScope scope;
-       final  IndexScope indexScope;
-
-        public DataReader( final ApplicationScope scope, IndexScope indexScope ) {
-            this.scope = scope;
-            this.indexScope = indexScope;
-        }
-
-        public void run() {
-
-            EntityIndex eci =   ecif.createEntityIndex( scope);
-            EntityCollectionManager ecm = ecmf.createCollectionManager( new CollectionScopeImpl(
scope.getApplication(), indexScope.getOwner(), indexScope.getName() ) );
-
-            Query query = Query.fromQL( "review_score > 0"); // get all reviews;
-            query.withLimit( maxEntities < 1000 ? maxEntities : 1000 );
-
-            final SearchTypes searchType = SearchTypes.fromTypes( "review" );
-
-            CandidateResults candidateResults = eci.search(indexScope, searchType, query
);
-            int count = candidateResults.size();
-
-            while ( candidateResults.hasCursor() && count < maxEntities ) {
-                query.setCursor( candidateResults.getCursor() )   ;
-                candidateResults = eci.search(indexScope, searchType,  query );
-                count += candidateResults.size();
-
-                //cause retrieval from cassandra
-                EntityResults entityResults = new EntityResults(
-                    candidateResults, ecm, UUIDGenerator.newTimeUUID() );
-
-                while(entityResults.hasNext()){
-                    entityResults.next();
-                }
-
-                log.info("Read {} reviews in {} / {} ", new Object[] {
-                    count, indexScope.getOwner(), indexScope.getName() } );
-            }
-        }
-    }
-
-
-    static class DataLoader implements Runnable {
-        final ApplicationScope applicationScope;
-        final IndexScope indexScope;
-
-        public DataLoader( final ApplicationScope applicationScope, IndexScope indexScope
) {
-            this.applicationScope = applicationScope;
-            this.indexScope = indexScope;
-        }
-
-        public void run() {
-
-            CollectionScope collectionScope = new CollectionScopeImpl(
-                    applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName()
);
-            EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope );
-            EntityIndex eci = ecif.createEntityIndex(applicationScope );
-
-            FileReader fr;
-            try {
-                fr = new FileReader("../../resources/finefoods.txt");
-            } catch (FileNotFoundException ex) {
-                throw new RuntimeException("Error opening file", ex);
-            }
-            BufferedReader br = new BufferedReader(fr);
-            String s = null;
-
-            // create the first entry
-            Entity current = new Entity(
-                new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
-
-//            Id orgId = orgAppScope.scope.getApplication();
-//            Id appId = orgAppScope.scope.getOwner();
-
-            int count = 0;
-
-            EntityIndexBatch entityIndexBatch = eci.createBatch();
-
-            try {
-                while ( (s = br.readLine()) != null && count < maxEntities ) {
-
-                    try {
-
-                        if ( s.trim().equals("")) { // then we are at end of a record
-
-                            // write and index current entity
-                            ecm.write( current ).toBlocking().last();
-
-                            entityIndexBatch.index(indexScope, current  );
-
-                            if ( maxEntities < 20 ) {
-                                log.info("Index written for {}", current.getId());
-                                log.info("---");
-                            }
-
-                            // create the next entity
-                            current = new Entity(
-                                    new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
-
-                            count++;
-                            if(count % 1000 == 0){
-                                entityIndexBatch.execute().get();
-                            }
-
-                            if (count % 100000 == 0) {
-                                log.info("Indexed {} reviews in {} / {} ",
-                                    new Object[] {
-                                        count,
-                                            applicationScope,
-                                        indexScope.getOwner() } );
-                            }
-                            continue;
-                        }
-
-                        // process a field
-                        String name = s.substring( 0, s.indexOf(":")).replace("/", "_").toLowerCase()
;
-                        String value = s.substring( s.indexOf(":") + 1 ).trim();
-
-                        if ( maxEntities < 20 ) {
-                            log.info("Indexing {} = {}", name, value);
-                        }
-
-                        if ( NumberUtils.isNumber(value) && value.contains("."))
{
-                            current.setField( new DoubleField( name, Double.parseDouble(value)));
-
-                        } else if ( NumberUtils.isNumber(value) ) {
-                            current.setField( new LongField( name, Long.parseLong(value)));
-
-                        } else {
-                            current.setField( new StringField( name, value.toString() ));
-                        }
-
-                    } catch ( Exception e ) {
-                        log.info("Error on line " + count);
-                    }
-                }
-
-            } catch (IOException ex) {
-                throw new RuntimeException("Error reading file", ex);
-            }
-
-            eci.refresh();
-        }
-    }
-
-
-    public void runSelectedQueries(final ApplicationScope scope,  List<IndexScope>
indexScopes ) {
-
-        for ( IndexScope indexScope : indexScopes ) {
-            EntityIndex eci = ecif.createEntityIndex(scope );
-
-            // TODO: come up with more and more complex queries for CorePerformanceIT
-
-            query(indexScope, eci, "product_productid = 'B006K2ZZ7K'") ;
-            query(indexScope, eci, "review_profilename = 'Twoapennything'") ;
-            query(indexScope, eci, "review_profilename contains 'Natalia'") ;
-            query(indexScope, eci, "review_profilename contains 'Patrick'") ;
-            query(indexScope, eci, "review_time = 1342051200") ;
-            query(indexScope, eci, "review_time > 1342051200") ;
-            query(indexScope, eci, "review_score > 0");
-            query(indexScope, eci, "review_score > 2");
-            query(indexScope, eci, "review_score > 3");
-            query(indexScope, eci, "review_score > 4");
-            query(indexScope, eci, "review_score > 5");
-        }
-    }
-
-    public static void query(final IndexScope indexScope, final EntityIndex eci, final String
query ) {;
-        Query q = Query.fromQL(query) ;
-//        CandidateResults candidateResults = eci.search(indexScope,  q );  TODO FIXME
-//        log.info("size = {} returned from query {}", candidateResults.size(), q.getQl()
);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
new file mode 100644
index 0000000..c962d6b
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.guice.IndexTestFig;
+import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Action2;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * TODO: make IndexLoadTestsIT configurable, add CHOP markup.
+ */
+@RunWith( EsRunner.class )
+@UseModules( { TestIndexModule.class } )
+public class IndexLoadTestsIT extends BaseIT {
+    private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class );
+
+    @ClassRule
+    public static ElasticSearchResource es = new ElasticSearchResource();
+
+
+    @Inject
+    public IndexTestFig indexTestFig;
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
+    @Test
+    public void testHeavyLoad(){
+
+        final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() );
+
+        final Id applicationId = new SimpleId(applicationUUID, "application");
+        final ApplicationScope scope = new ApplicationScopeImpl( applicationId  );
+
+        final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
+
+        //create our index if it doesn't exist
+        index.initializeIndex();
+
+        final Observable<Entity> createEntities = createStreamFromWorkers( index, applicationId
);
+
+        //run them all
+        createEntities.toBlocking().last();
+
+
+
+
+    }
+
+    public Observable<Entity> createStreamFromWorkers(final EntityIndex entityIndex,
final Id ownerId){
+
+        //create a sequence of observables.  Each index will be its own worker thread using Schedulers.newThread()
+     return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1<Observable<Integer>,
Observable<Entity>>() {
+
+
+          @Override
+          public Observable<Entity> call( final Observable<Integer> integerObservable
) {
+             return integerObservable.flatMap( new Func1<Integer, Observable<Entity>>()
{
+                  @Override
+                  public Observable<Entity> call( final Integer integer ) {
+                      return createWriteObservable( entityIndex, ownerId, integer );
+                  }
+              } );
+
+          }
+      }, Schedulers.newThread() );
+    }
+
+
+    private Observable<Entity> createWriteObservable( final EntityIndex entityIndex,
final Id ownerId, final int workerIndex){
+
+
+        final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
+
+
+
+       return  Observable.range( 0, indexTestFig.getNumberOfRecords() )
+
+            //create our entity
+                  .map( new Func1<Integer, Entity>() {
+            @Override
+            public Entity call( final Integer integer ) {
+                final Entity entity = new Entity("test");
+
+                entity.setField( new IntegerField("workerIndex", workerIndex));
+                entity.setField( new IntegerField( "ordinal", integer ) );
+
+                return entity;
+            }
+        } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>()
{
+            @Override
+            public void call( final List<Entity> entities ) {
+                //take our entities and roll them into a batch
+                  Observable.from( entities ).collect( entityIndex.createBatch(), new Action2<EntityIndexBatch,
Entity>() {
+
+
+                    @Override
+                    public void call( final EntityIndexBatch entityIndexBatch, final Entity
entity ) {
+                        entityIndexBatch.index(scope, entity  );
+                    }
+                } ).doOnNext( new Action1<EntityIndexBatch>() {
+                    @Override
+                    public void call( final EntityIndexBatch entityIndexBatch ) {
+                        entityIndexBatch.execute();
+                    }
+                } ).toBlocking().last();
+            }
+        } )
+
+            //translate back into a stream of entities for the caller to use
+           .flatMap( new Func1<List<Entity>, Observable<Entity>>() {
+            @Override
+            public Observable<Entity> call( final List<Entity> entities ) {
+                return Observable.from( entities );
+            }
+        } );
+
+    }
+
+
+
+
+}


Mime
View raw message