Return-Path: X-Original-To: apmail-usergrid-commits-archive@minotaur.apache.org Delivered-To: apmail-usergrid-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91F6410A57 for ; Tue, 25 Nov 2014 20:49:42 +0000 (UTC) Received: (qmail 14294 invoked by uid 500); 25 Nov 2014 20:49:42 -0000 Delivered-To: apmail-usergrid-commits-archive@usergrid.apache.org Received: (qmail 14273 invoked by uid 500); 25 Nov 2014 20:49:42 -0000 Mailing-List: contact commits-help@usergrid.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@usergrid.incubator.apache.org Delivered-To: mailing list commits@usergrid.incubator.apache.org Received: (qmail 14264 invoked by uid 99); 25 Nov 2014 20:49:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Nov 2014 20:49:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 25430A1A5EC; Tue, 25 Nov 2014 20:49:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sfeldman@apache.org To: commits@usergrid.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-usergrid git commit: add index to alias methods Date: Tue, 25 Nov 2014 20:49:42 +0000 (UTC) Repository: incubator-usergrid Updated Branches: refs/heads/index-alias af16be295 -> b376cfd13 add index to alias methods Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b376cfd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b376cfd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b376cfd1 Branch: refs/heads/index-alias Commit: b376cfd13ef76d8f2011201c65a14585d2055023 Parents: af16be2 Author: Shawn Feldman Authored: Tue Nov 25 13:49:24 2014 -0700 Committer: Shawn Feldman Committed: Tue Nov 25 13:49:24 2014 -0700 ---------------------------------------------------------------------- .../usergrid/persistence/index/EntityIndex.java | 12 +++ .../persistence/index/IndexIdentifier.java | 8 +- .../index/impl/EsEntityIndexImpl.java | 95 +++++++++++--------- .../persistence/index/impl/EntityIndexTest.java | 83 +++++++++++------ 4 files changed, 127 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java index 88498b3..30dc3fc 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java @@ -37,11 +37,23 @@ public interface EntityIndex { public void initializeIndex(); /** + * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists + * @param indexSuffix index name + */ + public void createIndexAddToAlias(final String indexSuffix, final int numShards, final int numReplicas); + + /** * Create the index batch. */ public EntityIndexBatch createBatch(); /** + * Add alias to index, will remove old index from write alias + * @param indexSuffix must be different than current index + */ + public void addAlias(final String indexSuffix); + + /** * Execute query in Usergrid syntax. */ public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java index 82a6023..54a02d6 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java @@ -45,12 +45,12 @@ public class IndexIdentifier{ /** * Get index name, send in additional parameter to add incremental indexes - * @param version + * @param suffix * @return */ - public String getIndex(int version) { - if (version > 0) { - return getIndexBase() + "_v" + (version + 1); + public String getIndex(String suffix) { + if (suffix != null) { + return getIndexBase() + "_" + (suffix + 1); } else { return getIndexBase(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 540aec4..e7cbd2e 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest; import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; @@ -36,6 +37,8 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.client.AdminClient; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -115,11 +118,8 @@ public class EsEntityIndexImpl implements EntityIndex { @Inject - public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, - final EsProvider provider ) { - + public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final EsProvider provider ) { ValidationUtils.validateApplicationScope( appScope ); - this.applicationScope = appScope; this.esProvider = provider; this.config = config; @@ -129,58 +129,67 @@ public class EsEntityIndexImpl implements EntityIndex { this.failureMonitor = new FailureMonitorImpl( config, provider ); } - @Override public void initializeIndex() { + final int numberOfShards = config.getNumberOfShards(); + final int numberOfReplicas = config.getNumberOfReplicas(); + createIndexAddToAlias(null,numberOfShards,numberOfReplicas); + } + @Override + public void createIndexAddToAlias(final String indexSuffix, final int numberOfShards, final int numberOfReplicas) { try { - if ( !mappingsCreated.getAndSet( true ) ) { + + if (!mappingsCreated.getAndSet(true)) { createMappings(); } - createIndexAndAlias(); + //get index name with suffix attached + String indexName = indexIdentifier.getIndex(indexSuffix); - // create the document, this ensures the index is ready + //Create index + try { + final AdminClient admin = esProvider.getClient().admin(); + Settings settings = ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards) + .put("index.number_of_replicas", numberOfReplicas).build(); + final CreateIndexResponse cir = admin.indices().prepareCreate(indexName).setSettings(settings).execute().actionGet(); + logger.info("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged()); + } catch (IndexAlreadyExistsException e) { + logger.info("Index Name [{}] already exists", indexName); + } - // Immediately create a document and remove it to ensure the entire cluster is ready - // to receive documents. Occasionally we see errors. - // See this post: http://s.apache.org/index-missing-exception + addAlias(indexSuffix); testNewIndex(); - } - catch ( IndexAlreadyExistsException expected ) { + } catch (IndexAlreadyExistsException expected) { // this is expected to happen if index already exists, it's a no-op and swallow - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to initialize index", e ); + } catch (IOException e) { + throw new RuntimeException("Unable to initialize index", e); } } - /** - * Create the index and alias - */ - private void createIndexAndAlias() { - final int numberOfShards = config.getNumberOfShards(); - final int numberOfReplicas = config.getNumberOfReplicas(); - final AdminClient admin = esProvider.getClient().admin(); - - Settings settings = ImmutableSettings.settingsBuilder().put( "index.number_of_shards", numberOfShards) - .put( "index.number_of_replicas", numberOfReplicas ).build(); - //TODO:swallow exception, look into setting up routing rules - String indexVersionName = indexIdentifier.getIndex(0); + @Override + public void addAlias(final String indexSuffix) { try { - final CreateIndexResponse cir = admin.indices().prepareCreate(indexVersionName).setSettings(settings).execute().actionGet(); - logger.info("Created new Index Name [{}] ACK=[{}]", indexVersionName, cir.isAcknowledged()); - }catch(IndexAlreadyExistsException e){ - logger.info("Index Name [{}] already exists",indexVersionName); - } - try { //check if alias exists and get the alias - Boolean isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getReadAlias()).execute().actionGet().isAcknowledged(); - logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck); + Boolean isAck; + String indexName = indexIdentifier.getIndex(indexSuffix); + final AdminClient adminClient = esProvider.getClient().admin(); + //remove write alias, can only have one + ImmutableOpenMap> aliasMap = adminClient.indices().getAliases(new GetAliasesRequest(alias.getWriteAlias())).actionGet().getAliases(); + String[] indexNames = aliasMap.keys().toArray(String.class); + for(String currentIndex : indexNames){ + isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,alias.getWriteAlias()).execute().actionGet().isAcknowledged(); + logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]",currentIndex, alias, isAck); - isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getWriteAlias()).execute().actionGet().isAcknowledged(); + } + //add read alias + isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged(); + logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck); + //add write alias + isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged(); logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck); + } catch (Exception e) { logger.warn("Failed to create alias ", e); } @@ -193,6 +202,10 @@ public class EsEntityIndexImpl implements EntityIndex { */ private void testNewIndex() { + // create the document, this ensures the index is ready + // Immediately create a document and remove it to ensure the entire cluster is ready + // to receive documents. Occasionally we see errors. + // See this post: http://s.apache.org/index-missing-exception logger.info( "Refreshing Created new Index Name [{}]", alias); @@ -430,7 +443,7 @@ public class EsEntityIndexImpl implements EntityIndex { public int getPendingTasks() { final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin() - .cluster().pendingClusterTasks( new PendingClusterTasksRequest() ).actionGet(); + .cluster().pendingClusterTasks(new PendingClusterTasksRequest()).actionGet(); return tasksResponse.pendingTasks().size(); } @@ -475,7 +488,7 @@ public class EsEntityIndexImpl implements EntityIndex { */ public void deleteIndex() { AdminClient adminClient = esProvider.getClient().admin(); - DeleteIndexResponse response = adminClient.indices().prepareDelete( indexIdentifier.getIndex(0) ).get(); + DeleteIndexResponse response = adminClient.indices().prepareDelete( indexIdentifier.getIndex(null) ).get(); if ( response.isAcknowledged() ) { logger.info( "Deleted index: " + alias); } @@ -519,7 +532,7 @@ public class EsEntityIndexImpl implements EntityIndex { try { ClusterHealthResponse chr = esProvider.getClient().admin() - .cluster().health( new ClusterHealthRequest() ).get(); + .cluster().health(new ClusterHealthRequest()).get(); return Health.valueOf( chr.getStatus().name() ); } catch ( Exception ex ) { @@ -539,7 +552,7 @@ public class EsEntityIndexImpl implements EntityIndex { try { ClusterHealthResponse chr = esProvider.getClient().admin().cluster() - .health(new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(0)})) + .health(new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(null)})) .get(); return Health.valueOf( chr.getStatus().name() ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index f8bd30f..8ea8835 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -76,25 +76,67 @@ public class EntityIndexTest extends BaseIT { @Test public void testIndex() throws IOException { - - final int MAX_ENTITIES = 100; - Id appId = new SimpleId( "application" ); ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - IndexScope indexScope = new IndexScopeImpl( appId, "things" ); + EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); + entityIndex.initializeIndex(); final String entityType = "thing"; + IndexScope indexScope = new IndexScopeImpl( appId, "things" ); + final SearchTypes searchTypes = SearchTypes.fromTypes( entityType ); + insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",100,0); - final SearchTypes searchTypes = SearchTypes.fromTypes( entityType ); + entityIndex.refresh(); + testQueries( indexScope, searchTypes, entityIndex ); + } + + @Test + public void testMultipleIndexInitializations(){ + Id appId = new SimpleId( "application" ); + + ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); entityIndex.initializeIndex(); + for(int i=0;i<10;i++) { + entityIndex.initializeIndex(); + } + + } + + @Test + public void testAddMultipleIndexes() throws IOException { + Id appId = new SimpleId( "application" ); - InputStream is = this.getClass().getResourceAsStream( "/sample-large.json" ); + ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); + + EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); + entityIndex.initializeIndex(); + + final String entityType = "thing"; + IndexScope indexScope = new IndexScopeImpl( appId, "things" ); + final SearchTypes searchTypes = SearchTypes.fromTypes( entityType ); + + insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",100,0); + + entityIndex.refresh(); + + testQueries( indexScope, searchTypes, entityIndex ); + + entityIndex.createIndexAddToAlias("v2",1,0); + + insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",100,100); + + testQueries( indexScope, searchTypes, entityIndex ); + + } + + private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException { + InputStream is = this.getClass().getResourceAsStream( filePath ); ObjectMapper mapper = new ObjectMapper(); List sampleJson = mapper.readValue( is, new TypeReference>() {} ); @@ -104,13 +146,19 @@ public class EntityIndexTest extends BaseIT { final EntityIndexBatch batch = entityIndex.createBatch(); + if(startIndex > 0){ + for(int i =0; i item = ( Map ) o; Entity entity = new Entity( entityType ); - entity = EntityIndexMapUtils.fromMap( entity, item ); - EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); + entity = EntityIndexMapUtils.fromMap(entity, item); + EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID()); batch.index( indexScope, entity ); @@ -120,7 +168,7 @@ public class EntityIndexTest extends BaseIT { - if ( count++ > MAX_ENTITIES ) { + if ( count++ > max ) { break; } } @@ -129,25 +177,8 @@ public class EntityIndexTest extends BaseIT { timer.stop(); log.info( "Total time to index {} entries {}ms, average {}ms/entry", new Object[] { count, timer.getTime(), timer.getTime() / count } ); - - entityIndex.refresh(); - - - testQueries( indexScope, searchTypes, entityIndex ); } - @Test - public void testMultipleIndexInitializations(){ - Id appId = new SimpleId( "application" ); - - ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); - - EntityIndex entityIndex = eif.createEntityIndex( applicationScope ); - for(int i=0;i<10;i++) { - entityIndex.initializeIndex(); - } - - } @Test public void testDeindex() {