phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/2] phoenix git commit: PHOENIX-3280 Automatic attempt to rebuild all disabled index
Date Thu, 15 Sep 2016 08:17:44 GMT
PHOENIX-3280 Automatic attempt to rebuild all disabled index


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2fc00db
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2fc00db
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2fc00db

Branch: refs/heads/4.x-HBase-1.1
Commit: e2fc00dbcabf84c6a8b6ed36e58c32b73d2af23d
Parents: a2ef919
Author: James Taylor <jamestaylor@apache.org>
Authored: Thu Sep 15 00:48:24 2016 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Thu Sep 15 01:18:30 2016 -0700

----------------------------------------------------------------------
 .../coprocessor/MetaDataRegionObserver.java     | 179 +++++++++++--------
 1 file changed, 104 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2fc00db/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index ff100a0..00981f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,8 +20,8 @@ package org.apache.phoenix.coprocessor;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -79,6 +79,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 
 /**
@@ -223,13 +224,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
 
-                PTable dataPTable = null;
+                Map<PTable, List<PTable>> dataTableToIndexesMap = null;
                 MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
-                List<PTable> indexesToPartiallyRebuild = Collections.emptyList();
                 scanner = this.env.getRegion().getScanner(scan);
-                long earliestDisableTimestamp = Long.MAX_VALUE;
 
                 do {
                     results.clear();
@@ -249,19 +248,12 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     if (disabledTimeStampVal <= 0) {
                         continue;
                     }
-                    if (disabledTimeStampVal < earliestDisableTimestamp) {
-                        earliestDisableTimestamp = disabledTimeStampVal;
-                    }
-
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                    byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                    byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
-                    if ((dataTable == null || dataTable.length == 0) || (indexStat == null
|| indexStat.length == 0)
-                            || (dataPTable != null
-                                    && !dataPTable.getName().getString().equals(Bytes.toString(dataTable))))
{
+                    if ((dataTable == null || dataTable.length == 0) || (indexState == null
|| indexState.length == 0)) {
                         // data table name can't be empty
-                        // we need to build indexes of same data table. so skip other indexes for this task.
                         continue;
                     }
 
@@ -284,14 +276,19 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     	// don't run a second index populations upsert select 
                         props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");

                         conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
-                        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
-                        dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
-                        indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
                         client = new MetaDataClient(conn);
+                        dataTableToIndexesMap = Maps.newHashMap();
                     }
+                    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+                    PTable dataPTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
 
                     String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
-                    PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+                    PTable indexPTable = PhoenixRuntime.getTableNoCache(conn, indexTableFullName);
+                    // Sanity check in case index was removed from table
+                    if (!dataPTable.getIndexes().contains(indexPTable)) {
+                        continue;
+                    }
+                    
                     if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable))
{
                         LOG.debug("Index rebuild has been skipped because not all regions
of index table="
                                 + indexPTable.getName() + " are online.");
@@ -299,82 +296,114 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     }
                     // Allow index to begin incremental maintenance as index is back online and we
                     // cannot transition directly from DISABLED -> ACTIVE
-                    if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat)
== 0) {
+                    if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState)
== 0) {
                         AlterIndexStatement statement = new AlterIndexStatement(
                                 NamedTableNode.create(indexPTable.getSchemaName().getString(),
indexPTable.getTableName().getString()),
                                 dataPTable.getTableName().getString(),
                                 false, PIndexState.INACTIVE);
                         client.alterIndex(statement);
                     }
+                    List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
+                    if (indexesToPartiallyRebuild == null) {
+                        indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+                        dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
+                    }
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
 
-                if (!indexesToPartiallyRebuild.isEmpty()) {
+                if (dataTableToIndexesMap != null) {
                     long overlapTime = env.getConfiguration().getLong(
-                        QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
-                        QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
-                    
-                    LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + "
from timestamp=" + timeStamp);
-                    new Scan();
-                    List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
-                    for (PTable index : indexesToPartiallyRebuild) {
-                        maintainers.add(index.getIndexMaintainer(dataPTable, conn));
-                    }
-                    Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
-                    dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
-                    byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
-                    try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName))
{
-                        Result result;
-                        try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan))
{
-                            int batchSize = conn.getMutateBatchSize();
-                            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
-                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
-                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
indexesToPartiallyRebuild, conn);
-                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                            byte[] uuidValue = ServerCacheClient.generateId();
-        
-                            while ((result = dataTableScanner.next()) != null &&
!result.isEmpty()) {
-                                Put put = null;
-                                Delete del = null;
-                                for (Cell cell : result.rawCells()) {
-                                    if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put)
{
-                                        if (put == null) {
-                                            put = new Put(CellUtil.cloneRow(cell));
-                                            put.setAttribute(PhoenixIndexCodec.INDEX_UUID,
uuidValue);
-                                            put.setAttribute(PhoenixIndexCodec.INDEX_MD,
attribValue);
-                                            put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
PDataType.TRUE_BYTES);
-                                            mutations.add(put);
+                            QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+                            QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+                    for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet())
{
+                        PTable dataPTable = entry.getKey();
+                        List<PTable> indexesToPartiallyRebuild = entry.getValue();
+                        try {
+                            long earliestDisableTimestamp = Long.MAX_VALUE;
+                            List<IndexMaintainer> maintainers = Lists
+                                    .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+                            for (PTable index : indexesToPartiallyRebuild) {
+                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
+                                if (disabledTimeStampVal > 0) {
+                                    if (disabledTimeStampVal < earliestDisableTimestamp)
{
+                                        earliestDisableTimestamp = disabledTimeStampVal;
+                                    }
+    
+                                    maintainers.add(index.getIndexMaintainer(dataPTable,
conn));
+                                }
+                            }
+                            // No indexes are disabled, so skip this table
+                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
+                                continue;
+                            }
+
+                            long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+                            LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                    + " from timestamp=" + timeStamp);
+                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
+                            dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+                            byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+                            try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName))
{
+                                Result result;
+                                try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan))
{
+                                    int batchSize = conn.getMutateBatchSize();
+                                    List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+                                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+                                            ByteUtil.EMPTY_BYTE_ARRAY);
+                                    IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
+                                            indexesToPartiallyRebuild, conn);
+                                    byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                                    byte[] uuidValue = ServerCacheClient.generateId();
+
+                                    while ((result = dataTableScanner.next()) != null &&
!result.isEmpty()) {
+                                        Put put = null;
+                                        Delete del = null;
+                                        for (Cell cell : result.rawCells()) {
+                                            if (KeyValue.Type.codeToType(cell.getTypeByte())
== KeyValue.Type.Put) {
+                                                if (put == null) {
+                                                    put = new Put(CellUtil.cloneRow(cell));
+                                                    put.setAttribute(PhoenixIndexCodec.INDEX_UUID,
uuidValue);
+                                                    put.setAttribute(PhoenixIndexCodec.INDEX_MD,
attribValue);
+                                                    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                                            PDataType.TRUE_BYTES);
+                                                    mutations.add(put);
+                                                }
+                                                put.add(cell);
+                                            } else {
+                                                if (del == null) {
+                                                    del = new Delete(CellUtil.cloneRow(cell));
+                                                    del.setAttribute(PhoenixIndexCodec.INDEX_UUID,
uuidValue);
+                                                    del.setAttribute(PhoenixIndexCodec.INDEX_MD,
attribValue);
+                                                    del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                                            PDataType.TRUE_BYTES);
+                                                    mutations.add(del);
+                                                }
+                                                del.addDeleteMarker(cell);
+                                            }
                                         }
-                                        put.add(cell);
-                                    } else {
-                                        if (del == null) {
-                                            del = new Delete(CellUtil.cloneRow(cell));
-                                            del.setAttribute(PhoenixIndexCodec.INDEX_UUID,
uuidValue);
-                                            del.setAttribute(PhoenixIndexCodec.INDEX_MD,
attribValue);
-                                            del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
PDataType.TRUE_BYTES);
-                                            mutations.add(del);
+                                        if (mutations.size() == batchSize) {
+                                            dataHTable.batch(mutations);
+                                            uuidValue = ServerCacheClient.generateId();
+                                            mutations.clear();
                                         }
-                                        del.addDeleteMarker(cell);
                                     }
-                                }
-                                if (mutations.size() == batchSize) {
-                                    dataHTable.batch(mutations);
-                                    uuidValue = ServerCacheClient.generateId();
+                                    if (!mutations.isEmpty()) {
+                                        dataHTable.batch(mutations);
+                                    }
                                 }
                             }
-                            if (!mutations.isEmpty()) {
-                                dataHTable.batch(mutations);
+                            for (PTable indexPTable : indexesToPartiallyRebuild) {
+                                AlterIndexStatement statement = new AlterIndexStatement(
+                                        NamedTableNode.create(indexPTable.getSchemaName().getString(),
indexPTable
+                                                .getTableName().getString()), dataPTable.getTableName().getString(),
+                                        false, PIndexState.ACTIVE);
+                                client.alterIndex(statement);
                             }
+                        } catch (Exception e) { // Log, but try next table's indexes
+                            LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+                                    + ". Will try again next on next scheduled invocation.",
e);
                         }
                     }
-                    for (PTable indexPTable : indexesToPartiallyRebuild) {
-                        AlterIndexStatement statement = new AlterIndexStatement(
-                                NamedTableNode.create(indexPTable.getSchemaName().getString(),
indexPTable.getTableName().getString()),
-                                dataPTable.getTableName().getString(),
-                                false, PIndexState.ACTIVE);
-                        client.alterIndex(statement);
-                    }
                 }
             } catch (Throwable t) {
                 LOG.warn("ScheduledBuildIndexTask failed!", t);


Mime
View raw message