Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6378E200D23 for ; Thu, 19 Oct 2017 23:40:56 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 61E581609EE; Thu, 19 Oct 2017 21:40:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 565B31609D7 for ; Thu, 19 Oct 2017 23:40:55 +0200 (CEST) Received: (qmail 79457 invoked by uid 500); 19 Oct 2017 21:40:54 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 79448 invoked by uid 99); 19 Oct 2017 21:40:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Oct 2017 21:40:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7D818DFA3C; Thu, 19 Oct 2017 21:40:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vincentpoon@apache.org To: commits@phoenix.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: phoenix git commit: PHOENIX-4242 Fix Indexer post-compact hook logging of NPE and TableNotFound Date: Thu, 19 Oct 2017 21:40:52 +0000 (UTC) archived-at: Thu, 19 Oct 2017 21:40:56 -0000 Repository: phoenix Updated Branches: refs/heads/4.12-HBase-1.3 a38b0c06d -> 2e521f54e PHOENIX-4242 Fix Indexer post-compact hook logging of NPE and TableNotFound Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2e521f54 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2e521f54 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2e521f54 Branch: refs/heads/4.12-HBase-1.3 Commit: 2e521f54ec7539ff36bc9a99eb122bc58268e5db Parents: a38b0c0 Author: Vincent Poon Authored: Thu Oct 19 14:28:27 2017 -0700 Committer: Vincent Poon Committed: Thu Oct 19 14:40:41 2017 -0700 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserverIT.java | 171 +++++++++++++++++++ .../UngroupedAggregateRegionObserver.java | 103 ++++++----- .../org/apache/phoenix/hbase/index/Indexer.java | 52 ------ .../apache/phoenix/schema/MetaDataClient.java | 3 + 4 files changed, 239 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e521f54/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java new file mode 100644 index 0000000..3efd40e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.never; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.spi.LoggingEvent; +import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT { + + private String dataTableName; + private String indexTableName; + private String schemaName; + private String dataTableFullName; + private static String indexTableFullName; + + @Mock + private Appender mockAppender; + + @Captor + private ArgumentCaptor captorLoggingEvent; + private UngroupedAggregateRegionObserver ungroupedObserver; + + @Before + public void setup() { + ungroupedObserver = new UngroupedAggregateRegionObserver(); + ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config)); + } + + /** + * Tests the that post compact hook doesn't log any NPE for a System table + */ + @Test + public void testPostCompactSystemSequence() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + startCapturingIndexLog(); + // run the post-compact hook + ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE"); + stopCapturingIndexLog(); + // uneventful - nothing should be logged + Mockito.verify(mockAppender, never()) + .doAppend((LoggingEvent) captorLoggingEvent.capture()); + } + } + + /** + * Tests that calling the post compact hook on the data table permanently disables an index that + * is being rebuilt (i.e. already disabled or inactive) + */ + @Test + public void testPostCompactDataTableDuringRebuild() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + generateUniqueTableNames(); + testRebuildPostCompact(conn, dataTableFullName); + } + } + + /** + * Tests that calling the post compact hook on the index table permanently disables an index + * that is being rebuilt (i.e. already disabled or inactive) + */ + @Test + public void testPostCompactIndexTableDuringRebuild() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + generateUniqueTableNames(); + testRebuildPostCompact(conn, indexTableFullName); + } + } + + private void testRebuildPostCompact(Connection conn, String tableToCompact) + throws SQLException { + conn.createStatement().execute( + String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName)); + conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL, + indexTableName, dataTableFullName)); + // disable the index, simulating an index write failure + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, + EnvironmentEdgeManager.currentTimeMillis()); + + // run the post-compact hook on the data table + startCapturingIndexLog(); + ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact); + stopCapturingIndexLog(); + // an event should've been logged + Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture()); + LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue(); + assertThat(loggingEvent.getLevel(), is(Level.INFO)); + // index should be permanently disabled (disabletime of 0) + assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L)); + } + + /** + * Tests that a non-Phoenix table (created purely through HBase) doesn't log a warning in + * postCompact + */ + @Test + public void testPostCompactTableNotFound() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + HBaseTestingUtility utility = getUtility(); + String nonPhoenixTable = "NOT_A_PHOENIX_TABLE"; + utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable)); + startCapturingIndexLog(); + ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable); + stopCapturingIndexLog(); + // a debug level event should've been logged + Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture()); + LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue(); + assertThat(loggingEvent.getLevel(), is(Level.DEBUG)); + } + } + + private void stopCapturingIndexLog() { + LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender); + } + + private void startCapturingIndexLog() { + LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender); + } + + private void generateUniqueTableNames() { + schemaName = generateUniqueName(); + dataTableName = generateUniqueName() + "_DATA"; + dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + indexTableName = generateUniqueName() + "_IDX"; + indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e521f54/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c3024a7..af50420 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -97,6 +97,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; @@ -108,8 +109,10 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; @@ -133,7 +136,10 @@ import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; @@ -142,7 +148,10 @@ import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -926,7 +935,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } @Override - public void postCompact(final ObserverContext e, final Store store, + public void postCompact(final ObserverContext c, final Store store, final StoreFile resultFile, CompactionRequest request) throws IOException { // If we're compacting all files, then delete markers are removed // and we must permanently disable an index that needs to be @@ -940,49 +949,67 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver User.runAsLoginUser(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - MutationCode mutationCode = null; - long disableIndexTimestamp = 0; - - try (CoprocessorHConnection coprocessorHConnection = - new CoprocessorHConnection(compactionConfig, - (HRegionServer) e.getEnvironment() - .getRegionServerServices()); - HTableInterface htable = - coprocessorHConnection - .getTable(SchemaUtil.getPhysicalTableName( - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, - compactionConfig))) { - String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG - // Instead, we need to disable all indexes on the view. - byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName); - Get get = new Get(tableKey); - get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES); - Result result = htable.get(get); - if (!result.isEmpty()) { - Cell cell = result.listCells().get(0); - if (cell.getValueLength() > 0) { - disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault()); - if (disableIndexTimestamp != 0) { - logger.info("Major compaction running while index on table is disabled. Clearing index disable timestamp: " + tableName); - mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode(); - } - } - } - } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction - logger.warn("Potential failure to permanently disable index during compaction " + e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t); - } finally { - if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) { - logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() + - " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode)); - } - } + String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + clearTsOnDisabledIndexes(fullTableName); return null; } }); } } + @VisibleForTesting + public void clearTsOnDisabledIndexes(final String fullTableName) { + try (PhoenixConnection conn = + QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { + String baseTable = fullTableName; + PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable); + List indexes; + // if it's an index table, we just need to check if it's disabled + if (PTableType.INDEX.equals(table.getType())) { + indexes = Lists.newArrayList(table.getIndexes()); + indexes.add(table); + } else { + // for a data table, check all its indexes + indexes = table.getIndexes(); + } + // FIXME need handle views and indexes on views as well + // if any index is disabled, we won't have all the data for a rebuild after compaction + for (PTable index : indexes) { + if (index.getIndexDisableTimestamp() != 0) { + try { + logger.info( + "Major compaction running while index on table is disabled. Clearing index disable timestamp: " + + index); + IndexUtil.updateIndexState(conn, index.getName().getString(), + PIndexState.DISABLE, Long.valueOf(0L)); + } catch (SQLException e) { + logger.warn( + "Unable to permanently disable index " + index.getName().getString(), + e); + } + } + } + } catch (Exception e) { + if (e instanceof TableNotFoundException) { + logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName); + // non-Phoenix HBase tables won't be found, do nothing + return; + } + // If we can't reach the stats table, don't interrupt the normal + // compaction operation, just log a warning. + if (logger.isWarnEnabled()) { + logger.warn("Unable to permanently disable indexes being partially rebuild for " + + fullTableName, + e); + } + } + } + + @VisibleForTesting + public void setCompactionConfig(Configuration compactionConfig) { + this.compactionConfig = compactionConfig; + } + private static PTable deserializeTable(byte[] b) { try { PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e521f54/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 8957b30..24eeab5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -25,7 +25,6 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -63,8 +62,6 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.security.User; @@ -90,18 +87,12 @@ import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ServerUtil; import com.google.common.collect.Lists; @@ -840,48 +831,5 @@ public class Indexer extends BaseRegionObserver { properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName()); desc.addCoprocessor(Indexer.class.getName(), null, priority, properties); } - - @Override - public void postCompact(final ObserverContext c, final Store store, - final StoreFile resultFile, CompactionRequest request) throws IOException { - // If we're compacting all files, then delete markers are removed - // and we must permanently disable an index that needs to be - // partially rebuild because we're potentially losing the information - // we need to successfully rebuilt it. - if (request.isAllFiles() || request.isMajor()) { - // Compaction and split upcalls run with the effective user context of the requesting user. - // This will lead to failure of cross cluster RPC if the effective user is not - // the login user. Switch to the login user context to ensure we have the expected - // security context. - User.runAsLoginUser(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - try { - PhoenixConnection conn = QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class); - PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName); - // FIXME: we may need to recurse into children of this table too - for (PTable index : table.getIndexes()) { - if (index.getIndexDisableTimestamp() != 0) { - try { - LOG.info("Major compaction running while index on table is disabled. Clearing index disable timestamp: " + fullTableName); - IndexUtil.updateIndexState(conn, index.getName().getString(), PIndexState.DISABLE, Long.valueOf(0L)); - } catch (SQLException e) { - LOG.warn("Unable to permanently disable index " + index.getName().getString(), e); - } - } - } - } catch (Exception e) { - // If we can't reach the stats table, don't interrupt the normal - // compaction operation, just log a warning. - if (LOG.isWarnEnabled()) { - LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e); - } - } - return null; - } - }); - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2e521f54/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 0f6bab2..0ce4246 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -602,6 +602,9 @@ public class MetaDataClient { } if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) { + if (result.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS && result.getTable() == null) { + result.setTable(table); + } return result; } MutationCode code = result.getMutationCode();