Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E019F17558 for ; Tue, 3 Mar 2015 00:22:02 +0000 (UTC) Received: (qmail 4766 invoked by uid 500); 3 Mar 2015 00:21:50 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 4658 invoked by uid 500); 3 Mar 2015 00:21:50 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 4330 invoked by uid 99); 3 Mar 2015 00:21:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 00:21:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D5C3AE1023; Tue, 3 Mar 2015 00:21:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: grey@apache.org To: commits@usergrid.apache.org Date: Tue, 03 Mar 2015 00:22:03 -0000 Message-Id: <7ada688281214e28a695556b9d717071@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/27] incubator-usergrid git commit: change config change config changing how core uses index promises change future implementation adding promises back Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8eb09e04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8eb09e04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8eb09e04 Branch: refs/heads/USERGRID-280 Commit: 8eb09e04cd0a4d47719b0d6dd403b066bfafeecb Parents: 2145f21 Author: Shawn Feldman Authored: Wed Feb 25 10:30:52 2015 -0700 Committer: Shawn Feldman Committed: Wed Feb 25 11:45:30 2015 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 7 ++-- .../corepersistence/CpRelationManager.java | 13 +++--- .../results/FilteringLoader.java | 42 +++++++++---------- .../batch/job/AbstractSchedulerRuntimeIT.java | 2 +- .../persistence/core/future/BetterFuture.java | 13 +++--- .../persistence/index/EntityIndexBatch.java | 2 + .../usergrid/persistence/index/IndexFig.java | 16 ++++++- .../index/impl/CorePerformanceIT.java | 44 ++++++++++---------- .../impl/EntityConnectionIndexImplTest.java | 6 +-- 9 files changed, 82 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 721ac80..7905c43 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -33,6 +33,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import org.apache.usergrid.persistence.core.future.BetterFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -975,11 +976,11 @@ public class CpEntityManager implements EntityManager { } ); - ei.createBatch().index( defaultIndexScope, cpEntity ).execute(); - + BetterFuture future = ei.createBatch().index( defaultIndexScope, cpEntity ).execute(); // update in all containing collections and connection indexes CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef ); rm.updateContainingCollectionAndCollectionIndexes( cpEntity ); + future.get(); } @@ -2829,7 +2830,7 @@ public class CpEntityManager implements EntityManager { // // batch.index(appAllTypesScope, memberEntity); - batch.execute(); + batch.execute().get(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 7be6dea..07fc45e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.usergrid.persistence.core.future.BetterFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; @@ -441,7 +442,7 @@ public class CpRelationManager implements RelationManager { } ).count().toBlocking().lastOrDefault( 0 ); - entityIndexBatch.execute(); + entityIndexBatch.execute().get(); logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count ); } @@ -834,7 +835,7 @@ public class CpRelationManager implements RelationManager { batch.deindex( itemScope, cpHeadEntity ); - batch.execute(); + BetterFuture future = batch.execute(); // remove edge from collection to item GraphManager gm = managerCache.getGraphManager( applicationScope ); @@ -870,9 +871,9 @@ public class CpRelationManager implements RelationManager { } } } + future.get(); } - @Override public void copyRelationships(String srcRelationName, EntityRef dstEntityRef, String dstRelationName) throws Exception { @@ -1060,13 +1061,15 @@ public class CpRelationManager implements RelationManager { // batch.index( allTypesIndexScope, targetEntity ); - batch.execute(); + BetterFuture future = batch.execute(); Keyspace ko = cass.getApplicationKeyspace( applicationId ); Mutator m = createMutator( ko, be ); batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() ); batchExecute( m, CassandraService.RETRY_COUNT ); + future.get(); + return connection; } @@ -1291,7 +1294,7 @@ public class CpRelationManager implements RelationManager { // // batch.deindex( allTypesIndexScope, targetEntity ); - batch.execute(); + batch.execute().get(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java index dca59e0..7848be5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java @@ -71,10 +71,10 @@ public class FilteringLoader implements ResultsLoader { * @param applicationScope The application scope to perform the load * @param indexScope The index scope used in the search */ - protected FilteringLoader( - final ManagerCache managerCache, - final ResultsVerifier resultsVerifier, - final ApplicationScope applicationScope, + protected FilteringLoader( + final ManagerCache managerCache, + final ResultsVerifier resultsVerifier, + final ApplicationScope applicationScope, final IndexScope indexScope ) { this.managerCache = managerCache; @@ -103,25 +103,25 @@ public class FilteringLoader implements ResultsLoader { // Maps the entity ids to our candidates final Map maxCandidateMapping = new HashMap<>( crs.size() ); - // Groups all candidate results by types. When search connections there will be multiple + // Groups all candidate results by types. When search connections there will be multiple // types, so we want to batch fetch them more efficiently - - final HashMultimap groupedByScopes = + + final HashMultimap groupedByScopes = HashMultimap.create( crs.size(), crs.size() ); final Iterator iter = crs.iterator(); - // TODO, in this case we're "optimizing" due to the limitations of collection scope. + // TODO, in this case we're "optimizing" due to the limitations of collection scope. // Perhaps we should change the API to just be an application, then an "owner" scope? - // Go through the candidates and group them by scope for more efficient retrieval. + // Go through the candidates and group them by scope for more efficient retrieval. // Also remove duplicates before we even make a network call for ( int i = 0; iter.hasNext(); i++ ) { final CandidateResult currentCandidate = iter.next(); - final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( + final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( currentCandidate.getId().getType() ); final Id entityId = currentCandidate.getId(); @@ -147,11 +147,11 @@ public class FilteringLoader implements ResultsLoader { if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) { //de-index it - logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", - new Object[] { - entityId.getUuid(), - entityId.getType(), - previousMaxVersion, + logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", + new Object[] { + entityId.getUuid(), + entityId.getType(), + previousMaxVersion, currentVersion } ); //deindex this document, and remove the previous maxVersion @@ -170,7 +170,7 @@ public class FilteringLoader implements ResultsLoader { } - //now everything is ordered, and older versions are removed. Batch fetch versions to verify + //now everything is ordered, and older versions are removed. Batch fetch versions to verify // existence and correct versions final TreeMap sortedResults = new TreeMap<>(); @@ -193,10 +193,10 @@ public class FilteringLoader implements ResultsLoader { //now using the scope, load the collection - // Get the collection scope and batch load all the versions. We put all entities in - // app/app for easy retrieval/ unless persistence changes, we never want to read from + // Get the collection scope and batch load all the versions. We put all entities in + // app/app for easy retrieval/ unless persistence changes, we never want to read from // any scope other than the app, app, scope name scope - final CollectionScope collScope = new CollectionScopeImpl( + final CollectionScope collScope = new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(), scopeName); final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope); @@ -225,7 +225,7 @@ public class FilteringLoader implements ResultsLoader { } - // NOTE DO NOT execute the batch here. + // NOTE DO NOT execute the batch here. // It changes the results and we need consistent paging until we aggregate all results return resultsVerifier.getResults( sortedResults.values() ); } @@ -233,7 +233,7 @@ public class FilteringLoader implements ResultsLoader { @Override public void postProcess() { - this.indexBatch.execute(); + this.indexBatch.execute().get(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java index 8825497..81836ae 100644 --- a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java @@ -96,7 +96,7 @@ public class AbstractSchedulerRuntimeIT { JobSchedulerService jobScheduler = springResource.getBean( JobSchedulerService.class ); jobScheduler.setJobListener( listener ); if ( jobScheduler.state() != State.RUNNING ) { - jobScheduler.startAndWait(); +// jobScheduler.startAndWait(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java index 6146fe8..201fa9a 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java @@ -22,21 +22,22 @@ import java.util.concurrent.FutureTask; /** * Future without the exception nastiness */ -public class BetterFuture{ - FutureTask future; +public class BetterFuture extends FutureTask { public BetterFuture(Callable callable){ - future = new FutureTask<>(callable); + super(callable); } public void done(){ - future.run(); + run(); } public T get(){ try { - return future.get(); + return super.get(); }catch (Exception e){ throw new RuntimeException(e); } } -} \ No newline at end of file + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java index a02d0da..bf606bc 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java @@ -64,11 +64,13 @@ public interface EntityIndexBatch { /** * Execute the batch + * @return future to guarantee execution */ public BetterFuture execute(); /** * Execute the batch and force the refresh + * @return future to guarantee execution */ public BetterFuture executeAndRefresh(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index 9bdac36..7434b99 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -121,15 +121,27 @@ public interface IndexFig extends GuicyFig { @Default("2") int getIndexCacheMaxWorkers(); + /** + * how long to wait before the buffer flushes to send + * @return + */ @Default("250") @Key( INDEX_BUFFER_TIMEOUT ) int getIndexBufferTimeout(); - @Default("100") + /** + * size of the buffer to build up before you send results + * @return + */ + @Default("300") @Key( INDEX_BUFFER_SIZE ) int getIndexBufferSize(); - @Default("300") + /** + * Request batch size for ES + * @return + */ + @Default("100") @Key( INDEX_BATCH_SIZE) int getIndexBatchSize(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java index 6d09ccb..c1bfe38 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java @@ -72,12 +72,12 @@ public class CorePerformanceIT extends BaseIT { public static ElasticSearchResource es = new ElasticSearchResource(); // max entities we will write and read - static int maxEntities = 10; // TODO: make this configurable when you add Chop + static int maxEntities = 10; // TODO: make this configurable when you add Chop // each app will get all data static int appCount = 10; - // number of threads = orgCount x appCount + // number of threads = orgCount x appCount // total number of records = orgCount x appCount x numRecords @@ -191,14 +191,14 @@ public class CorePerformanceIT extends BaseIT { count += candidateResults.size(); //cause retrieval from cassandra - EntityResults entityResults = new EntityResults( + EntityResults entityResults = new EntityResults( candidateResults, ecm, UUIDGenerator.newTimeUUID() ); while(entityResults.hasNext()){ entityResults.next(); } - log.info("Read {} reviews in {} / {} ", new Object[] { + log.info("Read {} reviews in {} / {} ", new Object[] { count, indexScope.getOwner(), indexScope.getName() } ); } } @@ -216,7 +216,7 @@ public class CorePerformanceIT extends BaseIT { public void run() { - CollectionScope collectionScope = new CollectionScopeImpl( + CollectionScope collectionScope = new CollectionScopeImpl( applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName() ); EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope ); EntityIndex eci = ecif.createEntityIndex(applicationScope ); @@ -232,7 +232,7 @@ public class CorePerformanceIT extends BaseIT { // create the first entry Entity current = new Entity( - new SimpleId(UUIDGenerator.newTimeUUID(), "review")); + new SimpleId(UUIDGenerator.newTimeUUID(), "review")); // Id orgId = orgAppScope.scope.getApplication(); // Id appId = orgAppScope.scope.getOwner(); @@ -243,54 +243,54 @@ public class CorePerformanceIT extends BaseIT { try { while ( (s = br.readLine()) != null && count < maxEntities ) { - + try { - + if ( s.trim().equals("")) { // then we are at end of a record - + // write and index current entity ecm.write( current ).toBlocking().last(); entityIndexBatch.index(indexScope, current ); - + if ( maxEntities < 20 ) { log.info("Index written for {}", current.getId()); log.info("---"); } - + // create the next entity current = new Entity( new SimpleId(UUIDGenerator.newTimeUUID(), "review")); - + count++; if(count % 1000 == 0){ - entityIndexBatch.execute(); + entityIndexBatch.execute().get(); } if (count % 100000 == 0) { - log.info("Indexed {} reviews in {} / {} ", - new Object[] { - count, + log.info("Indexed {} reviews in {} / {} ", + new Object[] { + count, applicationScope, indexScope.getOwner() } ); } continue; } - + // process a field String name = s.substring( 0, s.indexOf(":")).replace("/", "_").toLowerCase() ; String value = s.substring( s.indexOf(":") + 1 ).trim(); - + if ( maxEntities < 20 ) { log.info("Indexing {} = {}", name, value); } - + if ( NumberUtils.isNumber(value) && value.contains(".")) { current.setField( new DoubleField( name, Double.parseDouble(value))); - + } else if ( NumberUtils.isNumber(value) ) { current.setField( new LongField( name, Long.parseLong(value))); - + } else { current.setField( new StringField( name, value.toString() )); } @@ -306,7 +306,7 @@ public class CorePerformanceIT extends BaseIT { eci.refresh(); } - } + } public void runSelectedQueries(final ApplicationScope scope, List indexScopes ) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java index b07bd21..c65f106 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java @@ -136,7 +136,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { batch.index( searchScope, oj ); batch.index( otherIndexScope, oj ); - batch.executeAndRefresh(); + batch.executeAndRefresh().get(); personLikesIndex.refresh(); @@ -267,7 +267,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { batch.index( searchScope, oj ); batch.index( otherIndexScope, oj ); - batch.executeAndRefresh(); + batch.executeAndRefresh().get(); personLikesIndex.refresh(); EsTestUtils.waitForTasks( personLikesIndex ); @@ -287,7 +287,7 @@ public class EntityConnectionIndexImplTest extends BaseIT { batch.deindex( searchScope, egg ); batch.deindex( searchScope, muffin ); batch.deindex( searchScope, oj ); - batch.executeAndRefresh(); + batch.executeAndRefresh().get(); likes = personLikesIndex.search( searchScope, SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),