phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject phoenix git commit: PHOENIX-4785 Unable to write to table if index is made active during retry
Date Thu, 21 Jun 2018 23:18:22 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 22ea19cee -> fe06cacc1


PHOENIX-4785 Unable to write to table if index is made active during retry


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fe06cacc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fe06cacc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fe06cacc

Branch: refs/heads/4.x-HBase-0.98
Commit: fe06cacc1a715f094fbf98e6d42bd9abc5053037
Parents: 22ea19c
Author: Ankit Singhal <ankitsinghal59@gmail.com>
Authored: Thu Jun 21 16:17:25 2018 -0700
Committer: Ankit Singhal <ankitsinghal59@gmail.com>
Committed: Thu Jun 21 16:17:25 2018 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    | 128 ++++++++++++++-----
 .../MutableIndexFailureWithNamespaceIT.java     |  80 ++++++++++++
 .../coprocessor/MetaDataEndpointImpl.java       |  30 +++++
 .../index/PhoenixIndexFailurePolicy.java        |  71 +++++++++-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +-
 5 files changed, 276 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fe06cacc/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 715e37f..aac20ac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -28,10 +28,16 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -104,10 +110,10 @@ public class MutableIndexFailureIT extends BaseTest {
     private final boolean throwIndexWriteFailure;
     private String schema = generateUniqueName();
     private List<CommitException> exceptions = Lists.newArrayList();
-    private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
-    private static final int forwardOverlapMs = 1000;
-    private static final int disableTimestampThresholdMs = 10000;
-    private static final int numRpcRetries = 2;
+    protected static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+    protected static final int forwardOverlapMs = 1000;
+    protected static final int disableTimestampThresholdMs = 10000;
+    protected static final int numRpcRetries = 2;
 
     public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped,
Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure)
{
         this.transactional = transactional;
@@ -127,6 +133,23 @@ public class MutableIndexFailureIT extends BaseTest {
 
     @BeforeClass
     public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProps();
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        NUM_SLAVES_BASE = 4;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                        .getRSForFirstRegionInTable(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
+    }
+    
+    protected static Map<String,String> getServerProps(){
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
         serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
@@ -142,19 +165,7 @@ public class MutableIndexFailureIT extends BaseTest {
          * because we want to control it's execution ourselves
          */
         serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE));
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
-        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        NUM_SLAVES_BASE = 4;
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
-        indexRebuildTaskRegionEnvironment =
-                (RegionCoprocessorEnvironment) getUtility()
-                        .getRSForFirstRegionInTable(
-                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
-                        .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
-                        .get(0).getCoprocessorHost()
-                        .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
-        MetaDataRegionObserver.initRebuildIndexConnectionProps(
-            indexRebuildTaskRegionEnvironment.getConfiguration());
+        return serverProps;
     }
 
     @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}")
// name is used by failsafe as file name in reports
@@ -162,16 +173,10 @@ public class MutableIndexFailureIT extends BaseTest {
         return Arrays.asList(new Object[][] { 
                 // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure,
PHOENIX-4130
                 { false, false, false, false, false, false},
-                { false, false, true, true, false, null},
-                { false, false, true, true, false, true},
                 { false, false, false, true, false, null},
                 { true, false, false, true, false, null},
-                { true, false, true, true, false, null},
-                { false, true, true, true, false, null},
                 { false, true, false, null, false, null},
                 { true, true, false, true, false, null},
-                { true, true, true, null, false, null},
-
                 { false, false, false, false, false, null},
                 { false, true, false, false, false, null},
                 { false, false, false, false, false, null},
@@ -180,9 +185,7 @@ public class MutableIndexFailureIT extends BaseTest {
                 { false, true, false, true, false, null},
                 { false, true, false, true, false, null},
                 { false, false, false, true, true, null},
-                { false, false, true, true, true, null},
                 { false, false, false, false, true, false},
-                { false, false, true, false, true, false},
                 } 
         );
     }
@@ -258,6 +261,9 @@ public class MutableIndexFailureIT extends BaseTest {
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
+            initializeTable(conn, fullTableName);
+            addRowsInTableDuringRetry(fullTableName);
+
             // Verify the metadata for index is correct.
             rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null,
                     new String[] { PTableType.INDEX.toString() });
@@ -270,8 +276,9 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(thirdIndexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            initializeTable(conn, fullTableName);
-            
+            // we should be able to write to ACTIVE index even in case of disable index on
failure policy
+            addRowToTable(conn, fullTableName);
+
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
             String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
@@ -402,16 +409,67 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.setString(2, "x");
         stmt.setString(3, "1");
         stmt.execute();
-        stmt.setString(1, "b");
-        stmt.setString(2, "y");
-        stmt.setString(3, "2");
-        stmt.execute();
+        conn.commit();
+    }
+
+    private void addRowToTable(Connection conn, String tableName) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
         stmt.setString(1, "c");
         stmt.setString(2, "z");
         stmt.setString(3, "3");
         stmt.execute();
         conn.commit();
+    }
 
+    private void addRowsInTableDuringRetry(final String tableName)
+            throws SQLException, InterruptedException, ExecutionException {
+        int threads=10;
+        boolean wasFailWrite = FailingRegionObserver.FAIL_WRITE;
+        boolean wasToggleFailWriteForRetry = FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY;
+        try {
+            Callable callable = new Callable() {
+
+                @Override
+                public Boolean call() {
+                    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+                    props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
+                    try (Connection conn = driver.connect(url, props)) {
+                        // In case of disable index on failure policy, INDEX will be in PENDING_DISABLE
on first retry
+                        // but will
+                        // become active if retry is successful
+                        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES(?,?,?)");
+                        stmt.setString(1, "b");
+                        stmt.setString(2, "y");
+                        stmt.setString(3, "2");
+                        stmt.execute();
+                        if (!leaveIndexActiveOnFailure && !transactional) {
+                            FailingRegionObserver.FAIL_WRITE = true;
+                            FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = true;
+                        }
+                        conn.commit();
+                    } catch (SQLException e) {
+                        return false;
+                    }
+                    return true;
+                }
+            };
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
+            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+            for (int i = 0; i < threads; i++) {
+                futures.add(executor.submit(callable));
+            }
+            for (Future<Boolean> future : futures) {
+                Boolean isSuccess = future.get();
+                // transactions can have conflict so ignoring the check for them
+                if (!transactional) {
+                    assertTrue(isSuccess);
+                }
+            }
+            executor.shutdown();
+        } finally {
+            FailingRegionObserver.FAIL_WRITE = wasFailWrite;
+            FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = wasToggleFailWriteForRetry;
+        }
     }
 
     private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName,
boolean localIndex) throws SQLException {
@@ -510,6 +568,7 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public static class FailingRegionObserver extends SimpleRegionObserver {
+        public static boolean TOGGLE_FAIL_WRITE_FOR_RETRY = false;
         public static volatile boolean FAIL_WRITE = false;
         public static final String FAIL_INDEX_NAME = "FAIL_IDX";
         public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
@@ -520,6 +579,9 @@ public class MutableIndexFailureIT extends BaseTest {
             if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_"
+ FAIL_INDEX_NAME)
                     && FAIL_WRITE) {
                 throwException = true;
+                if (TOGGLE_FAIL_WRITE_FOR_RETRY) {
+                    FAIL_WRITE = !FAIL_WRITE;
+                }
             } else {
                 // When local index updates are atomic with data updates, testing a write
failure to a local
                 // index won't make sense.
@@ -542,7 +604,9 @@ public class MutableIndexFailureIT extends BaseTest {
                 }
             }
             if (throwException) {
-                dropIndex(c);
+                if (!TOGGLE_FAIL_WRITE_FOR_RETRY) {
+                    dropIndex(c);
+                }
                 throw new DoNotRetryIOException();
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fe06cacc/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
new file mode 100644
index 0000000..5ed9e1f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/*
+ * This class ensures that the namespace-enabled tests get their own cluster
+ */
+public class MutableIndexFailureWithNamespaceIT extends MutableIndexFailureIT {
+
+    public MutableIndexFailureWithNamespaceIT(boolean transactional, boolean localIndex,
boolean isNamespaceMapped,
+            Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure)
{
+        super(transactional, localIndex, isNamespaceMapped, disableIndexOnWriteFailure, failRebuildTask,
+                throwIndexWriteFailure);
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProps();
+        serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
+        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        NUM_SLAVES_BASE = 4;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        TableName systemTable = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                true);
+        indexRebuildTaskRegionEnvironment = (RegionCoprocessorEnvironment)getUtility()
+                .getRSForFirstRegionInTable(systemTable).getOnlineRegions(systemTable).get(0).getCoprocessorHost()
+                .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(indexRebuildTaskRegionEnvironment.getConfiguration());
+    }
+    
+    @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}")
// name is used by failsafe as file name in reports
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[][] { 
+                // note - can't disableIndexOnWriteFailure without throwIndexWriteFailure,
PHOENIX-4130
+                { false, false, true, true, false, null},
+                { false, false, true, true, false, true},
+                { true, false, true, true, false, null},
+                { false, true, true, true, false, null},
+                { true, true, true, null, false, null},
+                { false, false, true, true, true, null},
+                { false, false, true, false, true, false},
+                } 
+        );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fe06cacc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 05b7f41..caaae8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3716,7 +3716,37 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements
Coprocesso
                         return;
                     }
                 }
