Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EA2D0183E2 for ; Fri, 30 Oct 2015 23:32:36 +0000 (UTC) Received: (qmail 20176 invoked by uid 500); 30 Oct 2015 23:32:36 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 20127 invoked by uid 500); 30 Oct 2015 23:32:36 -0000 Mailing-List: contact commits-help@usergrid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.apache.org Delivered-To: mailing list commits@usergrid.apache.org Received: (qmail 19210 invoked by uid 99); 30 Oct 2015 23:32:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Oct 2015 23:32:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 571E4E07EE; Fri, 30 Oct 2015 23:32:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: toddnine@apache.org To: commits@usergrid.apache.org Date: Fri, 30 Oct 2015 23:33:04 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/50] [abbrv] usergrid git commit: -Convert search in ES to use term queries instead of scroll with filters. -Fix delete so we remove connections from Elasticsearch as well as the entity on entity delete event. -Convert search in ES to use term queries instead of scroll with filters. -Fix delete so we remove connections from Elasticsearch as well as the entity on entity delete event. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/22beca2c Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/22beca2c Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/22beca2c Branch: refs/heads/master Commit: 22beca2cb3714833ddf6a748522c6bead17fe9f2 Parents: 2686054 Author: Michael Russo Authored: Wed Oct 28 15:58:41 2015 -0700 Committer: Michael Russo Committed: Wed Oct 28 15:58:41 2015 -0700 ---------------------------------------------------------------------- .../corepersistence/index/IndexServiceImpl.java | 2 +- .../persistence/index/CandidateResult.java | 11 +- .../persistence/index/EntityIndexBatch.java | 2 +- .../usergrid/persistence/index/IndexFig.java | 6 + .../index/impl/DeIndexOperation.java | 5 + .../index/impl/EsEntityIndexBatchImpl.java | 34 ++++- .../index/impl/EsEntityIndexImpl.java | 149 +++++++++++-------- .../persistence/index/impl/IndexingUtils.java | 2 +- 8 files changed, 137 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index ccb6221..b2a1a2a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -219,7 +219,7 @@ public class IndexServiceImpl implements IndexService { //collect results into a single batch .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> { logger.debug( "Deindexing on edge {} for entity {} added to batch",searchEdge , entityId ); - batch.deindex( searchEdge, candidateResult ); + batch.deindex( candidateResult ); } ) //return the future from the batch execution .flatMap( batch ->Observable.just(batch.build()) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java index 2375509..9b1898c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/CandidateResult.java @@ -32,10 +32,12 @@ import org.apache.usergrid.persistence.model.entity.Id; public class CandidateResult implements EntityVersion { private final Id entityId; private final UUID entityVersion; + private final String docId; - public CandidateResult( Id entityId, UUID entityVersion ) { + public CandidateResult( Id entityId, UUID entityVersion, String docId ) { this.entityId = entityId; this.entityVersion = entityVersion; + this.docId = docId; } @Override @@ -48,6 +50,10 @@ public class CandidateResult implements EntityVersion { return entityId; } + public String getDocId() { + return docId; + } + @Override public boolean equals( final Object o ) { @@ -66,6 +72,9 @@ public class CandidateResult implements EntityVersion { if ( !entityVersion.equals( that.entityVersion ) ) { return false; } + if ( !docId.equals( that.docId ) ) { + return false; + } return true; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java index 98652c1..17dd4d3 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java @@ -63,7 +63,7 @@ public interface EntityIndexBatch { EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version ); - + EntityIndexBatch deindex( final CandidateResult candidateResult); /** * get the batches http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java index e093d7d..33f199a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java @@ -60,6 +60,8 @@ public interface IndexFig extends GuicyFig { String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type"; + String ELASTICSEARCH_VERSION_QUERY_LIMIT = "elasticsearch.version_query_limit"; + /** * Comma-separated list of Elasticsearch hosts. @@ -199,4 +201,8 @@ public interface IndexFig extends GuicyFig { @Default("1000") @Key( "elasticsearch_queue_error_sleep_ms" ) long getSleepTimeForQueueError(); + + @Default("1000") + @Key( ELASTICSEARCH_VERSION_QUERY_LIMIT ) + int getVersionQueryLimit(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java index 4060dac..dbecf8a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java @@ -61,6 +61,11 @@ public class DeIndexOperation implements BatchOperation { this.documentId = createIndexDocId( applicationScope, id, version, searchEdge ); } + public DeIndexOperation( String[] indexes, String docId) { + this.indexes = indexes; + this.documentId = docId; + } + @Override public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 68830ca..a8fb751 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -84,23 +84,42 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { @Override public EntityIndexBatch deindex( final SearchEdge searchEdge, final Id id, final UUID version ) { - IndexValidationUtils.validateSearchEdge( searchEdge ); - ValidationUtils.verifyIdentity( id ); + IndexValidationUtils.validateSearchEdge(searchEdge); + ValidationUtils.verifyIdentity(id); ValidationUtils.verifyVersion( version ); String[] indexes = entityIndex.getIndexes(); //get the default index if no alias exists yet if ( indexes == null || indexes.length == 0 ) { - throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias()); + throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias()); } if ( log.isDebugEnabled() ) { log.debug( "Deindexing to indexes {} with scope {} on edge {} with id {} and version {} ", - new Object[] { indexes, applicationScope, searchEdge, id, version } ); + new Object[] { indexes, applicationScope, searchEdge, id, version } ); } - container.addDeIndexRequest( new DeIndexOperation( indexes, applicationScope, searchEdge, id, version ) ); + container.addDeIndexRequest(new DeIndexOperation(indexes, applicationScope, searchEdge, id, version)); + + return this; + } + + public EntityIndexBatch deindexWithDocId( final String docId ) { + + String[] indexes = entityIndex.getIndexes(); + //get the default index if no alias exists yet + if ( indexes == null || indexes.length == 0 ) { + throw new IllegalStateException("No indexes exist for " + indexLocationStrategy.getAlias().getWriteAlias()); + } + + if ( log.isDebugEnabled() ) { + log.debug( "Deindexing to indexes {} with with documentId {} ", + new Object[] { indexes, docId } ); + } + + + container.addDeIndexRequest( new DeIndexOperation( indexes, docId ) ); return this; } @@ -117,6 +136,11 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { return deindex( searchEdge, entity.getId(), entity.getVersion() ); } + @Override + public EntityIndexBatch deindex( final CandidateResult entity ) { + + return deindexWithDocId(entity.getDocId()); + } @Override http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 9be591e..0b7f2d7 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -65,6 +65,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -389,7 +390,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query); final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset ) - .setTimeout( TimeValue.timeValueMillis( queryTimeout ) ); + .setTimeout(TimeValue.timeValueMillis(queryTimeout)); if ( logger.isDebugEnabled() ) { logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ", @@ -427,57 +428,62 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { */ IndexValidationUtils.validateSearchEdge(edge); - Preconditions.checkNotNull( entityId, "entityId cannot be null" ); + Preconditions.checkNotNull(entityId, "entityId cannot be null"); SearchResponse searchResponse; - List candidates = new ArrayList<>(); - final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" ); - - final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); - - //I can't just search on the entity Id. + //never let the limit be less than 2 as there are potential indefinite paging issues + final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit()); - FilterBuilder entityEdgeFilter = FilterBuilders.termFilter(IndexingUtils.EDGE_NODE_ID_FIELDNAME, - IndexingUtils.nodeId(edge.getNodeId())); + final QueryBuilder entityQuery = QueryBuilders + .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(edge.getNodeId())); - srb.setPostFilter(entityEdgeFilter); + final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); if ( logger.isDebugEnabled() ) { - logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ", + logger.debug( "Searching for edges in (read alias): {}\n nodeId: {},\n query: {} ", this.alias.getReadAlias(),entityId, srb ); } try { - //Added For Graphite Metrics - - //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100. - //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment - //TODO: review this and make them not magic numbers when acking this PR. - searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet(); + long queryTimestamp = 0L; while(true){ - //add search result hits to some sort of running tally of hits. - candidates = aggregateScrollResults( candidates, searchResponse, null ); - SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2 - .getScrollBuilder( searchResponse.getScrollId() ) - .setScroll( new TimeValue( 6000 ) ); + QueryBuilder timestampQuery = QueryBuilders + .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) + .gte(queryTimestamp); - //TODO: figure out how to log exactly what we're putting into elasticsearch - // if ( logger.isDebugEnabled() ) { - // logger.debug( "Scroll search using query: {} ", - // ssrb.toString() ); - // } + QueryBuilder finalQuery = QueryBuilders + .boolQuery() + .must(entityQuery) + .must(timestampQuery); - searchResponse = ssrb.execute().actionGet(); + searchResponse = srb + .setQuery(finalQuery) + .setSize(searchLimit) + .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC) + .execute() + .actionGet(); - if (searchResponse.getHits().getHits().length == 0) { + int responseSize = searchResponse.getHits().getHits().length; + if(responseSize == 0){ break; } + // update queryTimestamp to be the timestamp of the last entity returned from the query + queryTimestamp = (long) searchResponse + .getHits().getAt(responseSize - 1) + .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME); + + candidates = aggregateScrollResults(candidates, searchResponse, null); + + if(responseSize < searchLimit){ + + break; + } } } @@ -488,7 +494,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { } failureMonitor.success(); - return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings()); + return new CandidateResults( candidates, Collections.EMPTY_SET); } @@ -496,61 +502,74 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) { Preconditions.checkNotNull( entityId, "entityId cannot be null" ); - Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" ); + Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null"); ValidationUtils.verifyVersion(markedVersion); SearchResponse searchResponse; - List candidates = new ArrayList<>(); - final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" ); + final long markedTimestamp = markedVersion.timestamp(); - final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); - - FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, - IndexingUtils.entityId(entityId)); + // never let the limit be less than 2 as there are potential indefinite paging issues + final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit()); - srb.setPostFilter(entityIdFilter); + // this query will find the document for the entity itself + final QueryBuilder entityQuery = QueryBuilders + .termQuery(IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.entityId(entityId)); + // this query will find all the documents where this entity is a source/target node + final QueryBuilder nodeQuery = QueryBuilders + .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId)); - - if ( logger.isDebugEnabled() ) { - logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ", - this.alias.getReadAlias(),entityId, srb ); - } + final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); try { - //Added For Graphite Metrics - //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100. - //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment - //TODO: review this and make them not magic numbers when acking this PR. - searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet(); + long queryTimestamp = 0L; - //list that will hold all of the search hits + while(true){ + QueryBuilder timestampQuery = QueryBuilders + .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME) + .gte(queryTimestamp) + .lte(markedTimestamp); + + QueryBuilder entityQueryWithTimestamp = QueryBuilders + .boolQuery() + .must(entityQuery) + .must(timestampQuery); + + QueryBuilder finalQuery = QueryBuilders + .boolQuery() + .should(entityQueryWithTimestamp) + .should(nodeQuery) + .minimumNumberShouldMatch(1); + + searchResponse = srb + .setQuery(finalQuery) + .setSize(searchLimit) + .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC) + .execute() + .actionGet(); - while(true){ - //add search result hits to some sort of running tally of hits. - candidates = aggregateScrollResults( candidates, searchResponse, markedVersion); + int responseSize = searchResponse.getHits().getHits().length; + if(responseSize == 0){ + break; + } + + candidates = aggregateScrollResults(candidates, searchResponse, markedVersion); - SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2 - .getScrollBuilder( searchResponse.getScrollId() ) - .setScroll( new TimeValue( 6000 ) ); + // update queryTimestamp to be the timestamp of the last entity returned from the query + queryTimestamp = (long) searchResponse + .getHits().getAt(responseSize - 1) + .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME); - //TODO: figure out how to log exactly what we're putting into elasticsearch -// if ( logger.isDebugEnabled() ) { -// logger.debug( "Scroll search using query: {} ", -// ssrb.toString() ); -// } - searchResponse = ssrb.execute().actionGet(); + if(responseSize < searchLimit){ - if (searchResponse.getHits().getHits().length == 0) { break; } - } } catch ( Throwable t ) { @@ -560,7 +579,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { } failureMonitor.success(); - return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings()); + return new CandidateResults( candidates, Collections.EMPTY_SET); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/22beca2c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java index 9e06fa6..18cb928 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java @@ -241,7 +241,7 @@ public class IndexingUtils { Id entityId = new SimpleId( UUID.fromString( entityUUID ), entityType ); - return new CandidateResult( entityId, UUID.fromString( versionUUID ) ); + return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId ); }