phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajeshb...@apache.org
Subject git commit: PHOENIX-1139 Failed to disable local index when index update fails(Rajeshbabu)
Date Wed, 03 Sep 2014 04:42:22 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 27a6ccef3 -> 845888bcb


PHOENIX-1139 Failed to disable local index when index update fails(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/845888bc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/845888bc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/845888bc

Branch: refs/heads/master
Commit: 845888bcb58a6fe5975de4cfabbe9266407de9dc
Parents: 27a6cce
Author: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
Authored: Wed Sep 3 04:37:36 2014 +0000
Committer: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
Committed: Wed Sep 3 04:37:36 2014 +0000

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  71 +++++++-----
 .../hbase/index/balancer/IndexLoadBalancer.java |   2 +-
 .../index/PhoenixIndexFailurePolicy.java        | 112 +++++++++++++++----
 .../apache/phoenix/schema/MetaDataClient.java   |   7 ++
 4 files changed, 141 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/845888bc/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index e499c3b..6137e06 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -27,13 +27,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
 import java.sql.Connection;
-import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
@@ -44,52 +41,36 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseCluster;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
+import org.apache.phoenix.hbase.index.master.IndexMasterObserver;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.MetaDataClient;
-import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 /**
@@ -122,6 +103,9 @@ public class MutableIndexFailureIT extends BaseTest {
         conf.setInt("hbase.client.pause", 5000);
         conf.setInt("hbase.balancer.period", Integer.MAX_VALUE);
         conf.setLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, 0);
+        conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, IndexMasterObserver.class.getName());
+        conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class,
+            LoadBalancer.class);
         util = new HBaseTestingUtility(conf);
         util.startMiniCluster(NUM_SLAVES);
         String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
@@ -144,7 +128,16 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     @Test(timeout=300000)
+    public void testWriteFailureDisablesLocalIndex() throws Exception {
+        testWriteFailureDisablesIndex(true);
+    }
+ 
+    @Test(timeout=300000)
     public void testWriteFailureDisablesIndex() throws Exception {
+        testWriteFailureDisablesIndex(false);
+    }
+    
+    public void testWriteFailureDisablesIndex(boolean localIndex) throws Exception {
         String query;
         ResultSet rs;
 
@@ -157,8 +150,16 @@ public class MutableIndexFailureIT extends BaseTest {
         rs = conn.createStatement().executeQuery(query);
         assertFalse(rs.next());
 
-        conn.createStatement().execute(
+        if(localIndex) {
+            conn.createStatement().execute(
+                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME
+ " (v1) INCLUDE (v2)");
+            conn.createStatement().execute(
+                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME
+ " (v2) INCLUDE (v1)");
+        } else {
+            conn.createStatement().execute(
                 "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1)
INCLUDE (v2)");
+        }
+            
         query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
         rs = conn.createStatement().executeQuery(query);
         assertFalse(rs.next());
@@ -178,7 +179,9 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.execute();
         conn.commit();
 
-        TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
+        TableName indexTable =
+                TableName.valueOf(localIndex ? MetaDataUtil
+                        .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME);
         HBaseAdmin admin = this.util.getHBaseAdmin();
         HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
         try{
@@ -202,7 +205,15 @@ public class MutableIndexFailureIT extends BaseTest {
         assertEquals(INDEX_TABLE_NAME, rs.getString(3));
         assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
         assertFalse(rs.next());
-        
+        if(localIndex) {
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
+                new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
+            assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
+        }
+
         // Verify UPSERT on data table still work after index is disabled       
         stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
         stmt.setString(1, "a3");
@@ -227,6 +238,14 @@ public class MutableIndexFailureIT extends BaseTest {
           if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
               break;
           }
+          if(localIndex) {
+              rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME),
INDEX_TABLE_NAME+"_2",
+                  new String[] { PTableType.INDEX.toString() });
+              assertTrue(rs.next());
+              if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                  break;
+              }
+          }
         } while(true);
         
         // verify index table has data

http://git-wip-us.apache.org/repos/asf/phoenix/blob/845888bc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
index fc1963b..5bc973c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancer.java
@@ -643,7 +643,7 @@ public class IndexLoadBalancer implements LoadBalancer {
             indexTableVsUserTable.remove(other);
         } else {
             other = indexTableVsUserTable.remove(table);
-            if (other != null) userTableVsIndexTable.remove(table);
+            if (other != null) userTableVsIndexTable.remove(other);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/845888bc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 3c9c56c..b683c20 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -18,8 +18,11 @@
 package org.apache.phoenix.index;
 
 import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -35,6 +39,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
@@ -48,10 +53,16 @@ import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 /**
@@ -81,7 +92,7 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy
{
     @Override
     public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
Exception cause) throws IOException {
         Set<HTableInterfaceReference> refs = attempted.asMap().keySet();
-        String indexTableName = "";
+        List<String> indexTableNames = new ArrayList<String>(1);
         try {
             for (HTableInterfaceReference ref : refs) {
                 long minTimeStamp = 0;
@@ -98,19 +109,71 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy
{
                   }
                 }
                 
-                // Disable the index by using the updateIndexState method of MetaDataProtocol
end point coprocessor.
-                indexTableName = ref.getTableName();
-                byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-                HTableInterface systemTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
-                // Mimic the Put that gets generated by the client on an update of the index
state
-                Put put = new Put(indexTableKey);
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
PIndexState.DISABLE.getSerializedBytes());
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
PDataType.LONG.toBytes(minTimeStamp));
-                final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
-                
-                final Map<byte[], MetaDataResponse> results = 
-                        systemTable.coprocessorService(MetaDataService.class, indexTableKey,
indexTableKey,
-                            new Batch.Call<MetaDataService, MetaDataResponse>() {
+                if(ref.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX))
{
+                    PhoenixConnection conn = null;
+                    try {
+                        conn = QueryUtil.getConnection(this.env.getConfiguration()).unwrap(
+                                    PhoenixConnection.class);
+                        String userTableName = MetaDataUtil.getUserTableName(ref.getTableName());
+                        PTable dataTable = PhoenixRuntime.getTable(conn, userTableName);
+                        List<PTable> indexes = dataTable.getIndexes();
+                        // local index used to get view id from index mutation row key.
+                        PTable localIndex = null;
+                        Map<ImmutableBytesWritable, String> localIndexNames =
+                                new HashMap<ImmutableBytesWritable, String>();
+                        for (PTable index : indexes) {
+                            if (index.getIndexType() == IndexType.LOCAL
+                                    && index.getIndexState() == PIndexState.ACTIVE)
{
+                                if (localIndex == null) localIndex = index;
+                                localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes(
+                                    index.getViewIndexId())),index.getName().getString());
+                            }
+                        }
+                        if(localIndex == null) continue;
+                        
+                        IndexMaintainer indexMaintainer = localIndex.getIndexMaintainer(dataTable);
+                        HRegionInfo regionInfo = this.env.getRegion().getRegionInfo();
+                        int offset =
+                                regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length
+                                        : regionInfo.getStartKey().length;
+                        byte[] viewId = null;
+                        for (Mutation mutation : mutations) {
+                            viewId = indexMaintainer.getViewIndexIdFromIndexRowKey(new ImmutableBytesWritable(mutation.getRow(),
offset, mutation.getRow().length - offset));
+                            String indexTableName = localIndexNames.get(new ImmutableBytesWritable(viewId));

+                            if(!indexTableNames.contains(indexTableName)) {
+                                indexTableNames.add(indexTableName);
+                            }
+                        }
+                    } catch (ClassNotFoundException e) {
+                        throw new IOException(e);
+                    } catch (SQLException e) {
+                        throw new IOException(e);
+                    } finally {
+                        if (conn != null) {
+                            try {
+                                conn.close();
+                            } catch (SQLException e) {
+                                throw new IOException(e);
+                            }
+                        }
+                    }
+                } else {
+                    indexTableNames.add(ref.getTableName());
+                }
+
+                for (String indexTableName : indexTableNames) {
+                    // Disable the index by using the updateIndexState method of MetaDataProtocol
end point coprocessor.
+                    byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+                    HTableInterface systemTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES));
+                    // Mimic the Put that gets generated by the client on an update of the
index state
+                    Put put = new Put(indexTableKey);
+                    put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
PIndexState.DISABLE.getSerializedBytes());
+                    put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
PDataType.LONG.toBytes(minTimeStamp));
+                    final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
+                    
+                    final Map<byte[], MetaDataResponse> results = 
+                            systemTable.coprocessorService(MetaDataService.class, indexTableKey,
indexTableKey,
+                                new Batch.Call<MetaDataService, MetaDataResponse>()
{
                                 @Override
                                 public MetaDataResponse call(MetaDataService instance) throws
IOException {
                                     ServerRpcController controller = new ServerRpcController();
@@ -128,22 +191,23 @@ public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy
{
                                     return rpcCallback.get();
                                 }
                             });
-                if(results.isEmpty()){
-                    throw new IOException("Didn't get expected result size");
-                }
-                MetaDataResponse tmpResponse = results.values().iterator().next();
-                MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
               
+                    if(results.isEmpty()){
+                        throw new IOException("Didn't get expected result size");
+                    }
+                    MetaDataResponse tmpResponse = results.values().iterator().next();
+                    MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
               
                 
-                if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
-                    LOG.warn("Attempt to disable index " + indexTableName + " failed with
code = " + result.getMutationCode() + ". Will use default failure policy instead.");
-                    throw new DoNotRetryIOException("Attemp to disable " + indexTableName
+ " failed.");
+                    if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
+                        LOG.warn("Attempt to disable index " + indexTableName + " failed
with code = " + result.getMutationCode() + ". Will use default failure policy instead.");
+                        throw new DoNotRetryIOException("Attemp to disable " + indexTableName
+ " failed.");
+                    }
+                    LOG.info("Successfully disabled index " + indexTableName + " due to an
exception while writing updates.", cause);
                 }
-                LOG.info("Successfully disabled index " + indexTableName + " due to an exception
while writing updates.", cause);
             }
         } catch (Throwable t) {
             LOG.warn("handleFailure failed", t);
             super.handleFailure(attempted, cause);
-            throw new DoNotRetryIOException("Attemp to writes to " + indexTableName + " failed.",
cause);
+            throw new DoNotRetryIOException("Attemp to writes to " + indexTableNames + "
failed.", cause);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/845888bc/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 195968e..4f53422 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -528,11 +528,18 @@ public class MetaDataClient {
                 Scan scan = plan.getContext().getScan();
                 try {
                     scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), Long.MAX_VALUE);
+                    plan.getContext().setScanTimeRange(scan.getTimeRange());
                 } catch (IOException e) {
                     throw new SQLException(e);
                 }
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 PTable dataTable = tableRef.getTable();
+                for(PTable idx: dataTable.getIndexes()) {
+                    if(idx.getName().equals(index.getName())) {
+                        index = idx;
+                        break;
+                    }
+                }
                 List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
                 // Only build newly created index.
                 indexes.add(index);


Mime
View raw message