Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 623F9200CBD for ; Thu, 1 Jun 2017 02:50:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 60D47160BE5; Thu, 1 Jun 2017 00:50:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 66C0C160BCB for ; Thu, 1 Jun 2017 02:50:14 +0200 (CEST) Received: (qmail 43313 invoked by uid 500); 1 Jun 2017 00:50:11 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 42327 invoked by uid 99); 1 Jun 2017 00:50:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Jun 2017 00:50:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79908F2171; Thu, 1 Jun 2017 00:50:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stack@apache.org To: commits@hbase.apache.org Date: Thu, 01 Jun 2017 00:50:21 -0000 Message-Id: <8b78537f2dde454e8ac3f17a458fc9a6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi) Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility. archived-at: Thu, 01 Jun 2017 00:50:16 -0000 http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index ced7abc..c3900dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -107,10 +106,12 @@ public class CreateTableProcedure setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS); break; case CREATE_TABLE_ASSIGN_REGIONS: - assignRegions(env, getTableName(), newRegions); + setEnablingState(env, getTableName()); + addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); break; case CREATE_TABLE_UPDATE_DESC_CACHE: + setEnabledState(env, getTableName()); updateTableDescCache(env, getTableName()); setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION); break; @@ -333,21 +334,21 @@ public class CreateTableProcedure protected static List addTableToMeta(final MasterProcedureEnv env, final HTableDescriptor hTableDescriptor, final List regions) throws IOException { - if (regions != null && regions.size() > 0) { - ProcedureSyncWait.waitMetaRegions(env); + assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions; - // Add regions to META - addRegionsToMeta(env, hTableDescriptor, regions); - // Add replicas if needed - List newRegions = addReplicas(env, hTableDescriptor, regions); + ProcedureSyncWait.waitMetaRegions(env); - // Setup replication for region replicas if needed - if (hTableDescriptor.getRegionReplication() > 1) { - ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); - } - return newRegions; + // Add replicas if needed + List newRegions = addReplicas(env, hTableDescriptor, regions); + + // Add regions to META + addRegionsToMeta(env, hTableDescriptor, newRegions); + + // Setup replication for region replicas if needed + if (hTableDescriptor.getRegionReplication() > 1) { + ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); } - return regions; + return newRegions; } /** @@ -374,18 +375,16 @@ public class CreateTableProcedure return hRegionInfos; } - protected static void assignRegions(final MasterProcedureEnv env, - final TableName tableName, final List regions) throws IOException { - ProcedureSyncWait.waitRegionServers(env); + protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName) + throws IOException { // Mark the table as Enabling env.getMasterServices().getTableStateManager() .setTableState(tableName, TableState.State.ENABLING); + } - // Trigger immediate assignment of the regions in round-robin fashion - final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager(); - ModifyRegionUtils.assignRegions(assignmentManager, regions); - + protected static void setEnabledState(final MasterProcedureEnv env, final TableName tableName) + throws IOException { // Enable table env.getMasterServices().getTableStateManager() .setTableState(tableName, TableState.State.ENABLED); http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index 096172a..78bd715 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -106,7 +105,10 @@ public class DeleteColumnFamilyProcedure setNextState(DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS); break; case DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS: - reOpenAllRegionsIfTableIsOnline(env); + if (env.getAssignmentManager().isTableEnabled(getTableName())) { + addChildProcedure(env.getAssignmentManager() + .createReopenProcedures(getRegionInfoList(env))); + } return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException(this + " unhandled state=" + state); @@ -292,7 +294,8 @@ public class DeleteColumnFamilyProcedure env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - reOpenAllRegionsIfTableIsOnline(env); + //reOpenAllRegionsIfTableIsOnline(env); + // TODO: NUKE ROLLBACK!!!! } /** @@ -316,25 +319,6 @@ public class DeleteColumnFamilyProcedure } /** - * Last action from the procedure - executed when online schema change is supported. - * @param env MasterProcedureEnv - * @throws IOException - */ - private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { - // This operation only run when the table is enabled. - if (!env.getMasterServices().getTableStateManager() - .isTableState(getTableName(), TableState.State.ENABLED)) { - return; - } - - if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) { - LOG.info("Completed delete column family operation on table " + getTableName()); - } else { - LOG.warn("Error on reopening the regions on table " + getTableName()); - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. * @return traceEnabled @@ -376,7 +360,8 @@ public class DeleteColumnFamilyProcedure private List getRegionInfoList(final MasterProcedureEnv env) throws IOException { if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + regionInfoList = env.getAssignmentManager().getRegionStates() + .getRegionsOfTable(getTableName()); } return regionInfoList; } http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index bda68eb..04dfc60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.favored.FavoredNodesManager; -import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.mob.MobConstants; @@ -97,8 +96,8 @@ public class DeleteTableProcedure } // TODO: Move out... in the acquireLock() - LOG.debug("waiting for '" + getTableName() + "' regions in transition"); - regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + LOG.debug("Waiting for '" + getTableName() + "' regions in transition"); + regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName()); assert regions != null && !regions.isEmpty() : "unexpected 0 regions"; ProcedureSyncWait.waitRegionInTransition(env, regions); @@ -350,8 +349,7 @@ public class DeleteTableProcedure final TableName tableName) throws IOException { Connection connection = env.getMasterServices().getConnection(); Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName); - try (Table metaTable = - connection.getTable(TableName.META_TABLE_NAME)) { + try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) { List deletes = new ArrayList<>(); try (ResultScanner resScanner = metaTable.getScanner(tableScan)) { for (Result result : resScanner) { @@ -385,11 +383,9 @@ public class DeleteTableProcedure protected static void deleteAssignmentState(final MasterProcedureEnv env, final TableName tableName) throws IOException { - final AssignmentManager am = env.getMasterServices().getAssignmentManager(); - // Clean up regions of the table in RegionStates. LOG.debug("Removing '" + tableName + "' from region states."); - am.getRegionStates().tableDeleted(tableName); + env.getMasterServices().getAssignmentManager().deleteTable(tableName); // If entry for this table states, remove it. LOG.debug("Marking '" + tableName + "' as deleted."); http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index b53ce45..409ca26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -21,12 +21,9 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; -import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -34,17 +31,11 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.htrace.Trace; @InterfaceAudience.Private public class DisableTableProcedure @@ -116,12 +107,8 @@ public class DisableTableProcedure setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE); break; case DISABLE_TABLE_MARK_REGIONS_OFFLINE: - if (markRegionsOffline(env, tableName, true) == - MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { - setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); - } else { - LOG.trace("Retrying later to disable the missing regions"); - } + addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName)); + setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); break; case DISABLE_TABLE_SET_DISABLED_TABLE_STATE: setTableStateToDisabled(env, tableName); @@ -249,7 +236,7 @@ public class DisableTableProcedure // set the state later on). A quick state check should be enough for us to move forward. TableStateManager tsm = env.getMasterServices().getTableStateManager(); TableState.State state = tsm.getTableState(tableName); - if(!state.equals(TableState.State.ENABLED)){ + if (!state.equals(TableState.State.ENABLED)){ LOG.info("Table " + tableName + " isn't enabled;is "+state.name()+"; skipping disable"); setFailure("master-disable-table", new TableNotEnabledException( tableName+" state is "+state.name())); @@ -290,83 +277,6 @@ public class DisableTableProcedure } /** - * Mark regions of the table offline with retries - * @param env MasterProcedureEnv - * @param tableName the target table - * @param retryRequired whether to retry if the first run failed - * @return whether the operation is fully completed or being interrupted. - * @throws IOException - */ - protected static MarkRegionOfflineOpResult markRegionsOffline( - final MasterProcedureEnv env, - final TableName tableName, - final Boolean retryRequired) throws IOException { - // Dev consideration: add a config to control max number of retry. For now, it is hard coded. - int maxTry = (retryRequired ? 10 : 1); - MarkRegionOfflineOpResult operationResult = - MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED; - do { - try { - operationResult = markRegionsOffline(env, tableName); - if (operationResult == MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { - break; - } - maxTry--; - } catch (Exception e) { - LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e); - maxTry--; - if (maxTry > 0) { - continue; // we still have some retry left, try again. - } - throw e; - } - } while (maxTry > 0); - - if (operationResult != MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { - LOG.warn("Some or all regions of the Table '" + tableName + "' were still online"); - } - - return operationResult; - } - - /** - * Mark regions of the table offline - * @param env MasterProcedureEnv - * @param tableName the target table - * @return whether the operation is fully completed or being interrupted. - * @throws IOException - */ - private static MarkRegionOfflineOpResult markRegionsOffline( - final MasterProcedureEnv env, - final TableName tableName) throws IOException { - // Get list of online regions that are of this table. Regions that are - // already closed will not be included in this list; i.e. the returned - // list is not ALL regions in a table, its all online regions according - // to the in-memory state on this master. - MarkRegionOfflineOpResult operationResult = - MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL; - final List regions = - env.getMasterServices().getAssignmentManager().getRegionStates() - .getRegionsOfTable(tableName); - if (regions.size() > 0) { - LOG.info("Offlining " + regions.size() + " regions."); - - BulkDisabler bd = new BulkDisabler(env, tableName, regions); - try { - if (!bd.bulkAssign()) { - operationResult = MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED; - } - } catch (InterruptedException e) { - LOG.warn("Disable was interrupted"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - operationResult = MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_INTERRUPTED; - } - } - return operationResult; - } - - /** * Mark table state to Disabled * @param env MasterProcedureEnv * @throws IOException @@ -428,64 +338,4 @@ public class DisableTableProcedure } } } - - /** - * Run bulk disable. - */ - private static class BulkDisabler extends BulkAssigner { - private final AssignmentManager assignmentManager; - private final List regions; - private final TableName tableName; - private final int waitingTimeForEvents; - - public BulkDisabler(final MasterProcedureEnv env, final TableName tableName, - final List regions) { - super(env.getMasterServices()); - this.assignmentManager = env.getMasterServices().getAssignmentManager(); - this.tableName = tableName; - this.regions = regions; - this.waitingTimeForEvents = - env.getMasterServices().getConfiguration() - .getInt("hbase.master.event.waiting.time", 1000); - } - - @Override - protected void populatePool(ExecutorService pool) { - RegionStates regionStates = assignmentManager.getRegionStates(); - for (final HRegionInfo region : regions) { - if (regionStates.isRegionInTransition(region) - && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) { - continue; - } - pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler", new Runnable() { - @Override - public void run() { - assignmentManager.unassign(region); - } - })); - } - } - - @Override - protected boolean waitUntilDone(long timeout) throws InterruptedException { - long startTime = EnvironmentEdgeManager.currentTime(); - long remaining = timeout; - List regions = null; - long lastLogTime = startTime; - while (!server.isStopped() && remaining > 0) { - Thread.sleep(waitingTimeForEvents); - regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName); - long now = EnvironmentEdgeManager.currentTime(); - // Don't log more than once every ten seconds. Its obnoxious. And only log table regions - // if we are waiting a while for them to go down... - if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) { - lastLogTime = now; - LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions); - } - if (regions.isEmpty()) break; - remaining = timeout - (now - startTime); - } - return regions != null && regions.isEmpty(); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java new file mode 100644 index 0000000..15ed429 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java @@ -0,0 +1,584 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.MergeRegionException; +import org.apache.hadoop.hbase.exceptions.RegionOpeningException; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionStates; +import org.apache.hadoop.hbase.master.CatalogJanitor; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * The procedure to Merge a region in a table. + */ +@InterfaceAudience.Private +public class DispatchMergingRegionsProcedure + extends AbstractStateMachineTableProcedure { + private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class); + + private final AtomicBoolean aborted = new AtomicBoolean(false); + private Boolean traceEnabled; + private AssignmentManager assignmentManager; + private int timeout; + private ServerName regionLocation; + private String regionsToMergeListFullName; + private String regionsToMergeListEncodedName; + + private TableName tableName; + private HRegionInfo [] regionsToMerge; + private boolean forcible; + + public DispatchMergingRegionsProcedure() { + this.traceEnabled = isTraceEnabled(); + this.assignmentManager = null; + this.timeout = -1; + this.regionLocation = null; + this.regionsToMergeListFullName = null; + this.regionsToMergeListEncodedName = null; + } + + public DispatchMergingRegionsProcedure( + final MasterProcedureEnv env, + final TableName tableName, + final HRegionInfo [] regionsToMerge, + final boolean forcible) { + super(env); + this.traceEnabled = isTraceEnabled(); + this.assignmentManager = getAssignmentManager(env); + this.tableName = tableName; + // For now, we only merge 2 regions. It could be extended to more than 2 regions in + // the future. + assert(regionsToMerge.length == 2); + this.regionsToMerge = regionsToMerge; + this.forcible = forcible; + + this.timeout = -1; + this.regionsToMergeListFullName = getRegionsToMergeListFullNameString(); + this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString(); + } + + @Override + protected Flow executeFromState( + final MasterProcedureEnv env, + final DispatchMergingRegionsState state) throws InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + + try { + switch (state) { + case DISPATCH_MERGING_REGIONS_PREPARE: + prepareMergeRegion(env); + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PRE_OPERATION); + break; + case DISPATCH_MERGING_REGIONS_PRE_OPERATION: + //Unused for now - reserve to add preMerge coprocessor in the future + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS); + break; + case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS: + if (MoveRegionsToSameRS(env)) { + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS); + } else { + LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString() + + ", because can't move them to the same RS"); + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION); + } + break; + case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: + doMergeInRS(env); + setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION); + break; + case DISPATCH_MERGING_REGIONS_POST_OPERATION: + //Unused for now - reserve to add postCompletedMerge coprocessor in the future + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() + + " in the table " + tableName + " (in state=" + state + ")", e); + + setFailure("master-merge-regions", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState( + final MasterProcedureEnv env, + final DispatchMergingRegionsState state) throws IOException, InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case DISPATCH_MERGING_REGIONS_POST_OPERATION: + case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: + String msg = this + " We are in the " + state + " state." + + " It is complicated to rollback the merge operation that region server is working on." + + " Rollback is not supported and we should let the merge operation to complete"; + LOG.warn(msg); + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS: + break; // nothing to rollback + case DISPATCH_MERGING_REGIONS_PRE_OPERATION: + break; // nothing to rollback + case DISPATCH_MERGING_REGIONS_PREPARE: + break; // nothing to rollback + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (Exception e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. network down) + LOG.warn("Failed rollback attempt step " + state + " for merging the regions " + + getRegionsToMergeListFullNameString() + " in table " + tableName, e); + throw e; + } + } + + @Override + protected DispatchMergingRegionsState getState(final int stateId) { + return DispatchMergingRegionsState.valueOf(stateId); + } + + @Override + protected int getStateId(final DispatchMergingRegionsState state) { + return state.getNumber(); + } + + @Override + protected DispatchMergingRegionsState getInitialState() { + return DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PREPARE; + } + + /* + * Check whether we are in the state that can be rollback + */ + @Override + protected boolean isRollbackSupported(final DispatchMergingRegionsState state) { + switch (state) { + case DISPATCH_MERGING_REGIONS_POST_OPERATION: + case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: + // It is not safe to rollback if we reach to these states. + return false; + default: + break; + } + return true; + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg = + MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) + .setTableName(ProtobufUtil.toProtoTableName(tableName)) + .setForcible(forcible); + for (HRegionInfo hri: regionsToMerge) { + dispatchMergingRegionsMsg.addRegionInfo(HRegionInfo.convert(hri)); + } + dispatchMergingRegionsMsg.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg = + MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream); + setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo())); + tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName()); + + assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2); + regionsToMerge = new HRegionInfo[dispatchMergingRegionsMsg.getRegionInfoCount()]; + for (int i = 0; i < regionsToMerge.length; i++) { + regionsToMerge[i] = HRegionInfo.convert(dispatchMergingRegionsMsg.getRegionInfo(i)); + } + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(tableName); + sb.append(" regions="); + sb.append(getRegionsToMergeListFullNameString()); + sb.append(" forcible="); + sb.append(forcible); + sb.append(")"); + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { + if (!getTableName().isSystemTable() && env.waitInitialized(this)) { + return LockState.LOCK_EVENT_WAIT; + } + if (env.getProcedureScheduler().waitRegions(this, getTableName(), regionsToMerge)) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureScheduler().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]); + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_MERGE; + } + + /** + * Prepare merge and do some check + * @param env MasterProcedureEnv + * @throws IOException + */ + private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. + // + CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor(); + boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]); + if (regionAHasMergeQualifier + || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) { + String msg = "Skip merging regions " + regionsToMerge[0].getRegionNameAsString() + + ", " + regionsToMerge[1].getRegionNameAsString() + ", because region " + + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1] + .getEncodedName()) + " has merge qualifier"; + LOG.info(msg); + throw new MergeRegionException(msg); + } + + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName()); + RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName()); + if (regionStateA == null || regionStateB == null) { + throw new UnknownRegionException( + regionStateA == null ? + regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName()); + } + + if (!regionStateA.isOpened() || !regionStateB.isOpened()) { + throw new MergeRegionException( + "Unable to merge regions not online " + regionStateA + ", " + regionStateB); + } + + if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || + regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + throw new MergeRegionException("Can't merge non-default replicas"); + } + + if (!forcible && !HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) { + throw new MergeRegionException( + "Unable to merge not adjacent regions " + + regionsToMerge[0].getRegionNameAsString() + ", " + + regionsToMerge[1].getRegionNameAsString() + + " where forcible = " + forcible); + } + } + + /** + * Move all regions to the same region server + * @param env MasterProcedureEnv + * @return whether target regions hosted by the same RS + * @throws IOException + */ + private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException { + // Make sure regions are on the same regionserver before send merge + // regions request to region server. + // + boolean onSameRS = isRegionsOnTheSameServer(env); + if (!onSameRS) { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. + // + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); + + RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]); + RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]); + if (loadOfRegionA != null && loadOfRegionB != null + && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) { + // switch regionsToMerge[0] and regionsToMerge[1] + HRegionInfo tmpRegion = this.regionsToMerge[0]; + this.regionsToMerge[0] = this.regionsToMerge[1]; + this.regionsToMerge[1] = tmpRegion; + ServerName tmpLocation = regionLocation; + regionLocation = regionLocation2; + regionLocation2 = tmpLocation; + } + + long startTime = EnvironmentEdgeManager.currentTime(); + + RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation); + LOG.info("Moving regions to same server for merge: " + regionPlan.toString()); + getAssignmentManager(env).moveAsync(regionPlan); + do { + try { + Thread.sleep(20); + // Make sure check RIT first, then get region location, otherwise + // we would make a wrong result if region is online between getting + // region location and checking RIT + boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]); + regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); + onSameRS = regionLocation.equals(regionLocation2); + if (onSameRS || !isRIT) { + // Regions are on the same RS, or regionsToMerge[1] is not in + // RegionInTransition any more + break; + } + } catch (InterruptedException e) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(e); + throw iioe; + } + } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); + } + return onSameRS; + } + + /** + * Do the real merge operation in the region server that hosts regions + * @param env MasterProcedureEnv + * @throws IOException + */ + private void doMergeInRS(final MasterProcedureEnv env) throws IOException { + long duration = 0; + long startTime = EnvironmentEdgeManager.currentTime(); + do { + try { + if (getServerName(env) == null) { + // The merge probably already happen. Check + RegionState regionState = getAssignmentManager(env).getRegionStates().getRegionState( + regionsToMerge[0].getEncodedName()); + if (regionState.isMerging() || regionState.isMerged()) { + LOG.info("Merge regions " + getRegionsToMergeListEncodedNameString() + + " is in progress or completed. No need to send a new request."); + } else { + LOG.warn("Cannot sending merge to hosting server of the regions " + + getRegionsToMergeListEncodedNameString() + " as the server is unknown"); + } + return; + } + // TODO: the following RPC call is not idempotent. Multiple calls (eg. after master + // failover, re-execute this step) could result in some exception thrown that does not + // paint the correct picture. This behavior is on-par with old releases. Improvement + // could happen in the future. + env.getMasterServices().getServerManager().sendRegionsMerge( + getServerName(env), + regionsToMerge[0], + regionsToMerge[1], + forcible, + getUser()); + LOG.info("Sent merge to server " + getServerName(env) + " for region " + + getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible); + return; + } catch (RegionOpeningException roe) { + // Do a retry since region should be online on RS immediately + LOG.warn("Failed mergering regions in " + getServerName(env) + ", retrying...", roe); + } catch (Exception ie) { + LOG.warn("Failed sending merge to " + getServerName(env) + " for regions " + + getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible, ie); + return; + } + } while ((duration = EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); + + // If we reaches here, it means that we get timed out. + String msg = "Failed sending merge to " + getServerName(env) + " after " + duration + "ms"; + LOG.warn(msg); + throw new IOException(msg); + } + + private RegionLoad getRegionLoad( + final MasterProcedureEnv env, + final ServerName sn, + final HRegionInfo hri) { + ServerManager serverManager = env.getMasterServices().getServerManager(); + ServerLoad load = serverManager.getLoad(sn); + if (load != null) { + Map regionsLoad = load.getRegionsLoad(); + if (regionsLoad != null) { + return regionsLoad.get(hri.getRegionName()); + } + } + return null; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return whether target regions hosted by the same RS + */ + private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{ + Boolean onSameRS = true; + int i = 0; + RegionStates regionStates = getAssignmentManager(env).getRegionStates(); + regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]); + if (regionLocation != null) { + for(i = 1; i < regionsToMerge.length; i++) { + ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]); + if (regionLocation2 != null) { + if (onSameRS) { + onSameRS = regionLocation.equals(regionLocation2); + } + } else { + // At least one region is not online, merge will fail, no need to continue. + break; + } + } + if (i == regionsToMerge.length) { + // Finish checking all regions, return the result; + return onSameRS; + } + } + + // If reaching here, at least one region is not online. + String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() + + ", because region " + regionsToMerge[i].getEncodedName() + " is not online now."; + LOG.warn(msg); + throw new IOException(msg); + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return assignmentManager + */ + private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) { + if (assignmentManager == null) { + assignmentManager = env.getMasterServices().getAssignmentManager(); + } + return assignmentManager; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return timeout value + */ + private int getTimeout(final MasterProcedureEnv env) { + if (timeout == -1) { + timeout = env.getMasterConfiguration().getInt( + "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000); + } + return timeout; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return serverName + */ + private ServerName getServerName(final MasterProcedureEnv env) { + if (regionLocation == null) { + regionLocation = + getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]); + } + return regionLocation; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param fullName whether return only encoded name + * @return region names in a list + */ + private String getRegionsToMergeListFullNameString() { + if (regionsToMergeListFullName == null) { + StringBuilder sb = new StringBuilder("["); + int i = 0; + while(i < regionsToMerge.length - 1) { + sb.append(regionsToMerge[i].getRegionNameAsString() + ", "); + i++; + } + sb.append(regionsToMerge[i].getRegionNameAsString() + " ]"); + regionsToMergeListFullName = sb.toString(); + } + return regionsToMergeListFullName; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return encoded region names + */ + private String getRegionsToMergeListEncodedNameString() { + if (regionsToMergeListEncodedName == null) { + StringBuilder sb = new StringBuilder("["); + int i = 0; + while(i < regionsToMerge.length - 1) { + sb.append(regionsToMerge[i].getEncodedName() + ", "); + i++; + } + sb.append(regionsToMerge[i].getEncodedName() + " ]"); + regionsToMergeListEncodedName = sb.toString(); + } + return regionsToMergeListEncodedName; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return traceEnabled + */ + private Boolean isTraceEnabled() { + if (traceEnabled == null) { + traceEnabled = LOG.isTraceEnabled(); + } + return traceEnabled; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java index 4d67edd..4f4b5b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java @@ -21,34 +21,20 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkAssigner; -import org.apache.hadoop.hbase.master.GeneralBulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.RegionStates; -import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnableTableState; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @InterfaceAudience.Private public class EnableTableProcedure @@ -114,7 +100,7 @@ public class EnableTableProcedure setNextState(EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE); break; case ENABLE_TABLE_MARK_REGIONS_ONLINE: - markRegionsOnline(env, tableName, true); + addChildProcedure(env.getAssignmentManager().createAssignProcedures(tableName)); setNextState(EnableTableState.ENABLE_TABLE_SET_ENABLED_TABLE_STATE); break; case ENABLE_TABLE_SET_ENABLED_TABLE_STATE: @@ -287,137 +273,6 @@ public class EnableTableProcedure } /** - * Mark offline regions of the table online with retry - * @param env MasterProcedureEnv - * @param tableName the target table - * @param retryRequired whether to retry if the first run failed - * @throws IOException - */ - protected static void markRegionsOnline( - final MasterProcedureEnv env, - final TableName tableName, - final Boolean retryRequired) throws IOException { - // This is best effort approach to make all regions of a table online. If we fail to do - // that, it is ok that the table has some offline regions; user can fix it manually. - - // Dev consideration: add a config to control max number of retry. For now, it is hard coded. - int maxTry = (retryRequired ? 10 : 1); - boolean done = false; - - do { - try { - done = markRegionsOnline(env, tableName); - if (done) { - break; - } - maxTry--; - } catch (Exception e) { - LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e); - maxTry--; - if (maxTry > 0) { - continue; // we still have some retry left, try again. - } - throw e; - } - } while (maxTry > 0); - - if (!done) { - LOG.warn("Some or all regions of the Table '" + tableName + "' were offline"); - } - } - - /** - * Mark offline regions of the table online - * @param env MasterProcedureEnv - * @param tableName the target table - * @return whether the operation is fully completed or being interrupted. - * @throws IOException - */ - private static boolean markRegionsOnline(final MasterProcedureEnv env, final TableName tableName) - throws IOException { - final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager(); - final MasterServices masterServices = env.getMasterServices(); - final ServerManager serverManager = masterServices.getServerManager(); - boolean done = false; - // Get the regions of this table. We're done when all listed - // tables are onlined. - List> tableRegionsAndLocations; - - if (TableName.META_TABLE_NAME.equals(tableName)) { - tableRegionsAndLocations = - new MetaTableLocator().getMetaRegionsAndLocations(masterServices.getZooKeeper()); - } else { - tableRegionsAndLocations = - MetaTableAccessor.getTableRegionsAndLocations(masterServices.getConnection(), tableName); - } - - int countOfRegionsInTable = tableRegionsAndLocations.size(); - Map regionsToAssign = - regionsToAssignWithServerName(env, tableRegionsAndLocations); - - // need to potentially create some regions for the replicas - List unrecordedReplicas = - AssignmentManager.replicaRegionsNotRecordedInMeta(new HashSet<>( - regionsToAssign.keySet()), masterServices); - Map> srvToUnassignedRegs = - assignmentManager.getBalancer().roundRobinAssignment(unrecordedReplicas, - serverManager.getOnlineServersList()); - if (srvToUnassignedRegs != null) { - for (Map.Entry> entry : srvToUnassignedRegs.entrySet()) { - for (HRegionInfo h : entry.getValue()) { - regionsToAssign.put(h, entry.getKey()); - } - } - } - - int offlineRegionsCount = regionsToAssign.size(); - - LOG.info("Table '" + tableName + "' has " + countOfRegionsInTable + " regions, of which " - + offlineRegionsCount + " are offline."); - if (offlineRegionsCount == 0) { - return true; - } - - List onlineServers = serverManager.createDestinationServersList(); - Map> bulkPlan = - env.getMasterServices().getAssignmentManager().getBalancer() - .retainAssignment(regionsToAssign, onlineServers); - if (bulkPlan != null) { - LOG.info("Bulk assigning " + offlineRegionsCount + " region(s) across " + bulkPlan.size() - + " server(s), retainAssignment=true"); - - BulkAssigner ba = new GeneralBulkAssigner(masterServices, bulkPlan, assignmentManager, true); - try { - if (ba.bulkAssign()) { - done = true; - } - } catch (InterruptedException e) { - LOG.warn("Enable operation was interrupted when enabling table '" + tableName + "'"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - } - } else { - LOG.info("Balancer was unable to find suitable servers for table " + tableName - + ", leaving unassigned"); - } - return done; - } - - /** - * Mark regions of the table offline during recovery - * @param env MasterProcedureEnv - */ - private void markRegionsOfflineDuringRecovery(final MasterProcedureEnv env) { - try { - // This is a best effort attempt. We will move on even it does not succeed. We will retry - // several times until we giving up. - DisableTableProcedure.markRegionsOffline(env, tableName, true); - } catch (Exception e) { - LOG.debug("Failed to offline all regions of table " + tableName + ". Ignoring", e); - } - } - - /** * Mark table state to Enabled * @param env MasterProcedureEnv * @throws IOException @@ -457,31 +312,6 @@ public class EnableTableProcedure } /** - * @param regionsInMeta - * @return List of regions neither in transition nor assigned. - * @throws IOException - */ - private static Map regionsToAssignWithServerName( - final MasterProcedureEnv env, - final List> regionsInMeta) throws IOException { - Map regionsToAssign = new HashMap<>(regionsInMeta.size()); - RegionStates regionStates = env.getMasterServices().getAssignmentManager().getRegionStates(); - for (Pair regionLocation : regionsInMeta) { - HRegionInfo hri = regionLocation.getFirst(); - ServerName sn = regionLocation.getSecond(); - if (regionStates.isRegionOffline(hri)) { - regionsToAssign.put(hri, sn); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping assign for the region " + hri + " during enable table " - + hri.getTable() + " because its already in tranition or assigned."); - } - } - } - return regionsToAssign; - } - - /** * Coprocessor Action. * @param env MasterProcedureEnv * @param state the procedure state http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java index 4b9a7ab..31d05a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java @@ -19,32 +19,19 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.master.AssignmentManager; -import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.util.Bytes; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - /** * Helper class for schema change procedures */ @@ -60,16 +47,13 @@ public final class MasterDDLOperationHelper { public static void deleteColumnFamilyFromFileSystem( final MasterProcedureEnv env, final TableName tableName, - List regionInfoList, + final List regionInfoList, final byte[] familyName, - boolean hasMob) throws IOException { + final boolean hasMob) throws IOException { final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); if (LOG.isDebugEnabled()) { LOG.debug("Removing family=" + Bytes.toString(familyName) + " from table=" + tableName); } - if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName); - } for (HRegionInfo hri : regionInfoList) { // Delete the family directory in FS for all the regions one by one mfs.deleteFamilyFromFS(hri, familyName); @@ -81,77 +65,4 @@ public final class MasterDDLOperationHelper { mfs.deleteFamilyFromFS(mobRootDir, mobRegionInfo, familyName); } } - - /** - * Reopen all regions from a table after a schema change operation. - **/ - public static boolean reOpenAllRegions( - final MasterProcedureEnv env, - final TableName tableName, - final List regionInfoList) throws IOException { - boolean done = false; - LOG.info("Bucketing regions by region server..."); - List regionLocations = null; - Connection connection = env.getMasterServices().getConnection(); - try (RegionLocator locator = connection.getRegionLocator(tableName)) { - regionLocations = locator.getAllRegionLocations(); - } - // Convert List to Map. - NavigableMap hri2Sn = new TreeMap<>(); - for (HRegionLocation location : regionLocations) { - hri2Sn.put(location.getRegionInfo(), location.getServerName()); - } - TreeMap> serverToRegions = Maps.newTreeMap(); - List reRegions = new ArrayList<>(); - for (HRegionInfo hri : regionInfoList) { - ServerName sn = hri2Sn.get(hri); - // Skip the offlined split parent region - // See HBASE-4578 for more information. - if (null == sn) { - LOG.info("Skip " + hri); - continue; - } - if (!serverToRegions.containsKey(sn)) { - LinkedList hriList = Lists.newLinkedList(); - serverToRegions.put(sn, hriList); - } - reRegions.add(hri); - serverToRegions.get(sn).add(hri); - } - - LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size() - + " region servers."); - AssignmentManager am = env.getMasterServices().getAssignmentManager(); - am.setRegionsToReopen(reRegions); - BulkReOpen bulkReopen = new BulkReOpen(env.getMasterServices(), serverToRegions, am); - while (true) { - try { - if (bulkReopen.bulkReOpen()) { - done = true; - break; - } else { - LOG.warn("Timeout before reopening all regions"); - } - } catch (InterruptedException e) { - LOG.warn("Reopen was interrupted"); - // Preserve the interrupt. - Thread.currentThread().interrupt(); - break; - } - } - return done; - } - - /** - * Get the region info list of a table from meta if it is not already known by the caller. - **/ - public static List getRegionInfoList( - final MasterProcedureEnv env, - final TableName tableName, - List regionInfoList) throws IOException { - if (regionInfoList == null) { - regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName); - } - return regionInfoList; - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java index c21137d..f815bea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -29,7 +29,7 @@ public final class MasterProcedureConstants { /** Number of threads used by the procedure executor */ public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; - public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4; + public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16; /** * Procedure replay sanity check. In case a WAL is missing or unreadable we http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 2cd5b08..0f1c40f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; @@ -93,12 +94,19 @@ public class MasterProcedureEnv implements ConfigurationObserver { } } + private final RSProcedureDispatcher remoteDispatcher; private final MasterProcedureScheduler procSched; private final MasterServices master; public MasterProcedureEnv(final MasterServices master) { + this(master, new RSProcedureDispatcher(master)); + } + + public MasterProcedureEnv(final MasterServices master, + final RSProcedureDispatcher remoteDispatcher) { this.master = master; this.procSched = new MasterProcedureScheduler(master.getConfiguration()); + this.remoteDispatcher = remoteDispatcher; } public User getRequestUser() { @@ -117,6 +125,10 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.getConfiguration(); } + public AssignmentManager getAssignmentManager() { + return master.getAssignmentManager(); + } + public MasterCoprocessorHost getMasterCoprocessorHost() { return master.getMasterCoprocessorHost(); } @@ -125,7 +137,12 @@ public class MasterProcedureEnv implements ConfigurationObserver { return procSched; } + public RSProcedureDispatcher getRemoteDispatcher() { + return remoteDispatcher; + } + public boolean isRunning() { + if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; return master.getMasterProcedureExecutor().isRunning(); } @@ -134,11 +151,18 @@ public class MasterProcedureEnv implements ConfigurationObserver { } public boolean waitInitialized(Procedure proc) { - return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc); + return procSched.waitEvent(master.getInitializedEvent(), proc); } public boolean waitServerCrashProcessingEnabled(Procedure proc) { - return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); + if (master instanceof HMaster) { + return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); + } + return false; + } + + public boolean waitFailoverCleanup(Procedure proc) { + return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc); } public void setEventReady(ProcedureEvent event, boolean isReady) { @@ -153,4 +177,4 @@ public class MasterProcedureEnv implements ConfigurationObserver { public void onConfigurationChange(Configuration conf) { master.getMasterProcedureExecutor().refreshConfiguration(conf); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3975bbd0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 15b557a..1410748 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -598,11 +598,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return false; // region operations are using the shared-lock on the table // and then they will grab an xlock on the region. - case SPLIT: - case MERGE: - case ASSIGN: - case UNASSIGN: + case REGION_SPLIT: + case REGION_MERGE: + case REGION_ASSIGN: + case REGION_UNASSIGN: case REGION_EDIT: + case REGION_GC: + case MERGED_REGIONS_GC: return false; default: break; @@ -815,7 +817,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { boolean hasLock = true; final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { - assert regionInfo[i].getTable().equals(table); + LOG.info(procedure + " " + table + " " + regionInfo[i].getRegionNameAsString()); + assert table != null; + assert regionInfo[i] != null; + assert regionInfo[i].getTable() != null; + assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure; assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName()); @@ -1254,7 +1260,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { */ @VisibleForTesting public String dumpLocks() throws IOException { - // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter - return this.locking.toString(); + schedLock(); + try { + // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter + return this.locking.toString(); + } finally { + schedUnlock(); + } } }