+                if (newState == PIndexState.PENDING_DISABLE && currentState != PIndexState.PENDING_DISABLE)
{
+                    // reset count for first PENDING_DISABLE
+                    newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp,
Bytes.toBytes(0L)));
+                }
+                if (currentState == PIndexState.PENDING_DISABLE) {
+                    if (newState == PIndexState.ACTIVE) {
+                        //before making index ACTIVE check if all clients succeed otherwise
keep it PENDING_DISABLE
+                        byte[] count = region
+                                .get(new Get(key).addColumn(TABLE_FAMILY_BYTES,
+                                        PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES))
+                                .getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+                        if (count != null && Bytes.toLong(count) != 0) {
+                            newState = PIndexState.PENDING_DISABLE;
+                            newKVs.remove(disableTimeStampKVIndex);
+                            newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                                    INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
+                        }
+                    } else if (newState == PIndexState.DISABLE) {
+                        //reset the counter for pending disable when transitioning from PENDING_DISABLE
to DISABLE
+                        newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                                PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp,
Bytes.toBytes(0L)));
+                    }
 
+                }
+                
+                if(newState == PIndexState.ACTIVE||newState == PIndexState.PENDING_ACTIVE||newState
== PIndexState.DISABLE){
+                    newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp,
Bytes.toBytes(0L)));   
+                }
+                
                 if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE)
{
                     timeStamp = currentStateKV.getTimestamp();
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fe06cacc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 40aa0dc..3c51ec7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.index;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collection;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -431,6 +434,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy
{
     public static void doBatchWithRetries(MutateCommand mutateCommand,
             IndexWriteException iwe, PhoenixConnection connection, ReadOnlyProps config)
             throws IOException {
+        incrementPendingDisableCounter(iwe, connection);
         int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
         long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE,
@@ -465,6 +469,57 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy
{
         throw new DoNotRetryIOException(iwe); // send failure back to client
     }
 
+    private static void incrementPendingDisableCounter(IndexWriteException indexWriteException,PhoenixConnection
conn) {
+        try {
+            Set<String> indexesToUpdate = new HashSet<>();
+            if (indexWriteException instanceof MultiIndexWriteFailureException) {
+                MultiIndexWriteFailureException indexException =
+                        (MultiIndexWriteFailureException) indexWriteException;
+                List<HTableInterfaceReference> failedIndexes = indexException.getFailedTables();
+                if (indexException.isDisableIndexOnFailure() && failedIndexes !=
null) {
+                    for (HTableInterfaceReference failedIndex : failedIndexes) {
+                        String failedIndexTable = failedIndex.getTableName();
+                        if (!indexesToUpdate.contains(failedIndexTable)) {
+                            incrementCounterForIndex(conn,failedIndexTable);
+                            indexesToUpdate.add(failedIndexTable);
+                        }
+                    }
+                }
+            } else if (indexWriteException instanceof SingleIndexWriteFailureException) {
+                SingleIndexWriteFailureException indexException =
+                        (SingleIndexWriteFailureException) indexWriteException;
+                String failedIndex = indexException.getTableName();
+                if (indexException.isDisableIndexOnFailure() && failedIndex != null)
{
+                    incrementCounterForIndex(conn,failedIndex);
+                }
+            }
+        } catch (Exception handleE) {
+            LOG.warn("Error while trying to handle index write exception", indexWriteException);
+        }
+    }
+
+    private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable)
throws IOException {
+        incrementCounterForIndex(conn, failedIndexTable, 1);
+    }
+
+    private static void decrementCounterForIndex(PhoenixConnection conn, String failedIndexTable)
throws IOException {
+        incrementCounterForIndex(conn, failedIndexTable, -1);
+    }
+    
+    private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,long
amount) throws IOException {
+        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable);
+        Increment incr = new Increment(indexTableKey);
+        incr.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES,
amount);
+        try {
+            conn.getQueryServices()
+                    .getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+                            conn.getQueryServices().getProps()).getName())
+                    .increment(incr);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
     private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil)
{
         // If there is a single try we must not take into account the time.
         return numRetry < maxRetries
@@ -487,13 +542,25 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy
{
     }
 
     private static void updateIndex(String indexFullName, PhoenixConnection conn,
-            PIndexState indexState) throws SQLException {
+            PIndexState indexState) throws SQLException, IOException {
+        //Decrement the counter because we get here once the client has retried, whether the retry
failed or succeeded
+        decrementCounterForIndex(conn,indexFullName);
+        Long indexDisableTimestamp = null;
         if (PIndexState.DISABLE.equals(indexState)) {
             LOG.info("Disabling index after hitting max number of index write retries: "
                     + indexFullName);
+            IndexUtil.updateIndexState(conn, indexFullName, indexState, indexDisableTimestamp);
         } else if (PIndexState.ACTIVE.equals(indexState)) {
             LOG.debug("Resetting index to active after subsequent success " + indexFullName);
+            //On the server, the disable timestamp will be reset only if no other client
is in PENDING_DISABLE state
+            indexDisableTimestamp = 0L;
+            try {
+                IndexUtil.updateIndexState(conn, indexFullName, indexState, indexDisableTimestamp);
+            } catch (SQLException e) {
+                // It's possible that some other client has already made the index DISABLED,
so we can ignore the disallowed
+                // transition(DISABLED->ACTIVE)
+                if (e.getErrorCode() != SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION.getErrorCode())
{ throw e; }
+            }
         }
-        IndexUtil.updateIndexState(conn, indexFullName, indexState, null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fe06cacc/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 16fb184..4cb56bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -49,7 +49,6 @@ import org.apache.phoenix.expression.function.SQLViewTypeFunction;
 import org.apache.phoenix.expression.function.SqlTypeNameFunction;
 import org.apache.phoenix.expression.function.TransactionProviderNameFunction;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -214,6 +213,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+    public static final byte[] PENDING_DISABLE_COUNT_BYTES = Bytes.toBytes("PENDING_DISABLE_COUNT");
 
     public static final String TYPE_SEQUENCE = "SEQUENCE";
     public static final String SYSTEM_FUNCTION_TABLE = "FUNCTION";


Mime
View raw message