From: vincentpoon@apache.org
To: commits@phoenix.apache.org
Subject: phoenix git commit: PHOENIX-4530 Do not collect delete markers during major compaction of table with disabled mutable indexes
Date: Fri, 23 Feb 2018 01:16:25 +0000 (UTC)

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 4fc3f7545 -> 178405d70

PHOENIX-4530 Do not collect delete markers during major compaction of table with disabled mutable indexes

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/178405d7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/178405d7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/178405d7

Branch: refs/heads/4.x-HBase-0.98
Commit: 178405d7012b05a683da57f5f2e53480e1dd6aed
Parents: 4fc3f75
Author: Vincent Poon
Authored: Thu Feb 22 17:14:16 2018 -0800
Committer: Vincent Poon
Committed: Thu Feb 22 17:16:20 2018 -0800

----------------------------------------------------------------------
 .../PartialScannerResultsDisabledIT.java        | 193 ++++++++++++++++++
 .../UngroupedAggregateRegionObserverIT.java     | 199 -------------------
 .../phoenix/end2end/index/MutableIndexIT.java   |  57 ++++++
 .../end2end/index/PartialIndexRebuilderIT.java  |  39 ----
 .../UngroupedAggregateRegionObserver.java       | 130 +++++-------
 .../java/org/apache/phoenix/util/TestUtil.java  |  19 ++
 6 files changed, 314 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java new file mode 100644 index 0000000..59471dd --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership.
The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.util.IndexScrutiny; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT { + public static final String TEST_TABLE_DDL = + "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + + " FEED_ELEMENT_ID CHAR(15) NOT NULL,\n" + + " CONTAINER_ID CHAR(15) NOT NULL,\n" + + " FEED_TYPE VARCHAR(1) NOT NULL, \n" + + " NETWORK_ID CHAR(15) NOT NULL,\n" + " USER_ID CHAR(15) NOT NULL,\n" + + " CREATED_TIME TIMESTAMP,\n" + " LAST_UPDATE TIMESTAMP,\n" + + " RELEVANCE_SCORE DOUBLE,\n" + " FEED_ITEM_TYPE VARCHAR(1),\n" + + " FEED_ELEMENT_TYPE VARCHAR(1),\n" + + " FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n" + + " FEED_ELEMENT_STATUS VARCHAR(1),\n" + + " FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + " PARENT_ID CHAR(15),\n" + + " CREATED_BY CHAR(15),\n" + " BEST_COMMENT_ID CHAR(15),\n" + + " COMMENT_COUNT INTEGER,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n" + + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n" + + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n" + + " USER_ID\n" + " )\n" + ") COLUMN_ENCODED_BYTES = 0"; + + public static final String INDEX_1_DDL = + "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + " NETWORK_ID,\n" + + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " USER_ID,\n" + + " CREATED_TIME DESC,\n" + " FEED_ELEMENT_ID DESC,\n" + + " CREATED_BY\n" + ") " + + " INCLUDE (\n" + " FEED_ITEM_TYPE,\n" + + " FEED_ELEMENT_TYPE,\n" + " FEED_ELEMENT_IS_SYS_GEN,\n" + + " FEED_ELEMENT_STATUS,\n" + " FEED_ELEMENT_VISIBILITY,\n" + + " PARENT_ID,\n" + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")"; + + private static final String UPSERT_INTO_DATA_TABLE = + "UPSERT INTO %s\n" + "(\n" + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n" + + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n" + + " USER_ID,\n" + " CREATED_TIME,\n" + " LAST_UPDATE,\n" + + " FEED_ITEM_TYPE,\n" + " FEED_ELEMENT_TYPE,\n" + + " FEED_ELEMENT_IS_SYS_GEN,\n" + " FEED_ELEMENT_STATUS,\n" + + " FEED_ELEMENT_VISIBILITY,\n" + " PARENT_ID,\n" + " CREATED_BY,\n" + + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")" + + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + private String dataTableName; + private String indexTableName; + private String schemaName; + private String dataTableFullName; + private static 
String indexTableFullName; + private static final Logger logger = LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class); + private static Random random = new Random(1); + // background writer threads + private static Random sourceOfRandomness = new Random(0); + private static AtomicInteger upsertIdCounter = new AtomicInteger(1); + + @Before + public void setup() throws Exception { + // create the tables + generateUniqueTableNames(); + createTestTable(getUrl(), String.format(TEST_TABLE_DDL, dataTableFullName)); + createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, dataTableFullName)); + } + + @Test + public void testWithEnoughData() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + // Write enough data to trigger partial scanner results + // TODO: it's likely that less data could be written if whatever + // config parameters decide this are lowered. + writeSingleBatch(conn, 100, 20, dataTableFullName); + logger.info("Running scrutiny"); + // Scrutinize the index to see if partial results are silently returned + // In that case we'll get a false positive on the scrutiny run. + long rowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(2000,rowCount); + } + } + + /** + * Simple select query with a fetch size that exceeds the result size. In that case the scan starts to produce + * partial result sets, which from Phoenix's perspective are rows with NULL values. + * @throws SQLException + */ + @Test + public void partialResultDuringSelect () throws SQLException { + String tableName = generateUniqueName(); + Properties props = new Properties(); + props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, "5"); + int numRecords = 10; + try (Connection conn = DriverManager.getConnection(url, props)) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + int i = 0; + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + while (i < numRecords) { + stmt.setInt(1, i); + stmt.setString(2, UUID.randomUUID().toString()); + stmt.executeUpdate(); + i++; + } + conn.commit(); + + String sql = "SELECT * FROM " + tableName; + // the scanner max result size is set very low above, so the underlying scan returns partial results on each next() call
+ Statement s = conn.createStatement(); + s.setFetchSize(100); + ResultSet rs = s.executeQuery(sql); + int count = 0; + while (rs.next()) { + if (rs.getString(2) == null) + fail("Null value because of partial row scan"); + } + count++; + } + + } + + private static String randString(int length, Random random) { + return RandomStringUtils.randomAlphabetic(length); + } + + public static void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception { + for (int j = 0; j < numBatches; j++) { + try (PreparedStatement statement = + connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) { + for (int i = 0; i < batchSize; i++) { + int index = 0; + String id = "" + upsertIdCounter.getAndIncrement(); + statement.setString(++index, id); // ORGANIZATION_ID + statement.setString(++index, id); // FEED_ELEMENT_ID,\n" + statement.setString(++index, id); // CONTAINER_ID,\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_TYPE,\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // NETWORK_ID,\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // USER_ID,\n" + statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // CREATED_TIME,\n" + statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // LAST_UPDATE\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ITEM_TYPE\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_TYPE\n" + statement.setBoolean(++index, false); // FEED_ELEMENT_IS_SYS_GEN\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_STATUS\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // PARENT_ID\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // CREATED_BY\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // BEST_COMMENT_ID\n" + statement.setInt(++index, random.nextInt()); // COMMENT_COUNT\n" + ")" + statement.execute(); + } + connection.commit(); + } + } + } + + private void generateUniqueTableNames() { + schemaName = generateUniqueName(); + dataTableName = generateUniqueName() + "_DATA"; + dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + indexTableName = generateUniqueName() + "_IDX"; + indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java deleted file mode 100644 index d64df2c..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.never; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.log4j.Appender; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.apache.log4j.spi.LoggingEvent; -import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.TestUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT { - - public static final String TEST_TABLE_DDL = - "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + " ORGANIZATION_ID CHAR(15) NOT NULL,\n" - + " FEED_ELEMENT_ID CHAR(15) NOT NULL,\n" - + " CONTAINER_ID CHAR(15) NOT NULL,\n" - + " FEED_TYPE VARCHAR(1) NOT NULL, \n" - + " NETWORK_ID CHAR(15) NOT NULL,\n" + " USER_ID CHAR(15) NOT NULL,\n" - + " CREATED_TIME TIMESTAMP,\n" + " LAST_UPDATE TIMESTAMP,\n" - + " RELEVANCE_SCORE DOUBLE,\n" + " FEED_ITEM_TYPE VARCHAR(1),\n" - + " FEED_ELEMENT_TYPE VARCHAR(1),\n" - + " FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n" - + " FEED_ELEMENT_STATUS VARCHAR(1),\n" - + " FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + " PARENT_ID CHAR(15),\n" - + " CREATED_BY CHAR(15),\n" + " BEST_COMMENT_ID CHAR(15),\n" - + " COMMENT_COUNT INTEGER,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n" - + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n" - + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n" - + " USER_ID\n" + " )\n" + ") COLUMN_ENCODED_BYTES = 0"; - - public static final String INDEX_1_DDL = - "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + " NETWORK_ID,\n" - + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " USER_ID,\n" - + " CREATED_TIME DESC,\n" + " FEED_ELEMENT_ID DESC,\n" - + " CREATED_BY\n" + ") " - + " INCLUDE (\n" + " FEED_ITEM_TYPE,\n" - + " FEED_ELEMENT_TYPE,\n" + " FEED_ELEMENT_IS_SYS_GEN,\n" - + " FEED_ELEMENT_STATUS,\n" + " FEED_ELEMENT_VISIBILITY,\n" - + " PARENT_ID,\n" + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")"; - - private String dataTableName; - private String indexTableName; - private String schemaName; - private String dataTableFullName; - private static String indexTableFullName; - - @Mock - private Appender mockAppender; - - @Captor - private ArgumentCaptor captorLoggingEvent; - private UngroupedAggregateRegionObserver ungroupedObserver; - - @Before - public void setup() { - 
ungroupedObserver = new UngroupedAggregateRegionObserver(); - ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config)); - } - - /** - * Tests the that post compact hook doesn't log any NPE for a System table - */ - @Test - public void testPostCompactSystemSequence() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - startCapturingIndexLog(); - // run the post-compact hook - ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE"); - stopCapturingIndexLog(); - // uneventful - nothing should be logged - Mockito.verify(mockAppender, never()) - .doAppend(captorLoggingEvent.capture()); - } - } - - /** - * Tests that calling the post compact hook on the data table permanently disables an index that - * is being rebuilt (i.e. already disabled or inactive) - */ - @Test - public void testPostCompactDataTableDuringRebuild() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - generateUniqueTableNames(); - testRebuildPostCompact(conn, dataTableFullName); - } - } - - /** - * Tests that calling the post compact hook on the index table permanently disables an index - * that is being rebuilt (i.e. already disabled or inactive) - */ - @Test - public void testPostCompactIndexTableDuringRebuild() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - generateUniqueTableNames(); - testRebuildPostCompact(conn, indexTableFullName); - } - } - - private void testRebuildPostCompact(Connection conn, String tableToCompact) - throws SQLException { - conn.createStatement().execute( - String.format(TEST_TABLE_DDL, dataTableFullName)); - conn.createStatement().execute(String.format(INDEX_1_DDL, - indexTableName, dataTableFullName)); - // disable the index, simulating an index write failure - PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); - IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, - EnvironmentEdgeManager.currentTimeMillis()); - - // run the post-compact hook on the data table - startCapturingIndexLog(); - ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact); - stopCapturingIndexLog(); - // an event should've been logged - Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture()); - LoggingEvent loggingEvent = captorLoggingEvent.getValue(); - assertThat(loggingEvent.getLevel(), is(Level.INFO)); - // index should be permanently disabled (disabletime of 0) - assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L)); - } - - /** - * Tests that a non-Phoenix table (created purely through HBase) doesn't log a warning in - * postCompact - */ - @Test - public void testPostCompactTableNotFound() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - HBaseTestingUtility utility = getUtility(); - String nonPhoenixTable = "NOT_A_PHOENIX_TABLE"; - utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable)); - startCapturingIndexLog(); - ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable); - stopCapturingIndexLog(); - // a debug level event should've been logged - Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture()); - LoggingEvent loggingEvent = captorLoggingEvent.getValue(); - assertThat(loggingEvent.getLevel(), is(Level.DEBUG)); - } - } - - private void stopCapturingIndexLog() { - LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender); - } - - private void startCapturingIndexLog() { - 
LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender); - } - - private void generateUniqueTableNames() { - schemaName = generateUniqueName(); - dataTableName = generateUniqueName() + "_DATA"; - dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - indexTableName = generateUniqueName() + "_IDX"; - indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 677cf92..600d0cd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -40,14 +40,23 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -742,6 +751,54 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } } + // Tests that if major compaction is run on a table with a disabled index, + // deleted cells are kept + @Test + public void testCompactDisabledIndex() throws Exception { + try (Connection conn = getConnection()) { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName() + "_DATA"; + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName() + "_IDX"; + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + conn.createStatement().execute( + String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName)); + conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL, + indexTableName, dataTableFullName)); + + //insert a row, and delete it + PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1, dataTableFullName); + conn.createStatement().execute("DELETE FROM " + dataTableFullName); + conn.commit(); + + // disable the index, simulating an index write failure + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, + EnvironmentEdgeManager.currentTimeMillis()); + + 
// major compaction should not remove the deleted row + List regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName)); + HRegion hRegion = regions.get(0); + hRegion.flushcache(); + HStore store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); + store.triggerMajorCompaction(); + store.compactRecentForTestingAssumingDefaultPolicy(1); + HTableInterface dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); + assertEquals(1, TestUtil.getRawRowCount(dataHTI)); + + // reenable the index + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.INACTIVE, + EnvironmentEdgeManager.currentTimeMillis()); + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.ACTIVE, 0L); + + // now major compaction should remove the deleted row + store.triggerMajorCompaction(); + store.compactRecentForTestingAssumingDefaultPolicy(1); + dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); + assertEquals(0, TestUtil.getRawRowCount(dataHTI)); + } + } + private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException { PreparedStatement stmt = tenantConn.prepareStatement(dml); stmt.setString(1, "00000000000000" + String.valueOf(i)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 3961d32..46443e3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -318,45 +318,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); return hasInactiveIndex; } - - @Test - public void testCompactionDuringRebuild() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName1 = generateUniqueName(); - String indexName2 = generateUniqueName(); - final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1); - String fullIndexName2 = SchemaUtil.getTableName(schemaName, indexName2); - final MyClock clock = new MyClock(1000); - // Use our own clock to prevent race between partial rebuilder and compaction - EnvironmentEdgeManager.injectEdge(clock); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, GUIDE_POSTS_WIDTH=1000"); - clock.time += 100; - conn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + fullTableName + " (v1) INCLUDE (v2)"); - clock.time += 100; - conn.createStatement().execute("CREATE INDEX " + indexName2 + " ON " + fullTableName + " (v2) INCLUDE (v1)"); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(1, 2, 3)"); - conn.commit(); - clock.time += 100; - long disableTS = EnvironmentEdgeManager.currentTimeMillis(); - HTableInterface metaTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); - IndexUtil.updateIndexState(fullIndexName1, disableTS, metaTable, PIndexState.DISABLE); - IndexUtil.updateIndexState(fullIndexName2, disableTS, metaTable, PIndexState.DISABLE); - clock.time += 100; - TestUtil.doMajorCompaction(conn, fullIndexName1); - clock.time += 100; - assertTrue(TestUtil.checkIndexState(conn, fullIndexName1, PIndexState.DISABLE, 0L)); - assertFalse(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L)); - TestUtil.doMajorCompaction(conn, fullTableName); - clock.time += 100; - assertTrue(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L)); - } finally { - EnvironmentEdgeManager.injectEdge(null); - } - } @Test @Repeat(5) http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b1b8056..8ef7285 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -68,10 +68,11 @@ import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControlle import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -104,14 +105,12 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; @@ -146,7 +145,6 @@ import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -986,87 +984,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - @Override - public void postCompact(final ObserverContext c, final Store store, - final StoreFile resultFile, CompactionRequest request) 
throws IOException { - // If we're compacting all files, then delete markers are removed - // and we must permanently disable an index that needs to be - // partially rebuild because we're potentially losing the information - // we need to successfully rebuilt it. - if (request.isMajor()) { - // Compaction and split upcalls run with the effective user context of the requesting user. - // This will lead to failure of cross cluster RPC if the effective user is not - // the login user. Switch to the login user context to ensure we have the expected - // security context. - try { - UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - clearTsOnDisabledIndexes(fullTableName); - return null; - } - }); - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new IOException(ie); - } - } - } - - @VisibleForTesting - public void clearTsOnDisabledIndexes(final String fullTableName) { - try (PhoenixConnection conn = - QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { - String baseTable = fullTableName; - PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable); - List indexes; - // if it's an index table, we just need to check if it's disabled - if (PTableType.INDEX.equals(table.getType())) { - indexes = Lists.newArrayList(table.getIndexes()); - indexes.add(table); - } else { - // for a data table, check all its indexes - indexes = table.getIndexes(); - } - // FIXME need handle views and indexes on views as well - // if any index is disabled, we won't have all the data for a rebuild after compaction - for (PTable index : indexes) { - if (index.getIndexDisableTimestamp() != 0) { - try { - logger.info( - "Major compaction running while index on table is disabled. Clearing index disable timestamp: " - + index); - IndexUtil.updateIndexState(conn, index.getName().getString(), - PIndexState.DISABLE, Long.valueOf(0L)); - } catch (SQLException e) { - logger.warn( - "Unable to permanently disable index " + index.getName().getString(), - e); - } - } - } - } catch (Exception e) { - if (e instanceof TableNotFoundException) { - logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName); - // non-Phoenix HBase tables won't be found, do nothing - return; - } - // If we can't reach the stats table, don't interrupt the normal - // compaction operation, just log a warning. - if (logger.isWarnEnabled()) { - logger.warn("Unable to permanently disable indexes being partially rebuild for " - + fullTableName, - e); - } - } - } - - @VisibleForTesting - public void setCompactionConfig(Configuration compactionConfig) { - this.compactionConfig = compactionConfig; - } - private static PTable deserializeTable(byte[] b) { try { PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); @@ -1424,4 +1341,47 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver protected boolean isRegionObserverFor(Scan scan) { return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null; } + + @Override + public InternalScanner preCompactScannerOpen(final ObserverContext c, + final Store store, final List scanners, ScanType scanType, + long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException { + // Compaction and split upcalls run with the effective user context of the requesting user. 
+ // This will lead to failure of cross cluster RPC if the effective user is not + // the login user. Switch to the login user context to ensure we have the expected + // security context. + try { + return UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index + if (request.isMajor()) { + String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + try (PhoenixConnection conn = + QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { + String baseTable = fullTableName; + PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable); + List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes(); + // FIXME need to handle views and indexes on views as well + for (PTable index : indexes) { + if (index.getIndexDisableTimestamp() != 0) { + logger.info( + "Modifying major compaction scanner to retain deleted cells for a table with disabled index: " + + baseTable); + Scan scan = new Scan(); + scan.setMaxVersions(); + return new StoreScanner(store, store.getScanInfo(), scan, scanners, + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + } + } + } + } + return s; + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 7be2b6f..12c8f96 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -853,6 +853,25 @@ public class TestUtil { System.out.println("-----------------------------------------------"); } + public static int getRawRowCount(HTableInterface table) throws IOException { + Scan s = new Scan(); + s.setRaw(true); + s.setMaxVersions(); + int rows = 0; + try (ResultScanner scanner = table.getScanner(s)) { + Result result = null; + while ((result = scanner.next()) != null) { + rows++; + CellScanner cellScanner = result.cellScanner(); + Cell current = null; + while (cellScanner.advance()) { + current = cellScanner.current(); + } + } + } + return rows; + } + public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException { try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { System.out.println("************ dumping index status for " + indexName + " **************");
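
----------------------------------------------------------------------
Note for readers skimming the diff above: the essence of PHOENIX-4530 is that the old postCompact/clearTsOnDisabledIndexes logic (which permanently disabled a rebuilding index after a major compaction) is replaced by a preCompactScannerOpen hook that keeps delete markers whenever a mutable index on the table is disabled. The following is a minimal, standalone sketch of that hook, not the committed class; it assumes the HBase 0.98-era coprocessor API used by this branch, and hasDisabledIndex() is a hypothetical stand-in for the Phoenix metadata lookup (PhoenixRuntime.getTableNoCache plus an INDEX_DISABLE_TIMESTAMP check) that the real patch performs.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class RetainDeletesForDisabledIndexObserver extends BaseRegionObserver {

    @Override
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
            Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
            long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
        String fullTableName =
                c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
        if (request.isMajor() && hasDisabledIndex(fullTableName)) {
            // Swap in a scanner that retains delete markers, so a later partial
            // index rebuild can still replay the deletes against the index.
            Scan scan = new Scan();
            scan.setMaxVersions();
            return new StoreScanner(store, store.getScanInfo(), scan, scanners,
                    ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
                    HConstants.OLDEST_TIMESTAMP);
        }
        return s; // otherwise fall back to the default compaction scanner
    }

    private boolean hasDisabledIndex(String fullTableName) {
        // Hypothetical placeholder: the committed code opens a server-side
        // PhoenixConnection and returns true if any index on the table (or the
        // index table itself) has a non-zero INDEX_DISABLE_TIMESTAMP.
        return false;
    }
}

The committed version additionally wraps the table lookup in UserGroupInformation.getLoginUser().doAs(...), since compaction upcalls run under the requesting user and cross-cluster RPCs need the login user's security context, as noted in the comments of the diff.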