From: stack@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Thu, 25 May 2017 03:47:53 -0000
Message-Id: <75aa250afa574d6fb1587aafbde94bbf@git.apache.org>
In-Reply-To: <047e58e9b68749358267db89aeb78712@git.apache.org>
References: <047e58e9b68749358267db89aeb78712@git.apache.org>
Subject: [14/27] hbase git commit: HBASE-14614 Procedure v2 - Core Assignment Manager (Matteo Bertozzi)

Move to a new AssignmentManager, one that describes Assignment using a State Machine built on top of ProcedureV2 facility.

http://git-wip-us.apache.org/repos/asf/hbase/blob/dc1065a8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
new file mode 100644
index 0000000..2b1de9d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -0,0 +1,776 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaMutationAnnotation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MasterSwitchType; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.exceptions.MergeRegionException; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.master.CatalogJanitor; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineTableProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MergeTableRegionsState; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.YieldingWaitStrategy; + +/** + * The procedure to Merge a region in a table. + * This procedure takes an exclusive table lock since it is working over multiple regions. + * It holds the lock for the life of the procedure. 
+ */ +@InterfaceAudience.Private +public class MergeTableRegionsProcedure + extends AbstractStateMachineTableProcedure { + private static final Log LOG = LogFactory.getLog(MergeTableRegionsProcedure.class); + private Boolean traceEnabled; + private volatile boolean lock = false; + private ServerName regionLocation; + private HRegionInfo[] regionsToMerge; + private HRegionInfo mergedRegion; + private boolean forcible; + + public MergeTableRegionsProcedure() { + // Required by the Procedure framework to create the procedure on replay + } + + public MergeTableRegionsProcedure(final MasterProcedureEnv env, + final HRegionInfo regionToMergeA, final HRegionInfo regionToMergeB) throws IOException { + this(env, regionToMergeA, regionToMergeB, false); + } + + public MergeTableRegionsProcedure(final MasterProcedureEnv env, + final HRegionInfo regionToMergeA, final HRegionInfo regionToMergeB, + final boolean forcible) throws MergeRegionException { + this(env, new HRegionInfo[] {regionToMergeA, regionToMergeB}, forcible); + } + + public MergeTableRegionsProcedure(final MasterProcedureEnv env, + final HRegionInfo[] regionsToMerge, final boolean forcible) + throws MergeRegionException { + super(env); + + // Check daughter regions and make sure that we have valid daughter regions + // before doing the real work. + checkRegionsToMerge(regionsToMerge, forcible); + + // WARN: make sure there is no parent region of the two merging regions in + // hbase:meta If exists, fixing up daughters would cause daughter regions(we + // have merged one) online again when we restart master, so we should clear + // the parent region to prevent the above case + // Since HBASE-7721, we don't need fix up daughters any more. so here do nothing + this.regionsToMerge = regionsToMerge; + this.mergedRegion = createMergedRegionInfo(regionsToMerge); + this.forcible = forcible; + } + + private static void checkRegionsToMerge(final HRegionInfo[] regionsToMerge, + final boolean forcible) throws MergeRegionException { + // For now, we only merge 2 regions. + // It could be extended to more than 2 regions in the future. 
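
As a usage illustration only, not part of this patch: a master-side caller could construct and submit the procedure via the constructors above roughly as sketched below. The class name MergeSubmitSketch is invented, the sketch assumes it sits in the same package as this procedure, and ProcedureExecutor#submitProcedure is assumed to be the usual procedure-v2 submission call (the patch itself uses the array form, submitProcedures).

    import java.io.IOException;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;

    final class MergeSubmitSketch {
      // Builds a merge of two adjacent regions and hands it to the master's procedure executor.
      static long submitMerge(final MasterProcedureEnv env, final HRegionInfo a, final HRegionInfo b)
          throws IOException {
        final MergeTableRegionsProcedure proc =
            new MergeTableRegionsProcedure(env, a, b, /* forcible= */ false);
        // submitProcedure is assumed to return a procedure id the caller can poll for completion.
        return env.getMasterServices().getMasterProcedureExecutor().submitProcedure(proc);
      }
    }
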
+ if (regionsToMerge == null || regionsToMerge.length != 2) { + throw new MergeRegionException("Expected to merge 2 regions, got: " + + Arrays.toString(regionsToMerge)); + } + + checkRegionsToMerge(regionsToMerge[0], regionsToMerge[1], forcible); + } + + private static void checkRegionsToMerge(final HRegionInfo regionToMergeA, + final HRegionInfo regionToMergeB, final boolean forcible) throws MergeRegionException { + if (!regionToMergeA.getTable().equals(regionToMergeB.getTable())) { + throw new MergeRegionException("Can't merge regions from two different tables: " + + regionToMergeA + ", " + regionToMergeB); + } + + if (regionToMergeA.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || + regionToMergeB.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + throw new MergeRegionException("Can't merge non-default replicas"); + } + + if (!HRegionInfo.areAdjacent(regionToMergeA, regionToMergeB)) { + String msg = "Unable to merge not adjacent regions " + regionToMergeA.getShortNameToLog() + + ", " + regionToMergeB.getShortNameToLog() + " where forcible = " + forcible; + LOG.warn(msg); + if (!forcible) { + throw new MergeRegionException(msg); + } + } + } + + private static HRegionInfo createMergedRegionInfo(final HRegionInfo[] regionsToMerge) { + return createMergedRegionInfo(regionsToMerge[0], regionsToMerge[1]); + } + + /** + * Create merged region info through the specified two regions + */ + private static HRegionInfo createMergedRegionInfo(final HRegionInfo regionToMergeA, + final HRegionInfo regionToMergeB) { + // Choose the smaller as start key + final byte[] startKey; + if (regionToMergeA.compareTo(regionToMergeB) <= 0) { + startKey = regionToMergeA.getStartKey(); + } else { + startKey = regionToMergeB.getStartKey(); + } + + // Choose the bigger as end key + final byte[] endKey; + if (Bytes.equals(regionToMergeA.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) + || (!Bytes.equals(regionToMergeB.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) + && Bytes.compareTo(regionToMergeA.getEndKey(), regionToMergeB.getEndKey()) > 0)) { + endKey = regionToMergeA.getEndKey(); + } else { + endKey = regionToMergeB.getEndKey(); + } + + // Merged region is sorted between two merging regions in META + final long rid = getMergedRegionIdTimestamp(regionToMergeA, regionToMergeB); + return new HRegionInfo(regionToMergeA.getTable(), startKey, endKey, false, rid); + } + + private static long getMergedRegionIdTimestamp(final HRegionInfo regionToMergeA, + final HRegionInfo regionToMergeB) { + long rid = EnvironmentEdgeManager.currentTime(); + // Regionid is timestamp. Merged region's id can't be less than that of + // merging regions else will insert at wrong location in hbase:meta (See HBASE-710). 
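
For illustration, a self-contained sketch (invented names, plain byte arrays instead of HRegionInfo and Bytes) of the boundary selection that createMergedRegionInfo performs above: the smaller start key and the larger end key win, and an empty end key is treated as unbounded. The patch compares the HRegionInfo objects themselves, which for two regions of the same table reduces to their start keys.

    import java.util.Arrays;

    public final class MergedRangeSketch {
      // Unsigned lexicographic byte comparison, standing in for Bytes.compareTo().
      static int cmp(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0) return d;
        }
        return a.length - b.length;
      }

      public static void main(String[] args) {
        byte[] empty = new byte[0];               // an empty end key means "unbounded"
        byte[] aStart = {10}, aEnd = {20};        // region A covers [10, 20)
        byte[] bStart = {20}, bEnd = empty;       // region B covers [20, +infinity)

        // Smaller start key wins.
        byte[] startKey = cmp(aStart, bStart) <= 0 ? aStart : bStart;

        // Larger end key wins; an empty (unbounded) end key always wins.
        byte[] endKey = (aEnd.length == 0 || (bEnd.length != 0 && cmp(aEnd, bEnd) > 0)) ? aEnd : bEnd;

        // Prints: merged range = [[10], []]  -- i.e. from key 10 to the unbounded end.
        System.out.println("merged range = [" + Arrays.toString(startKey) + ", "
            + Arrays.toString(endKey) + "]");
      }
    }
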
+ if (rid < regionToMergeA.getRegionId() || rid < regionToMergeB.getRegionId()) { + LOG.warn("Clock skew; merging regions id are " + regionToMergeA.getRegionId() + + " and " + regionToMergeB.getRegionId() + ", but current time here is " + rid); + rid = Math.max(regionToMergeA.getRegionId(), regionToMergeB.getRegionId()) + 1; + } + return rid; + } + + @Override + protected Flow executeFromState( + final MasterProcedureEnv env, + final MergeTableRegionsState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + if (LOG.isDebugEnabled()) { + LOG.debug(this + " execute state=" + state); + } + try { + switch (state) { + case MERGE_TABLE_REGIONS_PREPARE: + if (!prepareMergeRegion(env)) { + assert isFailed() : "Merge region should have an exception here"; + return Flow.NO_MORE_STATE; + } + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: + preMergeRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE); + break; + case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE: + setRegionStateToMerging(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CLOSE_REGIONS); + break; + case MERGE_TABLE_REGIONS_CLOSE_REGIONS: + addChildProcedure(createUnassignProcedures(env, getRegionReplication(env))); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_CREATE_MERGED_REGION); + break; + case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: + createMergedRegion(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: + preMergeRegionsCommit(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_UPDATE_META); + break; + case MERGE_TABLE_REGIONS_UPDATE_META: + updateMetaForMergedRegions(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION); + break; + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + postMergeRegionsCommit(env); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_OPEN_MERGED_REGION); + break; + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + addChildProcedure(createAssignProcedures(env, getRegionReplication(env))); + setNextState(MergeTableRegionsState.MERGE_TABLE_REGIONS_POST_OPERATION); + break; + case MERGE_TABLE_REGIONS_POST_OPERATION: + postCompletedMergeRegions(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (IOException e) { + LOG.warn("Error trying to merge regions " + HRegionInfo.getShortNameToLog(regionsToMerge) + + " in the table " + getTableName() + " (in state=" + state + ")", e); + + setFailure("master-merge-regions", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState( + final MasterProcedureEnv env, + final MergeTableRegionsState state) throws IOException, InterruptedException { + if (isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + + try { + switch (state) { + case MERGE_TABLE_REGIONS_POST_OPERATION: + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + case MERGE_TABLE_REGIONS_UPDATE_META: + String msg = this + " We are in the " + state + " state." + + " It is complicated to rollback the merge operation that region server is working on." 
+ + " Rollback is not supported and we should let the merge operation to complete"; + LOG.warn(msg); + // PONR + throw new UnsupportedOperationException(this + " unhandled state=" + state); + case MERGE_TABLE_REGIONS_PRE_MERGE_COMMIT_OPERATION: + break; + case MERGE_TABLE_REGIONS_CREATE_MERGED_REGION: + cleanupMergedRegion(env); + break; + case MERGE_TABLE_REGIONS_CLOSE_REGIONS: + rollbackCloseRegionsForMerge(env); + break; + case MERGE_TABLE_REGIONS_SET_MERGING_TABLE_STATE: + setRegionStateToRevertMerging(env); + break; + case MERGE_TABLE_REGIONS_PRE_MERGE_OPERATION: + postRollBackMergeRegions(env); + break; + case MERGE_TABLE_REGIONS_MOVE_REGION_TO_SAME_RS: + break; // nothing to rollback + case MERGE_TABLE_REGIONS_PREPARE: + break; + default: + throw new UnsupportedOperationException(this + " unhandled state=" + state); + } + } catch (Exception e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. network down) + LOG.warn("Failed rollback attempt step " + state + " for merging the regions " + + HRegionInfo.getShortNameToLog(regionsToMerge) + " in table " + getTableName(), e); + throw e; + } + } + + /* + * Check whether we are in the state that can be rollback + */ + @Override + protected boolean isRollbackSupported(final MergeTableRegionsState state) { + switch (state) { + case MERGE_TABLE_REGIONS_POST_OPERATION: + case MERGE_TABLE_REGIONS_OPEN_MERGED_REGION: + case MERGE_TABLE_REGIONS_POST_MERGE_COMMIT_OPERATION: + case MERGE_TABLE_REGIONS_UPDATE_META: + // It is not safe to rollback if we reach to these states. + return false; + default: + break; + } + return true; + } + + @Override + protected MergeTableRegionsState getState(final int stateId) { + return MergeTableRegionsState.forNumber(stateId); + } + + @Override + protected int getStateId(final MergeTableRegionsState state) { + return state.getNumber(); + } + + @Override + protected MergeTableRegionsState getInitialState() { + return MergeTableRegionsState.MERGE_TABLE_REGIONS_PREPARE; + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + final MasterProcedureProtos.MergeTableRegionsStateData.Builder mergeTableRegionsMsg = + MasterProcedureProtos.MergeTableRegionsStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) + .setMergedRegionInfo(HRegionInfo.convert(mergedRegion)) + .setForcible(forcible); + for (int i = 0; i < regionsToMerge.length; ++i) { + mergeTableRegionsMsg.addRegionInfo(HRegionInfo.convert(regionsToMerge[i])); + } + mergeTableRegionsMsg.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + final MasterProcedureProtos.MergeTableRegionsStateData mergeTableRegionsMsg = + MasterProcedureProtos.MergeTableRegionsStateData.parseDelimitedFrom(stream); + setUser(MasterProcedureUtil.toUserInfo(mergeTableRegionsMsg.getUserInfo())); + + assert(mergeTableRegionsMsg.getRegionInfoCount() == 2); + regionsToMerge = new HRegionInfo[mergeTableRegionsMsg.getRegionInfoCount()]; + for (int i = 0; i < regionsToMerge.length; i++) { + regionsToMerge[i] = HRegionInfo.convert(mergeTableRegionsMsg.getRegionInfo(i)); + } + + mergedRegion = HRegionInfo.convert(mergeTableRegionsMsg.getMergedRegionInfo()); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" table="); + 
sb.append(getTableName()); + sb.append(", regions="); + sb.append(HRegionInfo.getShortNameToLog(regionsToMerge)); + sb.append(", forcibly="); + sb.append(forcible); + } + + @Override + protected LockState acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT; + if (env.getProcedureScheduler().waitRegions(this, getTableName(), + mergedRegion, regionsToMerge[0], regionsToMerge[1])) { + try { + LOG.debug(LockState.LOCK_EVENT_WAIT + " " + env.getProcedureScheduler().dumpLocks()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return LockState.LOCK_EVENT_WAIT; + } + this.lock = true; + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + this.lock = false; + env.getProcedureScheduler().wakeRegions(this, getTableName(), + mergedRegion, regionsToMerge[0], regionsToMerge[1]); + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } + + @Override + protected boolean hasLock(MasterProcedureEnv env) { + return this.lock; + } + + @Override + public TableName getTableName() { + return mergedRegion.getTable(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_MERGE; + } + + /** + * Prepare merge and do some check + * @param env MasterProcedureEnv + * @throws IOException + */ + private boolean prepareMergeRegion(final MasterProcedureEnv env) throws IOException { + // Note: the following logic assumes that we only have 2 regions to merge. In the future, + // if we want to extend to more than 2 regions, the code needs to modify a little bit. + // + CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor(); + boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]); + if (regionAHasMergeQualifier + || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) { + String msg = "Skip merging regions " + HRegionInfo.getShortNameToLog(regionsToMerge) + + ", because region " + + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1] + .getEncodedName()) + " has merge qualifier"; + LOG.warn(msg); + throw new MergeRegionException(msg); + } + + RegionStates regionStates = env.getAssignmentManager().getRegionStates(); + RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName()); + RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName()); + if (regionStateA == null || regionStateB == null) { + throw new UnknownRegionException( + regionStateA == null ? + regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName()); + } + + if (!regionStateA.isOpened() || !regionStateB.isOpened()) { + throw new MergeRegionException( + "Unable to merge regions not online " + regionStateA + ", " + regionStateB); + } + + if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.MERGE)) { + String regionsStr = Arrays.deepToString(regionsToMerge); + LOG.warn("merge switch is off! skip merge of " + regionsStr); + super.setFailure(getClass().getSimpleName(), + new IOException("Merge of " + regionsStr + " failed because merge switch is off")); + return false; + } + + + // Ask the remote regionserver if regions are mergeable. If we get an IOE, report it + // along w/ the failure so can see why we are not mergeable at this time. 
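
Returning to the locking overrides earlier in this file (acquireLock, releaseLock, holdLock, hasLock): a standalone analogy, not HBase code, of the contract they implement — try to take the lock, report a wait if it is busy, remember ownership in a flag, and because holdLock answers true, keep the lock across suspensions until releaseLock runs.

    import java.util.concurrent.locks.ReentrantLock;

    public final class HoldLockSketch {   // illustrative analogy, not the HBase procedure scheduler
      private final ReentrantLock tableLock = new ReentrantLock();
      private volatile boolean lock = false;   // mirrors the 'lock' flag in the procedure above

      // Returns true when the lock was acquired, false when the caller must wait and retry,
      // analogous to LockState.LOCK_ACQUIRED vs LockState.LOCK_EVENT_WAIT.
      boolean acquireLock() {
        if (!tableLock.tryLock()) {
          return false;
        }
        lock = true;
        return true;
      }

      void releaseLock() {
        lock = false;
        tableLock.unlock();
      }

      boolean holdLock() { return true; }   // keep the lock for the life of the procedure
      boolean hasLock()  { return lock; }
    }
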
+ IOException mergeableCheckIOE = null; + boolean mergeable = false; + RegionState current = regionStateA; + try { + mergeable = isMergeable(env, current); + } catch (IOException e) { + mergeableCheckIOE = e; + } + if (mergeable && mergeableCheckIOE == null) { + current = regionStateB; + try { + mergeable = isMergeable(env, current); + } catch (IOException e) { + mergeableCheckIOE = e; + } + } + if (!mergeable) { + IOException e = new IOException(current.getRegion().getShortNameToLog() + " NOT mergeable"); + if (mergeableCheckIOE != null) e.initCause(mergeableCheckIOE); + super.setFailure(getClass().getSimpleName(), e); + return false; + } + + return true; + } + + private boolean isMergeable(final MasterProcedureEnv env, final RegionState rs) + throws IOException { + GetRegionInfoResponse response = + Util.getRegionInfoResponse(env, rs.getServerName(), rs.getRegion()); + return response.hasSplittable() && response.getSplittable(); + } + + /** + * Pre merge region action + * @param env MasterProcedureEnv + **/ + private void preMergeRegions(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + boolean ret = cpHost.preMergeRegionsAction(regionsToMerge, getUser()); + if (ret) { + throw new IOException( + "Coprocessor bypassing regions " + HRegionInfo.getShortNameToLog(regionsToMerge) + + " merge."); + } + } + // TODO: Clean up split and merge. Currently all over the place. + env.getMasterServices().getMasterQuotaManager().onRegionMerged(this.mergedRegion); + } + + /** + * Action after rollback a merge table regions action. + * @param env MasterProcedureEnv + * @throws IOException + */ + private void postRollBackMergeRegions(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postRollBackMergeRegionsAction(regionsToMerge, getUser()); + } + } + + /** + * Set the region states to MERGING state + * @param env MasterProcedureEnv + * @throws IOException + */ + public void setRegionStateToMerging(final MasterProcedureEnv env) throws IOException { + //transition.setTransitionCode(TransitionCode.READY_TO_MERGE); + } + + /** + * Rollback the region state change + * @param env MasterProcedureEnv + * @throws IOException + */ + private void setRegionStateToRevertMerging(final MasterProcedureEnv env) throws IOException { + //transition.setTransitionCode(TransitionCode.MERGE_REVERTED); + } + + /** + * Create merged region + * @param env MasterProcedureEnv + * @throws IOException + */ + private void createMergedRegion(final MasterProcedureEnv env) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); + final FileSystem fs = mfs.getFileSystem(); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false); + regionFs.createMergesDir(); + + mergeStoreFiles(env, regionFs, regionFs.getMergesDir()); + HRegionFileSystem regionFs2 = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, regionsToMerge[1], false); + mergeStoreFiles(env, regionFs2, regionFs.getMergesDir()); + + regionFs.commitMergedRegion(mergedRegion); + } + + /** + * Create reference file(s) of merging regions under the merges directory + * @param env MasterProcedureEnv + * @param regionFs region file 
system + * @param mergedDir the temp directory of merged region + * @throws IOException + */ + private void mergeStoreFiles( + final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir) + throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Configuration conf = env.getMasterConfiguration(); + final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); + + for (String family: regionFs.getFamilies()) { + final HColumnDescriptor hcd = htd.getFamily(family.getBytes()); + final Collection storeFiles = regionFs.getStoreFiles(family); + + if (storeFiles != null && storeFiles.size() > 0) { + final CacheConfig cacheConf = new CacheConfig(conf, hcd); + for (StoreFileInfo storeFileInfo: storeFiles) { + // Create reference file(s) of the region in mergedDir + regionFs.mergeStoreFile( + mergedRegion, + family, + new StoreFile( + mfs.getFileSystem(), storeFileInfo, conf, cacheConf, hcd.getBloomFilterType()), + mergedDir); + } + } + } + } + + /** + * Clean up merged region + * @param env MasterProcedureEnv + * @throws IOException + */ + private void cleanupMergedRegion(final MasterProcedureEnv env) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tabledir = FSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable()); + final FileSystem fs = mfs.getFileSystem(); + HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( + env.getMasterConfiguration(), fs, tabledir, regionsToMerge[0], false); + regionFs.cleanupMergedRegion(mergedRegion); + } + + /** + * Rollback close regions + * @param env MasterProcedureEnv + **/ + private void rollbackCloseRegionsForMerge(final MasterProcedureEnv env) throws IOException { + // Check whether the region is closed; if so, open it in the same server + final int regionReplication = getRegionReplication(env); + final ServerName serverName = getServerName(env); + + final AssignProcedure[] procs = + new AssignProcedure[regionsToMerge.length * regionReplication]; + int procsIdx = 0; + for (int i = 0; i < regionsToMerge.length; ++i) { + for (int j = 0; j < regionReplication; ++j) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j); + procs[procsIdx++] = env.getAssignmentManager().createAssignProcedure(hri, serverName); + } + } + env.getMasterServices().getMasterProcedureExecutor().submitProcedures(procs); + } + + private UnassignProcedure[] createUnassignProcedures(final MasterProcedureEnv env, + final int regionReplication) { + final UnassignProcedure[] procs = + new UnassignProcedure[regionsToMerge.length * regionReplication]; + int procsIdx = 0; + for (int i = 0; i < regionsToMerge.length; ++i) { + for (int j = 0; j < regionReplication; ++j) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(regionsToMerge[i], j); + procs[procsIdx++] = env.getAssignmentManager().createUnassignProcedure(hri,null,true); + } + } + return procs; + } + + private AssignProcedure[] createAssignProcedures(final MasterProcedureEnv env, + final int regionReplication) { + final ServerName targetServer = getServerName(env); + final AssignProcedure[] procs = new AssignProcedure[regionReplication]; + for (int i = 0; i < procs.length; ++i) { + final HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(mergedRegion, i); + procs[i] = env.getAssignmentManager().createAssignProcedure(hri, targetServer); + } + return procs; + } + + private int 
getRegionReplication(final MasterProcedureEnv env) throws IOException { + final HTableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName()); + return htd.getRegionReplication(); + } + + /** + * Post merge region action + * @param env MasterProcedureEnv + **/ + private void preMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + @MetaMutationAnnotation + final List metaEntries = new ArrayList(); + boolean ret = cpHost.preMergeRegionsCommit(regionsToMerge, metaEntries, getUser()); + + if (ret) { + throw new IOException( + "Coprocessor bypassing regions " + HRegionInfo.getShortNameToLog(regionsToMerge) + + " merge."); + } + try { + for (Mutation p : metaEntries) { + HRegionInfo.parseRegionName(p.getRow()); + } + } catch (IOException e) { + LOG.error("Row key of mutation from coprocessor is not parsable as region name." + + "Mutations from coprocessor should only be for hbase:meta table.", e); + throw e; + } + } + } + + /** + * Add merged region to META and delete original regions. + */ + private void updateMetaForMergedRegions(final MasterProcedureEnv env) + throws IOException, ProcedureYieldException { + final ServerName serverName = getServerName(env); + env.getAssignmentManager().markRegionAsMerged(mergedRegion, serverName, + regionsToMerge[0], regionsToMerge[1]); + } + + /** + * Post merge region action + * @param env MasterProcedureEnv + **/ + private void postMergeRegionsCommit(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postMergeRegionsCommit(regionsToMerge, mergedRegion, getUser()); + } + } + + /** + * Post merge region action + * @param env MasterProcedureEnv + **/ + private void postCompletedMergeRegions(final MasterProcedureEnv env) throws IOException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + cpHost.postCompletedMergeRegionsAction(regionsToMerge, mergedRegion, getUser()); + } + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @param env MasterProcedureEnv + * @return serverName + */ + private ServerName getServerName(final MasterProcedureEnv env) { + if (regionLocation == null) { + regionLocation = env.getAssignmentManager().getRegionStates(). + getRegionServerOfRegion(regionsToMerge[0]); + // May still be null here but return null and let caller deal. + // Means we lost the in-memory-only location. We are in recovery + // or so. The caller should be able to deal w/ a null ServerName. + // Let them go to the Balancer to find one to use instead. + } + return regionLocation; + } + + /** + * The procedure could be restarted from a different machine. If the variable is null, we need to + * retrieve it. + * @return traceEnabled + */ + private Boolean isTraceEnabled() { + if (traceEnabled == null) { + traceEnabled = LOG.isTraceEnabled(); + } + return traceEnabled; + } + + /** + * @return The merged region. Maybe be null if called to early or we failed. 
+ */ + @VisibleForTesting + public HRegionInfo getMergedRegion() { + return this.mergedRegion; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/dc1065a8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java new file mode 100644 index 0000000..d8c1b7d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MoveRegionProcedure.java @@ -0,0 +1,145 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MoveRegionStateData; + +/** + * Procedure that implements a RegionPlan. + * It first runs an unassign subprocedure followed + * by an assign subprocedure. It takes a lock on the region being moved. + * It holds the lock for the life of the procedure. 
+ */ +@InterfaceAudience.Private +public class MoveRegionProcedure extends AbstractStateMachineRegionProcedure { + private static final Log LOG = LogFactory.getLog(MoveRegionProcedure.class); + private RegionPlan plan; + + public MoveRegionProcedure() { + // Required by the Procedure framework to create the procedure on replay + super(); + } + + public MoveRegionProcedure(final MasterProcedureEnv env, final RegionPlan plan) { + super(env, plan.getRegionInfo()); + assert plan.getDestination() != null: plan.toString(); + this.plan = plan; + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, final MoveRegionState state) + throws InterruptedException { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + switch (state) { + case MOVE_REGION_UNASSIGN: + addChildProcedure(new UnassignProcedure(plan.getRegionInfo(), plan.getSource(), true)); + setNextState(MoveRegionState.MOVE_REGION_ASSIGN); + break; + case MOVE_REGION_ASSIGN: + addChildProcedure(new AssignProcedure(plan.getRegionInfo(), plan.getDestination())); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final MoveRegionState state) + throws IOException { + // no-op + } + + @Override + public boolean abort(final MasterProcedureEnv env) { + return false; + } + + @Override + public void toStringClassDetails(final StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" "); + sb.append(plan); + } + + @Override + protected MoveRegionState getInitialState() { + return MoveRegionState.MOVE_REGION_UNASSIGN; + } + + @Override + protected int getStateId(final MoveRegionState state) { + return state.getNumber(); + } + + @Override + protected MoveRegionState getState(final int stateId) { + return MoveRegionState.valueOf(stateId); + } + + @Override + public TableName getTableName() { + return plan.getRegionInfo().getTable(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.REGION_EDIT; + } + + @Override + protected void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + final MoveRegionStateData.Builder state = MoveRegionStateData.newBuilder() + // No need to serialize the HRegionInfo. The super class has the region. + .setSourceServer(ProtobufUtil.toServerName(plan.getSource())) + .setDestinationServer(ProtobufUtil.toServerName(plan.getDestination())); + state.build().writeDelimitedTo(stream); + } + + @Override + protected void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + final MoveRegionStateData state = MoveRegionStateData.parseDelimitedFrom(stream); + final HRegionInfo regionInfo = getRegion(); // Get it from super class deserialization. 
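
For readers unfamiliar with the writeDelimitedTo/parseDelimitedFrom calls used by serializeStateData and deserializeStateData here: each protobuf message is framed with a length prefix so several state messages can share one stream. Below is a plain-Java sketch of that framing, with invented names and a fixed-width length where protobuf uses a varint.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public final class DelimitedFramingSketch {
      static void writeDelimited(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);   // length prefix, then the payload
        out.write(payload);
      }

      static byte[] readDelimited(DataInputStream in) throws IOException {
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeDelimited(out, "source=rs1,port=16020".getBytes(StandardCharsets.UTF_8));
        writeDelimited(out, "destination=rs2,port=16020".getBytes(StandardCharsets.UTF_8));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(readDelimited(in), StandardCharsets.UTF_8));
        System.out.println(new String(readDelimited(in), StandardCharsets.UTF_8));
      }
    }
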
+ final ServerName sourceServer = ProtobufUtil.toServerName(state.getSourceServer()); + final ServerName destinationServer = ProtobufUtil.toServerName(state.getDestinationServer()); + this.plan = new RegionPlan(regionInfo, sourceServer, destinationServer); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/dc1065a8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java new file mode 100644 index 0000000..21e0d9c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -0,0 +1,327 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.util.MultiHConnection; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.zookeeper.KeeperException; + +import com.google.common.base.Preconditions; + +/** + * Store Region State to hbase:meta table. 
+ */ +@InterfaceAudience.Private +public class RegionStateStore { + private static final Log LOG = LogFactory.getLog(RegionStateStore.class); + + /** The delimiter for meta columns for replicaIds > 0 */ + protected static final char META_REPLICA_ID_DELIMITER = '_'; + + private final MasterServices master; + + private MultiHConnection multiHConnection; + + public RegionStateStore(final MasterServices master) { + this.master = master; + } + + public void start() throws IOException { + } + + public void stop() { + if (multiHConnection != null) { + multiHConnection.close(); + multiHConnection = null; + } + } + + public interface RegionStateVisitor { + void visitRegionState(HRegionInfo regionInfo, State state, + ServerName regionLocation, ServerName lastHost, long openSeqNum); + } + + public void visitMeta(final RegionStateVisitor visitor) throws IOException { + MetaTableAccessor.fullScanRegions(master.getConnection(), new MetaTableAccessor.Visitor() { + final boolean isDebugEnabled = LOG.isDebugEnabled(); + + @Override + public boolean visit(final Result r) throws IOException { + if (r != null && !r.isEmpty()) { + long st = System.currentTimeMillis(); + visitMetaEntry(visitor, r); + long et = System.currentTimeMillis(); + LOG.info("[T] LOAD META PERF " + StringUtils.humanTimeDiff(et - st)); + } else if (isDebugEnabled) { + LOG.debug("NULL result from meta - ignoring but this is strange."); + } + return true; + } + }); + } + + private void visitMetaEntry(final RegionStateVisitor visitor, final Result result) + throws IOException { + final RegionLocations rl = MetaTableAccessor.getRegionLocations(result); + if (rl == null) return; + + final HRegionLocation[] locations = rl.getRegionLocations(); + if (locations == null) return; + + for (int i = 0; i < locations.length; ++i) { + final HRegionLocation hrl = locations[i]; + if (hrl == null) continue; + + final HRegionInfo regionInfo = hrl.getRegionInfo(); + if (regionInfo == null) continue; + + final int replicaId = regionInfo.getReplicaId(); + final State state = getRegionState(result, replicaId); + + final ServerName lastHost = hrl.getServerName(); + final ServerName regionLocation = getRegionServer(result, replicaId); + final long openSeqNum = -1; + + // TODO: move under trace, now is visible for debugging + LOG.info(String.format("Load hbase:meta entry region=%s regionState=%s lastHost=%s regionLocation=%s", + regionInfo, state, lastHost, regionLocation)); + + visitor.visitRegionState(regionInfo, state, regionLocation, lastHost, openSeqNum); + } + } + + public void updateRegionLocation(final HRegionInfo regionInfo, final State state, + final ServerName regionLocation, final ServerName lastHost, final long openSeqNum, + final long pid) + throws IOException { + if (regionInfo.isMetaRegion()) { + updateMetaLocation(regionInfo, regionLocation); + } else { + updateUserRegionLocation(regionInfo, state, regionLocation, lastHost, openSeqNum, pid); + } + } + + public void updateRegionState(final long openSeqNum, final long pid, + final RegionState newState, final RegionState oldState) throws IOException { + updateRegionLocation(newState.getRegion(), newState.getState(), newState.getServerName(), + oldState != null ? 
oldState.getServerName() : null, openSeqNum, pid); + } + + protected void updateMetaLocation(final HRegionInfo regionInfo, final ServerName serverName) + throws IOException { + try { + MetaTableLocator.setMetaLocation(master.getZooKeeper(), serverName, + regionInfo.getReplicaId(), State.OPEN); + } catch (KeeperException e) { + throw new IOException(e); + } + } + + protected void updateUserRegionLocation(final HRegionInfo regionInfo, final State state, + final ServerName regionLocation, final ServerName lastHost, final long openSeqNum, + final long pid) + throws IOException { + final int replicaId = regionInfo.getReplicaId(); + final Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(regionInfo)); + MetaTableAccessor.addRegionInfo(put, regionInfo); + final StringBuilder info = new StringBuilder("pid=" + pid + " updating hbase:meta row="); + info.append(regionInfo.getRegionNameAsString()).append(", regionState=").append(state); + if (openSeqNum >= 0) { + Preconditions.checkArgument(state == State.OPEN && regionLocation != null, + "Open region should be on a server"); + MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, -1, replicaId); + info.append(", openSeqNum=").append(openSeqNum); + info.append(", regionLocation=").append(regionLocation); + } else if (regionLocation != null && !regionLocation.equals(lastHost)) { + // Ideally, if no regionLocation, write null to the hbase:meta but this will confuse clients + // currently; they want a server to hit. TODO: Make clients wait if no location. + put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId), + Bytes.toBytes(regionLocation.getServerName())); + info.append(", regionLocation=").append(regionLocation); + } + put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId), + Bytes.toBytes(state.name())); + LOG.info(info); + + final boolean serialReplication = hasSerialReplicationScope(regionInfo.getTable()); + if (serialReplication && state == State.OPEN) { + Put barrierPut = MetaTableAccessor.makeBarrierPut(regionInfo.getEncodedNameAsBytes(), + openSeqNum, regionInfo.getTable().getName()); + updateRegionLocation(regionInfo, state, put, barrierPut); + } else { + updateRegionLocation(regionInfo, state, put); + } + } + + protected void updateRegionLocation(final HRegionInfo regionInfo, final State state, + final Put... put) throws IOException { + synchronized (this) { + if (multiHConnection == null) { + multiHConnection = new MultiHConnection(master.getConfiguration(), 1); + } + } + + try { + multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null); + } catch (IOException e) { + // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host! + // In tests we abort the Master! 
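
For illustration, the per-replica qualifier scheme that updateUserRegionLocation relies on (via getStateColumn and getServerNameColumn later in this file) can be sketched standalone as below. The %04X suffix format is an assumption mirroring HRegionInfo.REPLICA_ID_FORMAT, and the real base qualifier strings come from HConstants rather than the literals used here.

    public final class MetaColumnSketch {
      private static final char META_REPLICA_ID_DELIMITER = '_';

      // Replica 0 uses the bare qualifier; other replicas append "_" plus the replica id,
      // formatted here as %04X on the assumption that this matches HRegionInfo.REPLICA_ID_FORMAT.
      static String qualifierFor(String baseQualifier, int replicaId) {
        return replicaId == 0
            ? baseQualifier
            : baseQualifier + META_REPLICA_ID_DELIMITER + String.format("%04X", replicaId);
      }

      public static void main(String[] args) {
        System.out.println(qualifierFor("state", 0));   // state
        System.out.println(qualifierFor("state", 2));   // state_0002
      }
    }
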
+ String msg = String.format("FAILED persisting region=%s state=%s", + regionInfo.getShortNameToLog(), state); + LOG.error(msg, e); + master.abort(msg, e); + throw e; + } + } + + // ============================================================================================ + // Update Region Splitting State helpers + // ============================================================================================ + public void splitRegion(final HRegionInfo parent, final HRegionInfo hriA, + final HRegionInfo hriB, final ServerName serverName) throws IOException { + final HTableDescriptor htd = getTableDescriptor(parent.getTable()); + MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName, + getRegionReplication(htd), hasSerialReplicationScope(htd)); + } + + // ============================================================================================ + // Update Region Merging State helpers + // ============================================================================================ + public void mergeRegions(final HRegionInfo parent, final HRegionInfo hriA, + final HRegionInfo hriB, final ServerName serverName) throws IOException { + final HTableDescriptor htd = getTableDescriptor(parent.getTable()); + MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName, + getRegionReplication(htd), EnvironmentEdgeManager.currentTime(), + hasSerialReplicationScope(htd)); + } + + // ============================================================================================ + // Delete Region State helpers + // ============================================================================================ + public void deleteRegion(final HRegionInfo regionInfo) throws IOException { + deleteRegions(Collections.singletonList(regionInfo)); + } + + public void deleteRegions(final List regions) throws IOException { + MetaTableAccessor.deleteRegions(master.getConnection(), regions); + } + + // ========================================================================== + // Table Descriptors helpers + // ========================================================================== + private boolean hasSerialReplicationScope(final TableName tableName) throws IOException { + return hasSerialReplicationScope(getTableDescriptor(tableName)); + } + + private boolean hasSerialReplicationScope(final HTableDescriptor htd) { + return (htd != null)? htd.hasSerialReplicationScope(): false; + } + + private int getRegionReplication(final HTableDescriptor htd) { + return (htd != null) ? htd.getRegionReplication() : 1; + } + + private HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException { + return master.getTableDescriptors().get(tableName); + } + + // ========================================================================== + // Server Name + // ========================================================================== + + /** + * Returns the {@link ServerName} from catalog table {@link Result} + * where the region is transitioning. It should be the same as + * {@link MetaTableAccessor#getServerName(Result,int)} if the server is at OPEN state. + * @param r Result to pull the transitioning server name from + * @return A ServerName instance or {@link MetaTableAccessor#getServerName(Result,int)} + * if necessary fields not found or empty. 
+ */ + static ServerName getRegionServer(final Result r, int replicaId) { + final Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, + getServerNameColumn(replicaId)); + if (cell == null || cell.getValueLength() == 0) { + RegionLocations locations = MetaTableAccessor.getRegionLocations(r); + if (locations != null) { + HRegionLocation location = locations.getRegionLocation(replicaId); + if (location != null) { + return location.getServerName(); + } + } + return null; + } + return ServerName.parseServerName(Bytes.toString(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + } + + private static byte[] getServerNameColumn(int replicaId) { + return replicaId == 0 + ? HConstants.SERVERNAME_QUALIFIER + : Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER + + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId)); + } + + // ========================================================================== + // Region State + // ========================================================================== + + /** + * Pull the region state from a catalog table {@link Result}. + * @param r Result to pull the region state from + * @return the region state, or OPEN if there's no value written. + */ + protected State getRegionState(final Result r, int replicaId) { + Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getStateColumn(replicaId)); + if (cell == null || cell.getValueLength() == 0) return State.OPENING; + return State.valueOf(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); + } + + private static byte[] getStateColumn(int replicaId) { + return replicaId == 0 + ? HConstants.STATE_QUALIFIER + : Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER + + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/dc1065a8/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java new file mode 100644 index 0000000..082e171 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStates.java @@ -0,0 +1,969 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.assignment; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionState.State; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.annotations.VisibleForTesting; + +/** + * RegionStates contains a set of Maps that describes the in-memory state of the AM, with + * the regions available in the system, the region in transition, the offline regions and + * the servers holding regions. + */ +@InterfaceAudience.Private +public class RegionStates { + private static final Log LOG = LogFactory.getLog(RegionStates.class); + + protected static final State[] STATES_EXPECTED_ON_OPEN = new State[] { + State.OFFLINE, State.CLOSED, // disable/offline + State.SPLITTING, State.SPLIT, // ServerCrashProcedure + State.OPENING, State.FAILED_OPEN, // already in-progress (retrying) + }; + + protected static final State[] STATES_EXPECTED_ON_CLOSE = new State[] { + State.SPLITTING, State.SPLIT, // ServerCrashProcedure + State.OPEN, // enabled/open + State.CLOSING // already in-progress (retrying) + }; + + private static class AssignmentProcedureEvent extends ProcedureEvent { + public AssignmentProcedureEvent(final HRegionInfo regionInfo) { + super(regionInfo); + } + } + + private static class ServerReportEvent extends ProcedureEvent { + public ServerReportEvent(final ServerName serverName) { + super(serverName); + } + } + + /** + * Current Region State. + * In-memory only. Not persisted. + */ + // Mutable/Immutable? Changes have to be synchronized or not? + // Data members are volatile which seems to say multi-threaded access is fine. + // In the below we do check and set but the check state could change before + // we do the set because no synchronization....which seems dodgy. Clear up + // understanding here... how many threads accessing? Do locks make it so one + // thread at a time working on a single Region's RegionStateNode? Lets presume + // so for now. Odd is that elsewhere in this RegionStates, we synchronize on + // the RegionStateNode instance. TODO. + public static class RegionStateNode implements Comparable { + private final HRegionInfo regionInfo; + private final ProcedureEvent event; + + private volatile RegionTransitionProcedure procedure = null; + private volatile ServerName regionLocation = null; + private volatile ServerName lastHost = null; + /** + * A Region-in-Transition (RIT) moves through states. + * See {@link State} for complete list. A Region that + * is opened moves from OFFLINE => OPENING => OPENED. 
+ */ + private volatile State state = State.OFFLINE; + + /** + * Updated whenever a call to {@link #setRegionLocation(ServerName)} + * or {@link #setState(State, State...)}. + */ + private volatile long lastUpdate = 0; + + private volatile long openSeqNum = HConstants.NO_SEQNUM; + + public RegionStateNode(final HRegionInfo regionInfo) { + this.regionInfo = regionInfo; + this.event = new AssignmentProcedureEvent(regionInfo); + } + + public boolean setState(final State update, final State... expected) { + final boolean expectedState = isInState(expected); + if (expectedState) { + this.state = update; + this.lastUpdate = EnvironmentEdgeManager.currentTime(); + } + return expectedState; + } + + /** + * Put region into OFFLINE mode (set state and clear location). + * @return Last recorded server deploy + */ + public ServerName offline() { + setState(State.OFFLINE); + return setRegionLocation(null); + } + + /** + * Set new {@link State} but only if currently in expected State + * (if not, throw {@link UnexpectedStateException}. + */ + public State transitionState(final State update, final State... expected) + throws UnexpectedStateException { + if (!setState(update, expected)) { + throw new UnexpectedStateException("Expected " + Arrays.toString(expected) + + " so could move to " + update + " but current state=" + getState()); + } + return update; + } + + public boolean isInState(final State... expected) { + if (expected != null && expected.length > 0) { + boolean expectedState = false; + for (int i = 0; i < expected.length; ++i) { + expectedState |= (getState() == expected[i]); + } + return expectedState; + } + return true; + } + + public boolean isStuck() { + return isInState(State.FAILED_OPEN) && getProcedure() != null; + } + + public boolean isInTransition() { + return getProcedure() != null; + } + + public long getLastUpdate() { + return procedure != null ? 
+          procedure.getLastUpdate() : lastUpdate;
+    }
+
+    public void setLastHost(final ServerName serverName) {
+      this.lastHost = serverName;
+    }
+
+    public void setOpenSeqNum(final long seqId) {
+      this.openSeqNum = seqId;
+    }
+
+    public ServerName setRegionLocation(final ServerName serverName) {
+      ServerName lastRegionLocation = this.regionLocation;
+      if (LOG.isTraceEnabled() && serverName == null) {
+        LOG.trace("Tracking when we are set to null " + this, new Throwable("TRACE"));
+      }
+      this.regionLocation = serverName;
+      this.lastUpdate = EnvironmentEdgeManager.currentTime();
+      return lastRegionLocation;
+    }
+
+    public boolean setProcedure(final RegionTransitionProcedure proc) {
+      if (this.procedure != null && this.procedure != proc) {
+        return false;
+      }
+      this.procedure = proc;
+      return true;
+    }
+
+    public boolean unsetProcedure(final RegionTransitionProcedure proc) {
+      if (this.procedure != null && this.procedure != proc) {
+        return false;
+      }
+      this.procedure = null;
+      return true;
+    }
+
+    public RegionTransitionProcedure getProcedure() {
+      return procedure;
+    }
+
+    public ProcedureEvent<?> getProcedureEvent() {
+      return event;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionInfo;
+    }
+
+    public TableName getTable() {
+      return getRegionInfo().getTable();
+    }
+
+    public boolean isSystemTable() {
+      return getTable().isSystemTable();
+    }
+
+    public ServerName getLastHost() {
+      return lastHost;
+    }
+
+    public ServerName getRegionLocation() {
+      return regionLocation;
+    }
+
+    public State getState() {
+      return state;
+    }
+
+    public long getOpenSeqNum() {
+      return openSeqNum;
+    }
+
+    public int getFormatVersion() {
+      // we don't have any format for now
+      // it should probably be in regionInfo.getFormatVersion()
+      return 0;
+    }
+
+    @Override
+    public int compareTo(final RegionStateNode other) {
+      // NOTE: HRegionInfo sorts by table first, so we are relying on that.
+      // We have a TestRegionState#testOrderedByTable() that checks for that.
+      return getRegionInfo().compareTo(other.getRegionInfo());
+    }
+
+    @Override
+    public int hashCode() {
+      return getRegionInfo().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+      if (this == other) return true;
+      if (!(other instanceof RegionStateNode)) return false;
+      return compareTo((RegionStateNode)other) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return toDescriptiveString();
+    }
+
+    public String toShortString() {
+      // rit= is the current Region-In-Transition State -- see the State enum.
+      return String.format("rit=%s, location=%s", getState(), getRegionLocation());
+    }
+
+    public String toDescriptiveString() {
+      return String.format("%s, table=%s, region=%s",
+        toShortString(), getTable(), getRegionInfo().getEncodedName());
+    }
+  }
+
+  // This comparator sorts the RegionStates by timestamp then Region name.
+  // Comparing by timestamp alone can lead us to discard different RegionStates that happen
+  // to share a timestamp.
+  private static class RegionStateStampComparator implements Comparator<RegionState> {
+    @Override
+    public int compare(final RegionState l, final RegionState r) {
+      int stampCmp = Long.compare(l.getStamp(), r.getStamp());
+      return stampCmp != 0 ?
+          stampCmp : l.getRegion().compareTo(r.getRegion());
+    }
+  }
+
+  public enum ServerState { ONLINE, SPLITTING, OFFLINE }
+
+  public static class ServerStateNode implements Comparable<ServerStateNode> {
+    private final ServerReportEvent reportEvent;
+
+    private final Set<RegionStateNode> regions;
+    private final ServerName serverName;
+
+    private volatile ServerState state = ServerState.ONLINE;
+    private volatile int versionNumber = 0;
+
+    public ServerStateNode(final ServerName serverName) {
+      this.serverName = serverName;
+      this.regions = new HashSet<RegionStateNode>();
+      this.reportEvent = new ServerReportEvent(serverName);
+    }
+
+    public ServerName getServerName() {
+      return serverName;
+    }
+
+    public ServerState getState() {
+      return state;
+    }
+
+    public int getVersionNumber() {
+      return versionNumber;
+    }
+
+    public ProcedureEvent<?> getReportEvent() {
+      return reportEvent;
+    }
+
+    public boolean isInState(final ServerState... expected) {
+      boolean expectedState = false;
+      if (expected != null) {
+        for (int i = 0; i < expected.length; ++i) {
+          expectedState |= (state == expected[i]);
+        }
+      }
+      return expectedState;
+    }
+
+    public void setState(final ServerState state) {
+      this.state = state;
+    }
+
+    public void setVersionNumber(final int versionNumber) {
+      this.versionNumber = versionNumber;
+    }
+
+    public Set<RegionStateNode> getRegions() {
+      return regions;
+    }
+
+    public int getRegionCount() {
+      return regions.size();
+    }
+
+    public ArrayList<HRegionInfo> getRegionInfoList() {
+      ArrayList<HRegionInfo> hris = new ArrayList<HRegionInfo>(regions.size());
+      for (RegionStateNode region: regions) {
+        hris.add(region.getRegionInfo());
+      }
+      return hris;
+    }
+
+    public void addRegion(final RegionStateNode regionNode) {
+      this.regions.add(regionNode);
+    }
+
+    public void removeRegion(final RegionStateNode regionNode) {
+      this.regions.remove(regionNode);
+    }
+
+    @Override
+    public int compareTo(final ServerStateNode other) {
+      return getServerName().compareTo(other.getServerName());
+    }
+
+    @Override
+    public int hashCode() {
+      return getServerName().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+      if (this == other) return true;
+      if (!(other instanceof ServerStateNode)) return false;
+      return compareTo((ServerStateNode)other) == 0;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("ServerStateNode(%s)", getServerName());
+    }
+  }
+
+  public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR =
+    new RegionStateStampComparator();
+
+  // TODO: Replace the ConcurrentSkipListMaps
+  /**
+   * RegionName -- i.e. HRegionInfo.getRegionName() -- as bytes to {@link RegionStateNode}
+   */
+  private final ConcurrentSkipListMap<byte[], RegionStateNode> regionsMap =
+    new ConcurrentSkipListMap<byte[], RegionStateNode>(Bytes.BYTES_COMPARATOR);
+
+  private final ConcurrentSkipListMap<HRegionInfo, RegionStateNode> regionInTransition =
+    new ConcurrentSkipListMap<HRegionInfo, RegionStateNode>();
+
+  /**
+   * Regions marked as offline on a read of hbase:meta. Unused, or at least, once
+   * offlined, regions have no means of coming back on line again. TODO.
+   */
+  private final ConcurrentSkipListMap<HRegionInfo, RegionStateNode> regionOffline =
+    new ConcurrentSkipListMap<HRegionInfo, RegionStateNode>();
+
+  private final ConcurrentSkipListMap<byte[], RegionFailedOpen> regionFailedOpen =
+    new ConcurrentSkipListMap<byte[], RegionFailedOpen>(Bytes.BYTES_COMPARATOR);
+
+  private final ConcurrentHashMap<ServerName, ServerStateNode> serverMap =
+    new ConcurrentHashMap<ServerName, ServerStateNode>();
+
+  public RegionStates() { }
+
+  public void clear() {
+    regionsMap.clear();
+    regionInTransition.clear();
+    regionOffline.clear();
+    serverMap.clear();
+  }
+
+  // ==========================================================================
+  // RegionStateNode helpers
+  // ==========================================================================
+  protected RegionStateNode createRegionNode(final HRegionInfo regionInfo) {
+    RegionStateNode newNode = new RegionStateNode(regionInfo);
+    RegionStateNode oldNode = regionsMap.putIfAbsent(regionInfo.getRegionName(), newNode);
+    return oldNode != null ? oldNode : newNode;
+  }
+
+  protected RegionStateNode getOrCreateRegionNode(final HRegionInfo regionInfo) {
+    RegionStateNode node = regionsMap.get(regionInfo.getRegionName());
+    return node != null ? node : createRegionNode(regionInfo);
+  }
+
+  RegionStateNode getRegionNodeFromName(final byte[] regionName) {
+    return regionsMap.get(regionName);
+  }
+
+  protected RegionStateNode getRegionNode(final HRegionInfo regionInfo) {
+    return getRegionNodeFromName(regionInfo.getRegionName());
+  }
+
+  RegionStateNode getRegionNodeFromEncodedName(final String encodedRegionName) {
+    // TODO: Need a map but it is just dispatch merge...
+    for (RegionStateNode node: regionsMap.values()) {
+      if (node.getRegionInfo().getEncodedName().equals(encodedRegionName)) {
+        return node;
+      }
+    }
+    return null;
+  }
+
+  public void deleteRegion(final HRegionInfo regionInfo) {
+    regionsMap.remove(regionInfo.getRegionName());
+    // Remove from the offline regions map too if there.
+    if (this.regionOffline.containsKey(regionInfo)) {
+      if (LOG.isTraceEnabled()) LOG.trace("Removing from regionOffline Map: " + regionInfo);
+      this.regionOffline.remove(regionInfo);
+    }
+  }
+
+  ArrayList<RegionStateNode> getTableRegionStateNodes(final TableName tableName) {
+    final ArrayList<RegionStateNode> regions = new ArrayList<RegionStateNode>();
+    for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+      if (!node.getTable().equals(tableName)) break;
+      regions.add(node);
+    }
+    return regions;
+  }
+
+  ArrayList<RegionState> getTableRegionStates(final TableName tableName) {
+    final ArrayList<RegionState> regions = new ArrayList<RegionState>();
+    for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+      if (!node.getTable().equals(tableName)) break;
+      regions.add(createRegionState(node));
+    }
+    return regions;
+  }
+
+  ArrayList<HRegionInfo> getTableRegionsInfo(final TableName tableName) {
+    final ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
+    for (RegionStateNode node: regionsMap.tailMap(tableName.getName()).values()) {
+      if (!node.getTable().equals(tableName)) break;
+      regions.add(node.getRegionInfo());
+    }
+    return regions;
+  }
+
+  Collection<RegionStateNode> getRegionNodes() {
+    return regionsMap.values();
+  }
+
+  public ArrayList<RegionState> getRegionStates() {
+    final ArrayList<RegionState> regions = new ArrayList<RegionState>(regionsMap.size());
+    for (RegionStateNode node: regionsMap.values()) {
+      regions.add(createRegionState(node));
+    }
+    return regions;
+  }
+
+  // ==========================================================================
+  // RegionState helpers
+  // ==========================================================================
+  public RegionState getRegionState(final HRegionInfo regionInfo) {
+    return createRegionState(getRegionNode(regionInfo));
+  }
+
+  public RegionState getRegionState(final String encodedRegionName) {
+    return createRegionState(getRegionNodeFromEncodedName(encodedRegionName));
+  }
+
+  private RegionState createRegionState(final RegionStateNode node) {
+    return node == null ? null :
+      new RegionState(node.getRegionInfo(), node.getState(),
+        node.getLastUpdate(), node.getRegionLocation());
+  }
+
+  // ============================================================================================
+  // TODO: helpers
+  // ============================================================================================
+  public boolean hasTableRegionStates(final TableName tableName) {
+    // TODO
+    return !getTableRegionStates(tableName).isEmpty();
+  }
+
+  public List<HRegionInfo> getRegionsOfTable(final TableName table) {
+    return getRegionsOfTable(table, false);
+  }
+
+  List<HRegionInfo> getRegionsOfTable(final TableName table, final boolean offline) {
+    final ArrayList<RegionStateNode> nodes = getTableRegionStateNodes(table);
+    final ArrayList<HRegionInfo> hris = new ArrayList<HRegionInfo>(nodes.size());
+    for (RegionStateNode node: nodes) {
+      if (include(node, offline)) hris.add(node.getRegionInfo());
+    }
+    return hris;
+  }
+
+  /**
+   * Utility. Whether to include region in list of regions. Default is to
+   * weed out split and offline regions.
+   * @return True if we should include the node (do not include
+   * if split or offline, unless <code>offline</code> is set to true).
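+   * For example, a node whose state is OFFLINE is returned only when
+   * <code>offline</code> is true, while a node in SPLIT state is never returned.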
+   */
+  boolean include(final RegionStateNode node, final boolean offline) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("WORKING ON " + node + " " + node.getRegionInfo());
+    }
+    if (node.isInState(State.SPLIT)) return false;
+    if (node.isInState(State.OFFLINE) && !offline) return false;
+    final HRegionInfo hri = node.getRegionInfo();
+    return (!hri.isOffline() && !hri.isSplit()) ||
+      ((hri.isOffline() || hri.isSplit()) && offline);
+  }
+
+  /**
+   * Returns the regions hosted by the specified server.
+   * @param serverName the server we are interested in
+   * @return list of HRegionInfo hosted by the specified server
+   */
+  public List<HRegionInfo> getServerRegionInfoSet(final ServerName serverName) {
+    final ServerStateNode serverInfo = getServerNode(serverName);
+    if (serverInfo == null) return Collections.emptyList();
+
+    synchronized (serverInfo) {
+      return serverInfo.getRegionInfoList();
+    }
+  }
+
+  // ============================================================================================
+  // TODO: split helpers
+  // ============================================================================================
+  public void logSplit(final ServerName serverName) {
+    final ServerStateNode serverNode = getOrCreateServer(serverName);
+    synchronized (serverNode) {
+      serverNode.setState(ServerState.SPLITTING);
+      /* THIS HAS TO BE WRONG. THIS IS SPLITTING OF REGION, NOT SPLITTING WALs.
+      for (RegionStateNode regionNode: serverNode.getRegions()) {
+        synchronized (regionNode) {
+          // TODO: Abort procedure if present
+          regionNode.setState(State.SPLITTING);
+        }
+      }*/
+    }
+  }
+
+  public void logSplit(final HRegionInfo regionInfo) {
+    final RegionStateNode regionNode = getRegionNode(regionInfo);
+    synchronized (regionNode) {
+      regionNode.setState(State.SPLIT);
+    }
+  }
+
+  @VisibleForTesting
+  public void updateRegionState(final HRegionInfo regionInfo, final State state) {
+    final RegionStateNode regionNode = getOrCreateRegionNode(regionInfo);
+    synchronized (regionNode) {
+      regionNode.setState(state);
+    }
+  }
+
+  // ============================================================================================
+  // TODO:
+  // ============================================================================================
+  public List<HRegionInfo> getAssignedRegions() {
+    final List<HRegionInfo> result = new ArrayList<HRegionInfo>();
+    for (RegionStateNode node: regionsMap.values()) {
+      if (!node.isInTransition()) {
+        result.add(node.getRegionInfo());
+      }
+    }
+    return result;
+  }
+
+  public boolean isRegionInState(final HRegionInfo regionInfo, final State... state) {
+    final RegionStateNode region = getRegionNode(regionInfo);
+    if (region != null) {
+      synchronized (region) {
+        return region.isInState(state);
+      }
+    }
+    return false;
+  }
+
+  public boolean isRegionOnline(final HRegionInfo regionInfo) {
+    return isRegionInState(regionInfo, State.OPEN);
+  }
+
+  /**
+   * @return True if region is offline (in OFFLINE or CLOSED state).
+   */
+  public boolean isRegionOffline(final HRegionInfo regionInfo) {
+    return isRegionInState(regionInfo, State.OFFLINE, State.CLOSED);
+  }
+
+  public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(
+      final Collection<HRegionInfo> regions) {
+    final Map<ServerName, List<HRegionInfo>> result = new HashMap<ServerName, List<HRegionInfo>>();
+    for (HRegionInfo hri: regions) {
+      final RegionStateNode node = getRegionNode(hri);
+      if (node == null) continue;
+
+      // TODO: State.OPEN
+      final ServerName serverName = node.getRegionLocation();
+      if (serverName == null) continue;
+
+      List<HRegionInfo> serverRegions = result.get(serverName);
+      if (serverRegions == null) {
+        serverRegions = new ArrayList<HRegionInfo>();
+        result.put(serverName, serverRegions);
+      }
+
+      serverRegions.add(node.getRegionInfo());
+    }
+    return result;
+  }
+
+  public Map<HRegionInfo, ServerName> getRegionAssignments() {
+    final HashMap<HRegionInfo, ServerName> assignments = new HashMap<HRegionInfo, ServerName>();
+    for (RegionStateNode node: regionsMap.values()) {
+      assignments.put(node.getRegionInfo(), node.getRegionLocation());
+    }
+    return assignments;
+  }
+
+  public Map<RegionState.State, List<HRegionInfo>> getRegionByStateOfTable(TableName tableName) {
+    final State[] states = State.values();
+    final Map<RegionState.State, List<HRegionInfo>> tableRegions =
+        new HashMap<RegionState.State, List<HRegionInfo>>(states.length);
+    for (int i = 0; i < states.length; ++i) {
+      tableRegions.put(states[i], new ArrayList<HRegionInfo>());
+    }
+
+    for (RegionStateNode node: regionsMap.values()) {
+      tableRegions.get(node.getState()).add(node.getRegionInfo());
+    }
+    return tableRegions;
+  }
+
+  public ServerName getRegionServerOfRegion(final HRegionInfo regionInfo) {
+    final RegionStateNode region = getRegionNode(regionInfo);
+    if (region != null) {
+      synchronized (region) {
+        ServerName server = region.getRegionLocation();
+        return server != null ? server : region.getLastHost();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This is an EXPENSIVE clone. Cloning though is the safest thing to do.
+   * Can't let out the original since it can change and at least the load balancer
+   * wants to iterate this exported list. We need to synchronize on regions
+   * since all access to this.servers is under a lock on this.regions.
+   * @param forceByCluster a flag to force aggregation of the server load to the cluster level
+   * @return A clone of current assignments by table.
+   */
+  public Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable(
+      final boolean forceByCluster) {
+    if (!forceByCluster) return getAssignmentsByTable();
+
+    final HashMap<ServerName, List<HRegionInfo>> ensemble =
+      new HashMap<ServerName, List<HRegionInfo>>(serverMap.size());
+    for (ServerStateNode serverNode: serverMap.values()) {
+      ensemble.put(serverNode.getServerName(), serverNode.getRegionInfoList());
+    }
+
+    // TODO: can we use Collections.singletonMap(HConstants.ENSEMBLE_TABLE_NAME, ensemble)?
+    final Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
+      new HashMap<TableName, Map<ServerName, List<HRegionInfo>>>(1);
+    result.put(HConstants.ENSEMBLE_TABLE_NAME, ensemble);
+    return result;
+  }
+
+  public Map<TableName, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
+    final Map<TableName, Map<ServerName, List<HRegionInfo>>> result = new HashMap<>();
+    for (RegionStateNode node: regionsMap.values()) {
+      Map<ServerName, List<HRegionInfo>> tableResult = result.get(node.getTable());
+      if (tableResult == null) {
+        tableResult = new HashMap<ServerName, List<HRegionInfo>>();
+        result.put(node.getTable(), tableResult);
+      }
+
+      final ServerName serverName = node.getRegionLocation();
+      if (serverName == null) {
+        LOG.info("Skipping, no server for " + node);
+        continue;
+      }
+      List<HRegionInfo> serverResult = tableResult.get(serverName);
+      if (serverResult == null) {
+        serverResult = new ArrayList<HRegionInfo>();
+        tableResult.put(serverName, serverResult);
+      }
+
+      serverResult.add(node.getRegionInfo());
+    }
+    return result;
+  }
+
+  // ==========================================================================
+  // Region in transition helpers
+  // ==========================================================================
+  protected boolean addRegionInTransition(final RegionStateNode regionNode,
+      final RegionTransitionProcedure procedure) {
+    if (procedure != null && !regionNode.setProcedure(procedure)) return false;
+
+    regionInTransition.put(regionNode.getRegionInfo(), regionNode);
+    return true;
+  }
+
+  protected void removeRegionInTransition(final RegionStateNode regionNode,
+      final RegionTransitionProcedure procedure) {
+    regionInTransition.remove(regionNode.getRegionInfo());
+    regionNode.unsetProcedure(procedure);
+  }
+
+  public boolean hasRegionsInTransition() {
+    return !regionInTransition.isEmpty();
+  }
+
+  public boolean isRegionInTransition(final HRegionInfo regionInfo) {
+    final RegionStateNode node = regionInTransition.get(regionInfo);
+    return node != null ? node.isInTransition() : false;
+  }
+
+  /**
+   * @return If a procedure-in-transition for hri, return it else null.
+   */
+  public RegionTransitionProcedure getRegionTransitionProcedure(final HRegionInfo hri) {
+    RegionStateNode node = regionInTransition.get(hri);
+    if (node == null) return null;
+    return node.getProcedure();
+  }
+
+  public RegionState getRegionTransitionState(final HRegionInfo hri) {
+    RegionStateNode node = regionInTransition.get(hri);
+    if (node == null) return null;
+
+    synchronized (node) {
+      return node.isInTransition() ? createRegionState(node) : null;
+    }
+  }
+
+  public List<RegionStateNode> getRegionsInTransition() {
+    return new ArrayList<RegionStateNode>(regionInTransition.values());
+  }
+
+  /**
+   * Get the number of regions in transition.
+   */
+  public int getRegionsInTransitionCount() {
+    return regionInTransition.size();
+  }
+
+  public List<RegionState> getRegionsStateInTransition() {
+    final List<RegionState> rit = new ArrayList<RegionState>(regionInTransition.size());
+    for (RegionStateNode node: regionInTransition.values()) {
+      rit.add(createRegionState(node));
+    }
+    return rit;
+  }
+
+  public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
+    final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
+    for (RegionStateNode node: regionInTransition.values()) {
+      rit.add(createRegionState(node));
+    }
+    return rit;
+  }
+
+  // ==========================================================================
+  // Region offline helpers
+  // ==========================================================================
+  // TODO: Populated when we read meta but regions never make it out of here.
+  public void addToOfflineRegions(final RegionStateNode regionNode) {
+    LOG.info("Added to offline, CURRENTLY NEVER CLEARED!!! " + regionNode);
+    regionOffline.put(regionNode.getRegionInfo(), regionNode);
+  }
+
+  // TODO: Unused.
+  public void removeFromOfflineRegions(final HRegionInfo regionInfo) {
+    regionOffline.remove(regionInfo);
+  }
+
+  // ==========================================================================
+  // Region FAIL_OPEN helpers
+  // ==========================================================================
+  public static final class RegionFailedOpen {
+    private final RegionStateNode regionNode;
+
+    private volatile Exception exception = null;
+    private volatile int retries = 0;
+
+    public RegionFailedOpen(final RegionStateNode regionNode) {
+      this.regionNode = regionNode;
+    }
+
+    public RegionStateNode getRegionNode() {
+      return regionNode;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionNode.getRegionInfo();
+    }
+
+    public int incrementAndGetRetries() {
+      return ++this.retries;
+    }
+
+    public int getRetries() {
+      return retries;
+    }
+
+    public void setException(final Exception exception) {
+      this.exception = exception;
+    }
+
+    public Exception getException() {
+      return this.exception;
+    }
+  }
+
+  public RegionFailedOpen addToFailedOpen(final RegionStateNode regionNode) {
+    final byte[] key = regionNode.getRegionInfo().getRegionName();
+    RegionFailedOpen node = regionFailedOpen.get(key);
+    if (node == null) {
+      RegionFailedOpen newNode = new RegionFailedOpen(regionNode);
+      RegionFailedOpen oldNode = regionFailedOpen.putIfAbsent(key, newNode);
+      node = oldNode != null ? oldNode : newNode;
+    }
+    return node;
+  }
+
+  public RegionFailedOpen getFailedOpen(final HRegionInfo regionInfo) {
+    return regionFailedOpen.get(regionInfo.getRegionName());
+  }
+
+  public void removeFromFailedOpen(final HRegionInfo regionInfo) {
+    regionFailedOpen.remove(regionInfo.getRegionName());
+  }
+
+  public List<RegionState> getRegionFailedOpen() {
+    if (regionFailedOpen.isEmpty()) return Collections.emptyList();
+
+    ArrayList<RegionState> regions = new ArrayList<RegionState>(regionFailedOpen.size());
+    for (RegionFailedOpen r: regionFailedOpen.values()) {
+      regions.add(createRegionState(r.getRegionNode()));
+    }
+    return regions;
+  }
+
+  // ==========================================================================
+  // Servers
+  // ==========================================================================
+  public ServerStateNode getOrCreateServer(final ServerName serverName) {
+    ServerStateNode node = serverMap.get(serverName);
+    if (node == null) {
+      node = new ServerStateNode(serverName);
+      ServerStateNode oldNode = serverMap.putIfAbsent(serverName, node);
+      node = oldNode != null ? oldNode : node;
+    }
+    return node;
+  }
+
+  public void removeServer(final ServerName serverName) {
+    serverMap.remove(serverName);
+  }
+
+  protected ServerStateNode getServerNode(final ServerName serverName) {
+    return serverMap.get(serverName);
+  }
+
+  public double getAverageLoad() {
+    int numServers = 0;
+    int totalLoad = 0;
+    for (ServerStateNode node: serverMap.values()) {
+      totalLoad += node.getRegionCount();
+      numServers++;
+    }
+    return numServers == 0 ?
+        0.0 : (double)totalLoad / (double)numServers;
+  }
+
+  public ServerStateNode addRegionToServer(final ServerName serverName,
+      final RegionStateNode regionNode) {
+    ServerStateNode serverNode = getOrCreateServer(serverName);
+    serverNode.addRegion(regionNode);
+    return serverNode;
+  }
+
+  public ServerStateNode removeRegionFromServer(final ServerName serverName,
+      final RegionStateNode regionNode) {
+    ServerStateNode serverNode = getOrCreateServer(serverName);
+    serverNode.removeRegion(regionNode);
+    return serverNode;
+  }
+
+  // ==========================================================================
+  // ToString helpers
+  // ==========================================================================
+  public static String regionNamesToString(final Collection<byte[]> regions) {
+    final StringBuilder sb = new StringBuilder();
+    final Iterator<byte[]> it = regions.iterator();
+    sb.append("[");
+    if (it.hasNext()) {
+      sb.append(Bytes.toStringBinary(it.next()));
+      while (it.hasNext()) {
+        sb.append(", ");
+        sb.append(Bytes.toStringBinary(it.next()));
+      }
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+}
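+
+// A rough usage sketch, assuming hypothetical callers; in practice the
+// AssignmentManager and the RegionTransitionProcedure subclasses drive this
+// bookkeeping. Names such as "hri", "targetServer" and "assignProc" are
+// placeholders for illustration only.
+//
+//   RegionStates regionStates = new RegionStates();
+//   RegionStateNode node = regionStates.getOrCreateRegionNode(hri);
+//   synchronized (node) {
+//     regionStates.addRegionInTransition(node, assignProc);
+//     node.transitionState(State.OPENING, STATES_EXPECTED_ON_OPEN);
+//     node.setRegionLocation(targetServer);
+//   }
+//   // ... once the RegionServer reports the region open ...
+//   synchronized (node) {
+//     node.transitionState(State.OPEN, State.OPENING);
+//     regionStates.addRegionToServer(targetServer, node);
+//     regionStates.removeRegionInTransition(node, assignProc);
+//   }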