PHOENIX-3280 Automatic attempt to rebuild all disabled indexes

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2fc00db
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2fc00db
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2fc00db

Branch: refs/heads/4.x-HBase-1.1
Commit: e2fc00dbcabf84c6a8b6ed36e58c32b73d2af23d
Parents: a2ef919
Author: James Taylor <jamestaylor@apache.org>
Authored: Thu Sep 15 00:48:24 2016 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Thu Sep 15 01:18:30 2016 -0700

----------------------------------------------------------------------
 .../coprocessor/MetaDataRegionObserver.java | 179 +++++++++++--------
 1 file changed, 104 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2fc00db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index ff100a0..00981f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,8 +20,8 @@ package org.apache.phoenix.coprocessor;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -79,6 +79,7 @@
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
@@ -223,13 +224,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                PTable dataPTable = null;
+                Map<PTable, List<PTable>> dataTableToIndexesMap = null;
                 MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
-                List<PTable> indexesToPartiallyRebuild = Collections.emptyList();
                 scanner = this.env.getRegion().getScanner(scan);
-                long earliestDisableTimestamp = Long.MAX_VALUE;
 
                 do {
                     results.clear();
@@ -249,19 +248,12 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         if (disabledTimeStampVal <= 0) {
                             continue;
                         }
-                        if (disabledTimeStampVal < earliestDisableTimestamp) {
-                            earliestDisableTimestamp = disabledTimeStampVal;
-                        }
-
                         byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                 PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                        byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                        byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                                 PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
-                        if ((dataTable == null || dataTable.length == 0) || (indexStat == null || indexStat.length == 0)
-                                || (dataPTable != null
-                                && !dataPTable.getName().getString().equals(Bytes.toString(dataTable)))) {
+                        if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
                             // data table name can't be empty
-                            // we need to build indexes of same data table. so skip other indexes for this task.
                             continue;
                         }
@@ -284,14 +276,19 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                                 // don't run a second index population upsert select
                                 props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
                                 conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
-                                String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
-                                dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
-                                indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
                                 client = new MetaDataClient(conn);
+                                dataTableToIndexesMap = Maps.newHashMap();
                             }
+                            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+                            PTable dataPTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
                             String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
-                            PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+                            PTable indexPTable = PhoenixRuntime.getTableNoCache(conn, indexTableFullName);
+                            // Sanity check in case index was removed from table
+                            if (!dataPTable.getIndexes().contains(indexPTable)) {
+                                continue;
+                            }
+
                             if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                                 LOG.debug("Index rebuild has been skipped because not all regions of index table="
                                         + indexPTable.getName() + " are online.");
@@ -299,82 +296,114 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                             }
                             // Allow index to begin incremental maintenance as index is back online and we
                             // cannot transition directly from DISABLED -> ACTIVE
-                            if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+                            if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) {
                                 AlterIndexStatement statement = new AlterIndexStatement(
                                         NamedTableNode.create(indexPTable.getSchemaName().getString(),
                                                 indexPTable.getTableName().getString()),
                                         dataPTable.getTableName().getString(), false, PIndexState.INACTIVE);
                                 client.alterIndex(statement);
                             }
+                            List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
+                            if (indexesToPartiallyRebuild == null) {
+                                indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+                                dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
+                            }
                             indexesToPartiallyRebuild.add(indexPTable);
                         } while (hasMore);
 
-                if (!indexesToPartiallyRebuild.isEmpty()) {
+                if (dataTableToIndexesMap != null) {
                     long overlapTime = env.getConfiguration().getLong(
-                        QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
-                        QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
-
-                    LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
-                    new Scan();
-                    List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
-                    for (PTable index : indexesToPartiallyRebuild) {
-                        maintainers.add(index.getIndexMaintainer(dataPTable, conn));
-                    }
-                    Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
-                    dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
-                    byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
-                    try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
-                        Result result;
-                        try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
-                            int batchSize = conn.getMutateBatchSize();
-                            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
-                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
-                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn);
-                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                            byte[] uuidValue = ServerCacheClient.generateId();
-
-                            while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
-                                Put put = null;
-                                Delete del = null;
-                                for (Cell cell : result.rawCells()) {
-                                    if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                        if (put == null) {
-                                            put = new Put(CellUtil.cloneRow(cell));
-                                            put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                            put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                            put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
-                                            mutations.add(put);
+                            QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+                            QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+                    for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
+                        PTable dataPTable = entry.getKey();
+                        List<PTable> indexesToPartiallyRebuild = entry.getValue();
+                        try {
+                            long earliestDisableTimestamp = Long.MAX_VALUE;
+                            List<IndexMaintainer> maintainers = Lists
+                                    .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                            for (PTable index : indexesToPartiallyRebuild) {
+                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
+                                if (disabledTimeStampVal > 0) {
+                                    if (disabledTimeStampVal < earliestDisableTimestamp) {
+                                        earliestDisableTimestamp = disabledTimeStampVal;
+                                    }
+
+                                    maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+                                }
+                            }
+                            // No indexes are disabled, so skip this table
+                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
+                                continue;
+                            }
+
+                            long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+                            LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                    + " from timestamp=" + timeStamp);
+                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
+                            dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+                            byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+                            try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
+                                Result result;
+                                try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
+                                    int batchSize = conn.getMutateBatchSize();
+                                    List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+                                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+                                            ByteUtil.EMPTY_BYTE_ARRAY);
+                                    IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
+                                            indexesToPartiallyRebuild, conn);
+                                    byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                                    byte[] uuidValue = ServerCacheClient.generateId();
+
+                                    while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
+                                        Put put = null;
+                                        Delete del = null;
+                                        for (Cell cell : result.rawCells()) {
+                                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                                if (put == null) {
+                                                    put = new Put(CellUtil.cloneRow(cell));
+                                                    put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                                    put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                                    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                                            PDataType.TRUE_BYTES);
+                                                    mutations.add(put);
+                                                }
+                                                put.add(cell);
+                                            } else {
+                                                if (del == null) {
+                                                    del = new Delete(CellUtil.cloneRow(cell));
+                                                    del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                                    del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                                    del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                                            PDataType.TRUE_BYTES);
+                                                    mutations.add(del);
+                                                }
+                                                del.addDeleteMarker(cell);
+                                            }
                                         }
-                                        put.add(cell);
-                                    } else {
-                                        if (del == null) {
-                                            del = new Delete(CellUtil.cloneRow(cell));
-                                            del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                            del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                            del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
-                                            mutations.add(del);
+                                        if (mutations.size() == batchSize) {
+                                            dataHTable.batch(mutations);
+                                            uuidValue = ServerCacheClient.generateId();
+                                            mutations.clear();
                                         }
-                                        del.addDeleteMarker(cell);
                                     }
-                                }
-                                if (mutations.size() == batchSize) {
-                                    dataHTable.batch(mutations);
-                                    uuidValue = ServerCacheClient.generateId();
+                                    if (!mutations.isEmpty()) {
+                                        dataHTable.batch(mutations);
+                                    }
                                 }
                             }
-                            if (!mutations.isEmpty()) {
-                                dataHTable.batch(mutations);
+                            for (PTable indexPTable : indexesToPartiallyRebuild) {
+                                AlterIndexStatement statement = new AlterIndexStatement(
+                                        NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable
+                                                .getTableName().getString()), dataPTable.getTableName().getString(),
+                                        false, PIndexState.ACTIVE);
+                                client.alterIndex(statement);
                             }
+                        } catch (Exception e) { // Log, but try next table's indexes
+                            LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                    + ". Will try again on the next scheduled invocation.", e);
                         }
                     }
-                    for (PTable indexPTable : indexesToPartiallyRebuild) {
-                        AlterIndexStatement statement = new AlterIndexStatement(
-                                NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
-                                dataPTable.getTableName().getString(),
-                                false, PIndexState.ACTIVE);
-                        client.alterIndex(statement);
-                    }
                 }
             } catch (Throwable t) {
                 LOG.warn("ScheduledBuildIndexTask failed!", t);
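----------------------------------------------------------------------

The core of this change is that disabled indexes are now grouped by
data table, each group is rebuilt from that table's own earliest
disable timestamp (minus a configurable overlap window), and a failure
while rebuilding one table's indexes no longer aborts the others. The
sketch below is a minimal, self-contained model of that grouping logic,
not Phoenix code: IndexInfo, its fields, and the printed "rebuild" are
hypothetical stand-ins for Phoenix's PTable metadata and the actual
scan-and-replay.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DisabledIndexGrouping {

        // Hypothetical stand-in for the index metadata read from SYSTEM.CATALOG.
        static final class IndexInfo {
            final String dataTable;
            final String indexTable;
            final long indexDisableTimestamp; // <= 0 means the index is not disabled

            IndexInfo(String dataTable, String indexTable, long indexDisableTimestamp) {
                this.dataTable = dataTable;
                this.indexTable = indexTable;
                this.indexDisableTimestamp = indexDisableTimestamp;
            }
        }

        public static void main(String[] args) {
            // Analogous to INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME: the rebuild
            // scan starts this many ms before the earliest disable timestamp.
            long overlapTime = 300000L;

            List<IndexInfo> scanned = Arrays.asList(
                    new IndexInfo("T1", "T1_IDX_A", 1000000L),
                    new IndexInfo("T1", "T1_IDX_B", 900000L),
                    new IndexInfo("T2", "T2_IDX_A", 2000000L));

            // Group disabled indexes by data table, mirroring the patch's
            // Map<PTable, List<PTable>> dataTableToIndexesMap.
            Map<String, List<IndexInfo>> dataTableToIndexesMap = new HashMap<>();
            for (IndexInfo index : scanned) {
                if (index.indexDisableTimestamp <= 0) {
                    continue; // skip indexes that are not disabled
                }
                dataTableToIndexesMap
                        .computeIfAbsent(index.dataTable, k -> new ArrayList<>())
                        .add(index);
            }

            // Rebuild each table from its own earliest disable timestamp; failures
            // are caught per table so the remaining tables are still processed.
            for (Map.Entry<String, List<IndexInfo>> entry : dataTableToIndexesMap.entrySet()) {
                long earliestDisableTimestamp = Long.MAX_VALUE;
                for (IndexInfo index : entry.getValue()) {
                    earliestDisableTimestamp =
                            Math.min(earliestDisableTimestamp, index.indexDisableTimestamp);
                }
                long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
                try {
                    // A real rebuild would scan the data table from timeStamp onward
                    // and replay the rows into each index, then mark them ACTIVE.
                    System.out.println("Rebuilding " + entry.getValue().size()
                            + " indexes of " + entry.getKey() + " from timestamp=" + timeStamp);
                } catch (RuntimeException e) {
                    System.err.println("Rebuild of " + entry.getKey()
                            + " failed; will retry on the next scheduled run");
                }
            }
        }
    }

Running it prints one rebuild line per data table: T1's two indexes
share the earlier timestamp 900000 (so the scan starts at 600000),
while T2 is rebuilt independently from timestamp 1700000.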
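The rebuild loop itself batches the replayed mutations: every full
batch is submitted, the server-cache id is regenerated, and the list
is cleared (the mutations.clear() call is part of this patch), with a
final flush for the trailing partial batch. Below is a minimal sketch
of that pattern under the same caveat; the String "mutations" and the
flush() helper are hypothetical stand-ins for HBase Mutation objects
and HTableInterface.batch().

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class BatchFlush {

        // Stand-in for dataHTable.batch(mutations).
        static void flush(List<String> mutations, String uuidValue) {
            System.out.println("batch of " + mutations.size() + " with uuid " + uuidValue);
        }

        public static void main(String[] args) {
            int batchSize = 3;
            List<String> mutations = new ArrayList<>(batchSize);
            String uuidValue = UUID.randomUUID().toString();

            for (int row = 0; row < 8; row++) {
                mutations.add("mutation-" + row);
                if (mutations.size() == batchSize) {
                    flush(mutations, uuidValue);
                    // Each batch gets a fresh cache id; clearing the list keeps
                    // already-submitted mutations from being re-sent.
                    uuidValue = UUID.randomUUID().toString();
                    mutations.clear();
                }
            }
            if (!mutations.isEmpty()) {
                flush(mutations, uuidValue); // flush the final partial batch
            }
        }
    }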