usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject incubator-usergrid git commit: add index to alias methods
Date Tue, 25 Nov 2014 20:49:42 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/index-alias af16be295 -> b376cfd13


add index to alias methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b376cfd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b376cfd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b376cfd1

Branch: refs/heads/index-alias
Commit: b376cfd13ef76d8f2011201c65a14585d2055023
Parents: af16be2
Author: Shawn Feldman <sfeldman@apache.org>
Authored: Tue Nov 25 13:49:24 2014 -0700
Committer: Shawn Feldman <sfeldman@apache.org>
Committed: Tue Nov 25 13:49:24 2014 -0700

----------------------------------------------------------------------
 .../usergrid/persistence/index/EntityIndex.java | 12 +++
 .../persistence/index/IndexIdentifier.java      |  8 +-
 .../index/impl/EsEntityIndexImpl.java           | 95 +++++++++++---------
 .../persistence/index/impl/EntityIndexTest.java | 83 +++++++++++------
 4 files changed, 127 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 88498b3..30dc3fc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -37,11 +37,23 @@ public interface EntityIndex {
     public void initializeIndex();
 
     /**
+     * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
+     * @param indexSuffix index name
+     */
+    public void createIndexAddToAlias(final String indexSuffix, final int numShards, final int numReplicas);
+
+    /**
      * Create the index batch.
      */
     public EntityIndexBatch createBatch();
 
     /**
+     * Add alias to index, will remove old index from write alias
+     * @param indexSuffix must be different than current index
+     */
+    public void addAlias(final String indexSuffix);
+
+    /**
      * Execute query in Usergrid syntax.
      */
     public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
index 82a6023..54a02d6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
@@ -45,12 +45,12 @@ public class IndexIdentifier{
 
     /**
      * Get index name, send in additional parameter to add incremental indexes
-     * @param version
+     * @param suffix
      * @return
      */
-    public String getIndex(int version) {
-        if (version > 0) {
-            return getIndexBase() + "_v" + (version + 1);
+    public String getIndex(String suffix) {
+        if (suffix != null) {
+            return getIndexBase() + "_" + (suffix + 1);
         } else {
             return getIndexBase();
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 540aec4..e7cbd2e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
 import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
@@ -36,6 +37,8 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequestBuilder;
 import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -115,11 +118,8 @@ public class EsEntityIndexImpl implements EntityIndex {
 
 
     @Inject
-    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config,
-                              final EsProvider provider ) {
-
+    public EsEntityIndexImpl( @Assisted final ApplicationScope appScope, final IndexFig config, final EsProvider provider ) {
         ValidationUtils.validateApplicationScope( appScope );
-
         this.applicationScope = appScope;
         this.esProvider = provider;
         this.config = config;
@@ -129,58 +129,67 @@ public class EsEntityIndexImpl implements EntityIndex {
         this.failureMonitor = new FailureMonitorImpl( config, provider );
     }
 
-
     @Override
     public void initializeIndex() {
+        final int numberOfShards = config.getNumberOfShards();
+        final int numberOfReplicas = config.getNumberOfReplicas();
+        createIndexAddToAlias(null,numberOfShards,numberOfReplicas);
+    }
 
+    @Override
+    public void createIndexAddToAlias(final String indexSuffix, final int numberOfShards, final int numberOfReplicas) {
         try {
-            if ( !mappingsCreated.getAndSet( true ) ) {
+
+            if (!mappingsCreated.getAndSet(true)) {
                 createMappings();
             }
 
-            createIndexAndAlias();
+            //get index name with suffix attached
+            String indexName = indexIdentifier.getIndex(indexSuffix);
 
-            // create the document, this ensures the index is ready
+            //Create index
+            try {
+                final AdminClient admin = esProvider.getClient().admin();
+                Settings settings = ImmutableSettings.settingsBuilder().put("index.number_of_shards", numberOfShards)
+                        .put("index.number_of_replicas", numberOfReplicas).build();
+                final CreateIndexResponse cir = admin.indices().prepareCreate(indexName).setSettings(settings).execute().actionGet();
+                logger.info("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
+            } catch (IndexAlreadyExistsException e) {
+                logger.info("Index Name [{}] already exists", indexName);
+            }
 
-            // Immediately create a document and remove it to ensure the entire cluster is ready 
-            // to receive documents. Occasionally we see errors.  
-            // See this post: http://s.apache.org/index-missing-exception
+            addAlias(indexSuffix);
 
             testNewIndex();
-        }
-        catch ( IndexAlreadyExistsException expected ) {
+        } catch (IndexAlreadyExistsException expected) {
             // this is expected to happen if index already exists, it's a no-op and swallow
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to initialize index", e );
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to initialize index", e);
         }
     }
 
-    /**
-     * Create the index and alias
-     */
-    private void createIndexAndAlias() {
-        final int numberOfShards = config.getNumberOfShards();
-        final int numberOfReplicas = config.getNumberOfReplicas();
-        final AdminClient admin = esProvider.getClient().admin();
-
-        Settings settings = ImmutableSettings.settingsBuilder().put( "index.number_of_shards", numberOfShards)
-                .put( "index.number_of_replicas", numberOfReplicas ).build();
 
-        //TODO:swallow exception, look into setting up routing rules
-        String indexVersionName =  indexIdentifier.getIndex(0);
+    @Override
+    public void addAlias(final String indexSuffix) {
         try {
-            final CreateIndexResponse cir = admin.indices().prepareCreate(indexVersionName).setSettings(settings).execute().actionGet();
-            logger.info("Created new Index Name [{}] ACK=[{}]", indexVersionName, cir.isAcknowledged());
-        }catch(IndexAlreadyExistsException e){
-            logger.info("Index Name [{}] already exists",indexVersionName);
-        }
-        try {   //check if alias exists and get the alias
-            Boolean isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
-            logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck);
+            Boolean isAck;
+            String indexName = indexIdentifier.getIndex(indexSuffix);
+            final AdminClient adminClient = esProvider.getClient().admin();
+            //remove write alias, can only have one
+            ImmutableOpenMap<String,List<AliasMetaData>> aliasMap = adminClient.indices().getAliases(new GetAliasesRequest(alias.getWriteAlias())).actionGet().getAliases();
+            String[] indexNames = aliasMap.keys().toArray(String.class);
+            for(String currentIndex : indexNames){
+                isAck = adminClient.indices().prepareAliases().removeAlias(currentIndex,alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+                logger.info("Removed Index Name [{}] from Alias=[{}] ACK=[{}]",currentIndex, alias, isAck);
 
-            isAck = admin.indices().prepareAliases().addAlias(indexVersionName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
+            }
+            //add read alias
+            isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getReadAlias()).execute().actionGet().isAcknowledged();
+            logger.info("Created new read Alias Name [{}] ACK=[{}]", alias, isAck);
+            //add write alias
+            isAck = adminClient.indices().prepareAliases().addAlias(indexName, alias.getWriteAlias()).execute().actionGet().isAcknowledged();
             logger.info("Created new write Alias Name [{}] ACK=[{}]", alias, isAck);
+
         } catch (Exception e) {
             logger.warn("Failed to create alias ", e);
         }
@@ -193,6 +202,10 @@ public class EsEntityIndexImpl implements EntityIndex {
      */
     private void testNewIndex() {
 
+        // create the document, this ensures the index is ready
+        // Immediately create a document and remove it to ensure the entire cluster is ready
+        // to receive documents. Occasionally we see errors.
+        // See this post: http://s.apache.org/index-missing-exception
 
         logger.info( "Refreshing Created new Index Name [{}]", alias);
 
@@ -430,7 +443,7 @@ public class EsEntityIndexImpl implements EntityIndex {
     public int getPendingTasks() {
 
         final PendingClusterTasksResponse tasksResponse = esProvider.getClient().admin()
-                .cluster().pendingClusterTasks( new PendingClusterTasksRequest() ).actionGet();
+                .cluster().pendingClusterTasks(new PendingClusterTasksRequest()).actionGet();
 
         return tasksResponse.pendingTasks().size();
     }
@@ -475,7 +488,7 @@ public class EsEntityIndexImpl implements EntityIndex {
      */
     public void deleteIndex() {
         AdminClient adminClient = esProvider.getClient().admin();
-        DeleteIndexResponse response = adminClient.indices().prepareDelete( indexIdentifier.getIndex(0) ).get();
+        DeleteIndexResponse response = adminClient.indices().prepareDelete( indexIdentifier.getIndex(null) ).get();
         if ( response.isAcknowledged() ) {
             logger.info( "Deleted index: " + alias);
         }
@@ -519,7 +532,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         try {
             ClusterHealthResponse chr = esProvider.getClient().admin()
-                    .cluster().health( new ClusterHealthRequest() ).get();
+                    .cluster().health(new ClusterHealthRequest()).get();
             return Health.valueOf( chr.getStatus().name() );
         }
         catch ( Exception ex ) {
@@ -539,7 +552,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
         try {
             ClusterHealthResponse chr = esProvider.getClient().admin().cluster()
-                                                  .health(new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(0)}))
+                                                  .health(new ClusterHealthRequest(new String[]{indexIdentifier.getIndex(null)}))
                                                   .get();
             return Health.valueOf( chr.getStatus().name() );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b376cfd1/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index f8bd30f..8ea8835 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -76,25 +76,67 @@ public class EntityIndexTest extends BaseIT {
 
     @Test
     public void testIndex() throws IOException {
-
-        final int MAX_ENTITIES = 100;
-
         Id appId = new SimpleId( "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
 
-        IndexScope indexScope = new IndexScopeImpl( appId, "things" );
+        EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
+        entityIndex.initializeIndex();
 
         final String entityType = "thing";
+        IndexScope indexScope = new IndexScopeImpl( appId, "things" );
+        final SearchTypes searchTypes = SearchTypes.fromTypes( entityType );
 
+        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",100,0);
 
-        final SearchTypes searchTypes = SearchTypes.fromTypes( entityType );
+        entityIndex.refresh();
 
+        testQueries( indexScope, searchTypes,  entityIndex );
+    }
+
+    @Test
+    public void testMultipleIndexInitializations(){
+        Id appId = new SimpleId( "application" );
+
+        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
 
         EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
         entityIndex.initializeIndex();
+        for(int i=0;i<10;i++) {
+            entityIndex.initializeIndex();
+        }
+
+    }
+
+    @Test
+    public void testAddMultipleIndexes() throws IOException {
+        Id appId = new SimpleId( "application" );
 
-        InputStream is = this.getClass().getResourceAsStream( "/sample-large.json" );
+        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
+        EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
+        entityIndex.initializeIndex();
+
+        final String entityType = "thing";
+        IndexScope indexScope = new IndexScopeImpl( appId, "things" );
+        final SearchTypes searchTypes = SearchTypes.fromTypes( entityType );
+
+        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",100,0);
+
+        entityIndex.refresh();
+
+        testQueries( indexScope, searchTypes,  entityIndex );
+
+        entityIndex.createIndexAddToAlias("v2",1,0);
+
+        insertJsonBlob(entityIndex, entityType, indexScope, "/sample-large.json",100,100);
+
+        testQueries( indexScope, searchTypes,  entityIndex );
+
+    }
+
+    private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException {
+        InputStream is = this.getClass().getResourceAsStream( filePath );
         ObjectMapper mapper = new ObjectMapper();
         List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
 
@@ -104,13 +146,19 @@ public class EntityIndexTest extends BaseIT {
 
         final EntityIndexBatch batch = entityIndex.createBatch();
 
+        if(startIndex > 0){
+            for(int i =0; i<startIndex;i++){
+                sampleJson.remove(0);
+            }
+        }
+
         for ( Object o : sampleJson ) {
 
             Map<String, Object> item = ( Map<String, Object> ) o;
 
             Entity entity = new Entity( entityType );
-            entity = EntityIndexMapUtils.fromMap( entity, item );
-            EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+            entity = EntityIndexMapUtils.fromMap(entity, item);
+            EntityUtils.setVersion(entity, UUIDGenerator.newTimeUUID());
 
             batch.index( indexScope, entity );
 
@@ -120,7 +168,7 @@ public class EntityIndexTest extends BaseIT {
 
 
 
-            if ( count++ > MAX_ENTITIES ) {
+            if ( count++ > max ) {
                 break;
             }
         }
@@ -129,25 +177,8 @@ public class EntityIndexTest extends BaseIT {
         timer.stop();
         log.info( "Total time to index {} entries {}ms, average {}ms/entry",
                 new Object[] { count, timer.getTime(), timer.getTime() / count } );
-
-        entityIndex.refresh();
-
-
-        testQueries( indexScope, searchTypes,  entityIndex );
     }
 
-    @Test
-    public void testMultipleIndexInitializations(){
-        Id appId = new SimpleId( "application" );
-
-        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
-
-        EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
-        for(int i=0;i<10;i++) {
-            entityIndex.initializeIndex();
-        }
-
-    }
 
     @Test
     public void testDeindex() {


Mime
View raw message