Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 554E917313 for ; Fri, 20 Mar 2015 16:51:47 +0000 (UTC) Received: (qmail 99498 invoked by uid 500); 20 Mar 2015 16:51:47 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 99460 invoked by uid 500); 20 Mar 2015 16:51:47 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 99451 invoked by uid 99); 20 Mar 2015 16:51:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 16:51:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1941BE1102; Fri, 20 Mar 2015 16:51:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sfeldman@apache.org To: commits@usergrid.apache.org Date: Fri, 20 Mar 2015 16:51:47 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/12] incubator-usergrid git commit: WIP Overwrite Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-480 f7e78f4a6 -> 336c22285 WIP Overwrite Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9a787238 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9a787238 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9a787238 Branch: refs/heads/USERGRID-480 Commit: 9a7872380b2953b3916197d2aacc65fa78867642 Parents: eb0c689 Author: Todd Nine Authored: Wed Mar 18 14:18:14 2015 -0600 Committer: Todd Nine Committed: Wed Mar 18 14:18:14 2015 -0600 ---------------------------------------------------------------------- .../persistence/index/guice/IndexTestFig.java | 57 ++++ .../index/guice/TestIndexModule.java | 4 + .../index/impl/CorePerformanceIT.java | 339 ------------------- .../index/impl/IndexLoadTestsIT.java | 169 +++++++++ 4 files changed, 230 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java new file mode 100644 index 0000000..ecf3dfa --- /dev/null +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.index.guice; + + +import java.util.UUID; + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + + +/** + * Test configuration for creating documents + */ +@FigSingleton +public interface IndexTestFig extends GuicyFig { + + @Key( "stresstest.numWorkers" ) + @Default( "16" ) + public int getNumberOfWorkers(); + + @Key( "stresstest.numberofRecords" ) + @Default( "10000" ) + public int getNumberOfRecords(); + + @Key( "stresstest.bufferSize" ) + @Default( "1000" ) + public int getBufferSize(); + + @Key( "stresstest.validate.wait" ) + @Default( "2000" ) + public long getValidateWait(); + + + @Key( "stresstest.applicationId" ) + @Default( "0df46683-cdab-11e4-83c2-d2be4de3081a" ) + public String getApplicationId(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java index 50b994d..79a021a 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java @@ -19,9 +19,12 @@ package org.apache.usergrid.persistence.index.guice; +import org.safehaus.guicyfig.GuicyFigModule; + import org.apache.usergrid.persistence.collection.guice.CollectionModule; import org.apache.usergrid.persistence.core.guice.TestModule; import org.apache.usergrid.persistence.core.guice.CommonModule; +import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.impl.BufferQueue; import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl; import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl; @@ -36,5 +39,6 @@ public class TestIndexModule extends TestModule { // configure collections and our core astyanax framework install( new CollectionModule() ); install( new IndexModule() ); + install( new GuicyFigModule(IndexTestFig.class) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java deleted file mode 100644 index c1bfe38..0000000 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.index.impl; - - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang3.math.NumberUtils; - -import org.apache.usergrid.persistence.collection.CollectionScope; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.IndexScope; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.index.guice.TestIndexModule; -import org.apache.usergrid.persistence.index.query.CandidateResults; -import org.apache.usergrid.persistence.index.query.EntityResults; -import org.apache.usergrid.persistence.index.query.Query; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; -import org.apache.usergrid.persistence.model.field.DoubleField; -import org.apache.usergrid.persistence.model.field.LongField; -import org.apache.usergrid.persistence.model.field.StringField; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; - -import com.google.inject.Guice; -import com.google.inject.Injector; - - - -/** - * TODO: make CorePerformanceIT configurable, add CHOP markup. - */ -public class CorePerformanceIT extends BaseIT { - private static final Logger log = LoggerFactory.getLogger(CorePerformanceIT.class); - - @ClassRule - public static ElasticSearchResource es = new ElasticSearchResource(); - - // max entities we will write and read - static int maxEntities = 10; // TODO: make this configurable when you add Chop - - // each app will get all data - static int appCount = 10; - - // number of threads = orgCount x appCount - - // total number of records = orgCount x appCount x numRecords - - static EntityCollectionManagerFactory ecmf; - static EntityIndexFactory ecif ; - - - @Ignore("Relies on finefoods.txt which must be downloaded separately") - @Test - public void loadAndReadData() throws IOException, InterruptedException { - - Injector injector = Guice.createInjector( new TestIndexModule() ); - - // only on first run - //MigrationManager m = injector.getInstance( MigrationManager.class ) - //m.migrate() - - ecmf = injector.getInstance( EntityCollectionManagerFactory.class ); - ecif = injector.getInstance( EntityIndexFactory.class ); - - final ApplicationScope scope = new ApplicationScopeImpl( new SimpleId( "application" ) ); - - log.info("Start Data Load"); - - List scopes = loadData(scope); - - log.info("Finish Data Load"); - - log.info("Start Data Read"); - - - readData( scope, scopes ); - log.info("Finish Data Read"); - - runSelectedQueries( scope, scopes ); - - } - - - private List loadData(final ApplicationScope applicationScope) throws InterruptedException { - - long time = new Date().getTime(); - - List scopes = new ArrayList(); - List threads = new ArrayList(); - - - for ( int j = 0; j < appCount; j++ ) { - - String appName = "app-" + j + "-" + time; - Id appId = new SimpleId( appName ); - IndexScope indexScope = new IndexScopeImpl( appId, "reviews"); - scopes.add( indexScope ); - - Thread t = new Thread( new DataLoader( applicationScope, indexScope ) ); - t.start(); - threads.add( t ); - } - - // wait for indexing to end - for ( Thread t : threads ) { - t.join(); - } - - return scopes; - } - - - private void readData(final ApplicationScope applicationScope, List scopes ) throws InterruptedException { - - List threads = new ArrayList(); - for ( IndexScope scope : scopes ) { - - Thread t = new Thread( new DataReader( applicationScope, scope )); - t.start(); - threads.add(t); - } - - // wait for reading to end - for ( Thread t : threads ) { - t.join(); - } - } - - - static class DataReader implements Runnable { - final ApplicationScope scope; - final IndexScope indexScope; - - public DataReader( final ApplicationScope scope, IndexScope indexScope ) { - this.scope = scope; - this.indexScope = indexScope; - } - - public void run() { - - EntityIndex eci = ecif.createEntityIndex( scope); - EntityCollectionManager ecm = ecmf.createCollectionManager( new CollectionScopeImpl( scope.getApplication(), indexScope.getOwner(), indexScope.getName() ) ); - - Query query = Query.fromQL( "review_score > 0"); // get all reviews; - query.withLimit( maxEntities < 1000 ? maxEntities : 1000 ); - - final SearchTypes searchType = SearchTypes.fromTypes( "review" ); - - CandidateResults candidateResults = eci.search(indexScope, searchType, query ); - int count = candidateResults.size(); - - while ( candidateResults.hasCursor() && count < maxEntities ) { - query.setCursor( candidateResults.getCursor() ) ; - candidateResults = eci.search(indexScope, searchType, query ); - count += candidateResults.size(); - - //cause retrieval from cassandra - EntityResults entityResults = new EntityResults( - candidateResults, ecm, UUIDGenerator.newTimeUUID() ); - - while(entityResults.hasNext()){ - entityResults.next(); - } - - log.info("Read {} reviews in {} / {} ", new Object[] { - count, indexScope.getOwner(), indexScope.getName() } ); - } - } - } - - - static class DataLoader implements Runnable { - final ApplicationScope applicationScope; - final IndexScope indexScope; - - public DataLoader( final ApplicationScope applicationScope, IndexScope indexScope ) { - this.applicationScope = applicationScope; - this.indexScope = indexScope; - } - - public void run() { - - CollectionScope collectionScope = new CollectionScopeImpl( - applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName() ); - EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope ); - EntityIndex eci = ecif.createEntityIndex(applicationScope ); - - FileReader fr; - try { - fr = new FileReader("../../resources/finefoods.txt"); - } catch (FileNotFoundException ex) { - throw new RuntimeException("Error opening file", ex); - } - BufferedReader br = new BufferedReader(fr); - String s = null; - - // create the first entry - Entity current = new Entity( - new SimpleId(UUIDGenerator.newTimeUUID(), "review")); - -// Id orgId = orgAppScope.scope.getApplication(); -// Id appId = orgAppScope.scope.getOwner(); - - int count = 0; - - EntityIndexBatch entityIndexBatch = eci.createBatch(); - - try { - while ( (s = br.readLine()) != null && count < maxEntities ) { - - try { - - if ( s.trim().equals("")) { // then we are at end of a record - - // write and index current entity - ecm.write( current ).toBlocking().last(); - - entityIndexBatch.index(indexScope, current ); - - if ( maxEntities < 20 ) { - log.info("Index written for {}", current.getId()); - log.info("---"); - } - - // create the next entity - current = new Entity( - new SimpleId(UUIDGenerator.newTimeUUID(), "review")); - - count++; - if(count % 1000 == 0){ - entityIndexBatch.execute().get(); - } - - if (count % 100000 == 0) { - log.info("Indexed {} reviews in {} / {} ", - new Object[] { - count, - applicationScope, - indexScope.getOwner() } ); - } - continue; - } - - // process a field - String name = s.substring( 0, s.indexOf(":")).replace("/", "_").toLowerCase() ; - String value = s.substring( s.indexOf(":") + 1 ).trim(); - - if ( maxEntities < 20 ) { - log.info("Indexing {} = {}", name, value); - } - - if ( NumberUtils.isNumber(value) && value.contains(".")) { - current.setField( new DoubleField( name, Double.parseDouble(value))); - - } else if ( NumberUtils.isNumber(value) ) { - current.setField( new LongField( name, Long.parseLong(value))); - - } else { - current.setField( new StringField( name, value.toString() )); - } - - } catch ( Exception e ) { - log.info("Error on line " + count); - } - } - - } catch (IOException ex) { - throw new RuntimeException("Error reading file", ex); - } - - eci.refresh(); - } - } - - - public void runSelectedQueries(final ApplicationScope scope, List indexScopes ) { - - for ( IndexScope indexScope : indexScopes ) { - EntityIndex eci = ecif.createEntityIndex(scope ); - - // TODO: come up with more and more complex queries for CorePerformanceIT - - query(indexScope, eci, "product_productid = 'B006K2ZZ7K'") ; - query(indexScope, eci, "review_profilename = 'Twoapennything'") ; - query(indexScope, eci, "review_profilename contains 'Natalia'") ; - query(indexScope, eci, "review_profilename contains 'Patrick'") ; - query(indexScope, eci, "review_time = 1342051200") ; - query(indexScope, eci, "review_time > 1342051200") ; - query(indexScope, eci, "review_score > 0"); - query(indexScope, eci, "review_score > 2"); - query(indexScope, eci, "review_score > 3"); - query(indexScope, eci, "review_score > 4"); - query(indexScope, eci, "review_score > 5"); - } - } - - public static void query(final IndexScope indexScope, final EntityIndex eci, final String query ) {; - Query q = Query.fromQL(query) ; -// CandidateResults candidateResults = eci.search(indexScope, q ); TODO FIXME -// log.info("size = {} returned from query {}", candidateResults.size(), q.getQl() ); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9a787238/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java new file mode 100644 index 0000000..c962d6b --- /dev/null +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.index.impl; + + +import java.util.List; +import java.util.UUID; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.guice.IndexTestFig; +import org.apache.usergrid.persistence.index.guice.TestIndexModule; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.field.IntegerField; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; + +import com.google.inject.Inject; + +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Func1; +import rx.schedulers.Schedulers; + + +/** + * TODO: make CorePerformanceIT configurable, add CHOP markup. + */ +@RunWith( EsRunner.class ) +@UseModules( { TestIndexModule.class } ) +public class IndexLoadTestsIT extends BaseIT { + private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class ); + + @ClassRule + public static ElasticSearchResource es = new ElasticSearchResource(); + + + @Inject + public IndexTestFig indexTestFig; + + @Inject + public EntityIndexFactory entityIndexFactory; + + @Test + public void testHeavyLoad(){ + + final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() ); + + final Id applicationId = new SimpleId(applicationUUID, "application"); + final ApplicationScope scope = new ApplicationScopeImpl( applicationId ); + + final EntityIndex index = entityIndexFactory.createEntityIndex( scope ); + + //create our index if it doesn't exist + index.initializeIndex(); + + final Observable createEntities = createStreamFromWorkers( index, applicationId ); + + //run them all + createEntities.toBlocking().last(); + + + + + } + + public Observable createStreamFromWorkers(final EntityIndex entityIndex, final Id ownerId){ + + //create a sequence of observables. Each index will be it's own worker thread using the Schedulers.newthread() + return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).parallel( new Func1, Observable>() { + + + @Override + public Observable call( final Observable integerObservable ) { + return integerObservable.flatMap( new Func1>() { + @Override + public Observable call( final Integer integer ) { + return createWriteObservable( entityIndex, ownerId, integer ); + } + } ); + + } + }, Schedulers.newThread() ); + } + + + private Observable createWriteObservable( final EntityIndex entityIndex, final Id ownerId, final int workerIndex){ + + + final IndexScope scope = new IndexScopeImpl( ownerId, "test" ); + + + + return Observable.range( 0, indexTestFig.getNumberOfRecords() ) + + //create our entity + .map( new Func1() { + @Override + public Entity call( final Integer integer ) { + final Entity entity = new Entity("test"); + + entity.setField( new IntegerField("workerIndex", workerIndex)); + entity.setField( new IntegerField( "ordinal", integer ) ); + + return entity; + } + } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1>() { + @Override + public void call( final List entities ) { + //take our entities and roll them into a batch + Observable.from( entities ).collect( entityIndex.createBatch(), new Action2() { + + + @Override + public void call( final EntityIndexBatch entityIndexBatch, final Entity entity ) { + entityIndexBatch.index(scope, entity ); + } + } ).doOnNext( new Action1() { + @Override + public void call( final EntityIndexBatch entityIndexBatch ) { + entityIndexBatch.execute(); + } + } ).toBlocking().last(); + } + } ) + + //translate back into a stream of entities for the caller to use + .flatMap( new Func1, Observable>() { + @Override + public Observable call( final List entities ) { + return Observable.from( entities ); + } + } ); + + } + + + + +}