Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5701B1738E for ; Fri, 10 Apr 2015 07:55:24 +0000 (UTC) Received: (qmail 71112 invoked by uid 500); 10 Apr 2015 07:55:23 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 70935 invoked by uid 500); 10 Apr 2015 07:55:23 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 70472 invoked by uid 99); 10 Apr 2015 07:55:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 07:55:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CC485E03A7; Fri, 10 Apr 2015 07:55:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mbertozzi@apache.org To: commits@hbase.apache.org Date: Fri, 10 Apr 2015 07:55:27 -0000 Message-Id: <0b958272555546b3acab78e5ed016b5e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/28] hbase git commit: HBASE-13203 Procedure v2 - master create/delete table http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-protocol/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/MasterProcedure.proto b/hbase-protocol/src/main/protobuf/MasterProcedure.proto new file mode 100644 index 0000000..4e9b05e --- /dev/null +++ b/hbase-protocol/src/main/protobuf/MasterProcedure.proto @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.protobuf.generated"; +option java_outer_classname = "MasterProcedureProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; +import "RPC.proto"; + +// ============================================================================ +// WARNING - Compatibility rules +// ============================================================================ +// This .proto contains the data serialized by the master procedures. +// Each procedure has some state stored to know, which step were executed +// and what were the parameters or data created by the previous steps. +// new code should be able to handle the old format or at least fail cleanly +// triggering a rollback/cleanup. +// +// Procedures that are inheriting from a StateMachineProcedure have an enum: +// - Do not change the number of the 'State' enums. +// doing so, will cause executing the wrong 'step' on the pending +// procedures when they will be replayed. +// - Do not remove items from the enum, new code must be able to handle +// all the previous 'steps'. There may be pending procedure ready to be +// recovered replayed. 
alternative you can make sure that not-known state +// will result in a failure that will rollback the already executed steps. +// ============================================================================ + +enum CreateTableState { + CREATE_TABLE_PRE_OPERATION = 1; + CREATE_TABLE_WRITE_FS_LAYOUT = 2; + CREATE_TABLE_ADD_TO_META = 3; + CREATE_TABLE_ASSIGN_REGIONS = 4; + CREATE_TABLE_UPDATE_DESC_CACHE = 5; + CREATE_TABLE_POST_OPERATION = 6; +} + +message CreateTableStateData { + required UserInformation user_info = 1; + required TableSchema table_schema = 2; + repeated RegionInfo region_info = 3; +} + +enum DeleteTableState { + DELETE_TABLE_PRE_OPERATION = 1; + DELETE_TABLE_REMOVE_FROM_META = 2; + DELETE_TABLE_CLEAR_FS_LAYOUT = 3; + DELETE_TABLE_UPDATE_DESC_CACHE = 4; + DELETE_TABLE_UNASSIGN_REGIONS = 5; + DELETE_TABLE_POST_OPERATION = 6; +} + +message DeleteTableStateData { + required UserInformation user_info = 1; + required TableName table_name = 2; + repeated RegionInfo region_info = 3; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index edebb1a..107480a 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -316,6 +316,10 @@ org.apache.hbase + hbase-procedure + + + org.apache.hbase hbase-client @@ -336,6 +340,12 @@ test + org.apache.hbase + hbase-procedure + test-jar + test + + commons-httpclient commons-httpclient http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index 971fa50..0da16a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java 
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.VersionInfo; public interface RpcCallContext extends Delayable { /** @@ -57,4 +58,9 @@ public interface RpcCallContext extends Delayable { * @return Address of remote client if a request is ongoing, else null */ InetAddress getRemoteAddress(); + + /** + * @return the client version info, or null if the information is not present + */ + VersionInfo getClientVersionInfo(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 770f4cd..c69a187 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -91,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.VersionInfo; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; @@ -399,7 +400,7 @@ public class RpcServer implements RpcServerInterface { // Set the exception as the result of the method invocation. headerBuilder.setException(exceptionBuilder.build()); } - // Pass reservoir to buildCellBlock. 
Keep reference to returne so can add it back to the + // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the // reservoir when finished. This is hacky and the hack is not contained but benefits are // high when we can avoid a big buffer allocation on each rpc. this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec, @@ -544,6 +545,11 @@ public class RpcServer implements RpcServerInterface { public InetAddress getRemoteAddress() { return remoteAddress; } + + @Override + public VersionInfo getClientVersionInfo() { + return connection.getVersionInfo(); + } } /** Listens on the socket. Creates jobs for the handler threads*/ @@ -1273,6 +1279,13 @@ public class RpcServer implements RpcServerInterface { this.lastContact = lastContact; } + public VersionInfo getVersionInfo() { + if (connectionHeader.hasVersionInfo()) { + return connectionHeader.getVersionInfo(); + } + return null; + } + /* Return true if the connection has no outstanding rpc */ private boolean isIdle() { return rpcCount.get() == 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 581e3c9..8ec883a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -90,8 +90,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; -import org.apache.hadoop.hbase.master.handler.CreateTableHandler; -import org.apache.hadoop.hbase.master.handler.DeleteTableHandler; import 
org.apache.hadoop.hbase.master.handler.DisableTableHandler; import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler; import org.apache.hadoop.hbase.master.handler.EnableTableHandler; @@ -100,11 +98,18 @@ import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.master.handler.TruncateTableHandler; +import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; +import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; @@ -123,6 +128,7 @@ import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; @@ -290,6 +296,9 @@ 
public class HMaster extends HRegionServer implements MasterServices, Server { // it is assigned after 'initialized' guard set to true, so should be volatile private volatile MasterQuotaManager quotaManager; + private ProcedureExecutor procedureExecutor; + private WALProcedureStore procedureStore; + // handle table states private TableStateManager tableStateManager; @@ -1002,6 +1011,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { // Any time changing this maxThreads to > 1, pls see the comment at // AccessController#postCreateTableHandler this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1); + startProcedureExecutor(); // Start log cleaner thread int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); @@ -1023,6 +1033,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } @Override + protected void sendShutdownInterrupt() { + super.sendShutdownInterrupt(); + stopProcedureExecutor(); + } + + @Override protected void stopServiceThreads() { if (masterJettyServer != null) { LOG.info("Stopping master jetty server"); @@ -1034,6 +1050,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } super.stopServiceThreads(); stopChores(); + // Wait for all the remaining region servers to report in IFF we were // running a cluster shutdown AND we were NOT aborting. 
if (!isAborted() && this.serverManager != null && @@ -1054,6 +1071,34 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (this.mpmHost != null) this.mpmHost.stop("server shutting down."); } + private void startProcedureExecutor() throws IOException { + final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); + final Path logDir = new Path(fileSystemManager.getRootDir(), + MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); + + procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir, + new MasterProcedureEnv.WALStoreLeaseRecovery(this)); + procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); + procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore, + procEnv.getProcedureQueue()); + + final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, + Math.max(Runtime.getRuntime().availableProcessors(), + MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); + procedureStore.start(numThreads); + procedureExecutor.start(numThreads); + } + + private void stopProcedureExecutor() { + if (procedureExecutor != null) { + procedureExecutor.stop(); + } + + if (procedureStore != null) { + procedureStore.stop(isAborted()); + } + } + private void stopChores() { if (this.balancerChore != null) { this.balancerChore.cancel(true); @@ -1290,7 +1335,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { String namespace = hTableDescriptor.getTableName().getNamespaceAsString(); ensureNamespaceExists(namespace); - HRegionInfo[] newRegions = getHRegionInfos(hTableDescriptor, splitKeys); + HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys); checkInitialized(); sanityCheckTableDescriptor(hTableDescriptor); this.quotaManager.checkNamespaceTableAndRegionQuota(hTableDescriptor.getTableName(), @@ -1299,13 +1344,22 @@ public class HMaster extends HRegionServer 
implements MasterServices, Server { cpHost.preCreateTable(hTableDescriptor, newRegions); } LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor); - this.service.submit(new CreateTableHandler(this, - this.fileSystemManager, hTableDescriptor, conf, - newRegions, this).prepare()); + + // TODO: We can handle/merge duplicate requests, and differentiate the case of + // TableExistsException by saying if the schema is the same or not. + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + long procId = this.procedureExecutor.submitProcedure( + new CreateTableProcedure(procedureExecutor.getEnvironment(), + hTableDescriptor, newRegions, latch)); + latch.await(); + if (cpHost != null) { cpHost.postCreateTable(hTableDescriptor, newRegions); } + // TODO: change the interface to return the procId, + // and add it to the response protobuf. + //return procId; } /** @@ -1512,29 +1566,6 @@ public class HMaster extends HRegionServer implements MasterServices, Server { RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd); } - private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor, - byte[][] splitKeys) { - long regionId = System.currentTimeMillis(); - HRegionInfo[] hRegionInfos = null; - if (splitKeys == null || splitKeys.length == 0) { - hRegionInfos = new HRegionInfo[]{new HRegionInfo(hTableDescriptor.getTableName(), null, null, - false, regionId)}; - } else { - int numRegions = splitKeys.length + 1; - hRegionInfos = new HRegionInfo[numRegions]; - byte[] startKey = null; - byte[] endKey = null; - for (int i = 0; i < numRegions; i++) { - endKey = (i == splitKeys.length) ? 
null : splitKeys[i]; - hRegionInfos[i] = - new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey, - false, regionId); - startKey = endKey; - } - } - return hRegionInfos; - } - private static boolean isCatalogTable(final TableName tableName) { return tableName.equals(TableName.META_TABLE_NAME); } @@ -1546,10 +1577,20 @@ public class HMaster extends HRegionServer implements MasterServices, Server { cpHost.preDeleteTable(tableName); } LOG.info(getClientIdAuditPrefix() + " delete " + tableName); - this.service.submit(new DeleteTableHandler(tableName, this, this).prepare()); + + // TODO: We can handle/merge duplicate request + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(); + long procId = this.procedureExecutor.submitProcedure( + new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch)); + latch.await(); + if (cpHost != null) { cpHost.postDeleteTable(tableName); } + + // TODO: change the interface to return the procId, + // and add it to the response protobuf. 
+ //return procId; } @Override @@ -1851,6 +1892,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server { } @Override + public ProcedureExecutor getMasterProcedureExecutor() { + return procedureExecutor; + } + + @Override public ServerName getServerName() { return this.serverName; } http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 63f3119..7352fe8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; @@ -82,6 +84,11 @@ public interface MasterServices extends Server { MasterQuotaManager getMasterQuotaManager(); /** + * @return Master's instance of {@link ProcedureExecutor} + */ + ProcedureExecutor getMasterProcedureExecutor(); + + /** * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. 
* @throws TableNotDisabledException http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java index f0f8fdd..02912b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableNamespaceManager.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.master.handler.CreateTableHandler; +import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; @@ -231,18 +231,15 @@ public class TableNamespaceManager { } private void createNamespaceTable(MasterServices masterServices) throws IOException { - HRegionInfo newRegions[] = new HRegionInfo[]{ + HRegionInfo[] newRegions = new HRegionInfo[]{ new HRegionInfo(HTableDescriptor.NAMESPACE_TABLEDESC.getTableName(), null, null)}; - //we need to create the table this way to bypass - //checkInitialized - masterServices.getExecutorService() - .submit(new CreateTableHandler(masterServices, - masterServices.getMasterFileSystem(), - HTableDescriptor.NAMESPACE_TABLEDESC, - masterServices.getConfiguration(), - newRegions, - masterServices).prepare()); + // we need to create the table this way to bypass checkInitialized + masterServices.getMasterProcedureExecutor() + .submitProcedure(new CreateTableProcedure( + 
masterServices.getMasterProcedureExecutor().getEnvironment(), + HTableDescriptor.NAMESPACE_TABLEDESC, + newRegions)); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java new file mode 100644 index 0000000..dd6d387 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableDescriptor; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.collect.Lists; + +@InterfaceAudience.Private +public class CreateTableProcedure + extends StateMachineProcedure + implements TableProcedureInterface { + private static final Log LOG = LogFactory.getLog(CreateTableProcedure.class); + + private final AtomicBoolean aborted = new 
AtomicBoolean(false); + + // used for compatibility with old clients + private final ProcedurePrepareLatch syncLatch; + + private HTableDescriptor hTableDescriptor; + private List newRegions; + private UserGroupInformation user; + + public CreateTableProcedure() { + // Required by the Procedure framework to create the procedure on replay + syncLatch = null; + } + + public CreateTableProcedure(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) + throws IOException { + this(env, hTableDescriptor, newRegions, null); + } + + public CreateTableProcedure(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions, + final ProcedurePrepareLatch syncLatch) + throws IOException { + this.hTableDescriptor = hTableDescriptor; + this.newRegions = newRegions != null ? Lists.newArrayList(newRegions) : null; + this.user = env.getRequestUser().getUGI(); + + // used for compatibility with clients without procedures + // they need a sync TableExistsException + this.syncLatch = syncLatch; + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableState state) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + try { + switch (state) { + case CREATE_TABLE_PRE_OPERATION: + // Verify if we can create the table + boolean exists = !prepareCreate(env); + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + + if (exists) { + assert isFailed() : "the delete should have an exception here"; + return Flow.NO_MORE_STATE; + } + + preCreate(env); + setNextState(CreateTableState.CREATE_TABLE_WRITE_FS_LAYOUT); + break; + case CREATE_TABLE_WRITE_FS_LAYOUT: + newRegions = createFsLayout(env, hTableDescriptor, newRegions); + setNextState(CreateTableState.CREATE_TABLE_ADD_TO_META); + break; + case CREATE_TABLE_ADD_TO_META: + newRegions = addTableToMeta(env, hTableDescriptor, newRegions); + 
setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS); + break; + case CREATE_TABLE_ASSIGN_REGIONS: + assignRegions(env, getTableName(), newRegions); + setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); + break; + case CREATE_TABLE_UPDATE_DESC_CACHE: + updateTableDescCache(env, getTableName()); + setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION); + break; + case CREATE_TABLE_POST_OPERATION: + postCreate(env); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (InterruptedException|IOException e) { + LOG.error("Error trying to create table=" + getTableName() + " state=" + state, e); + setFailure("master-create-table", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final CreateTableState state) + throws IOException { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " rollback state=" + state); + } + try { + switch (state) { + case CREATE_TABLE_POST_OPERATION: + break; + case CREATE_TABLE_UPDATE_DESC_CACHE: + DeleteTableProcedure.deleteTableDescriptorCache(env, getTableName()); + break; + case CREATE_TABLE_ASSIGN_REGIONS: + DeleteTableProcedure.deleteAssignmentState(env, getTableName()); + break; + case CREATE_TABLE_ADD_TO_META: + DeleteTableProcedure.deleteFromMeta(env, getTableName(), newRegions); + break; + case CREATE_TABLE_WRITE_FS_LAYOUT: + DeleteTableProcedure.deleteFromFs(env, getTableName(), newRegions, false); + break; + case CREATE_TABLE_PRE_OPERATION: + DeleteTableProcedure.deleteTableStates(env, getTableName()); + // TODO-MAYBE: call the deleteTable coprocessor event? + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + break; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (IOException e) { + // This will be retried. Unless there is a bug in the code, + // this should be just a "temporary error" (e.g. 
network down) + LOG.warn("Failed rollback attempt step=" + state + " table=" + getTableName(), e); + throw e; + } + } + + @Override + protected CreateTableState getState(final int stateId) { + return CreateTableState.valueOf(stateId); + } + + @Override + protected int getStateId(final CreateTableState state) { + return state.getNumber(); + } + + @Override + protected CreateTableState getInitialState() { + return CreateTableState.CREATE_TABLE_PRE_OPERATION; + } + + @Override + protected void setNextState(final CreateTableState state) { + if (aborted.get()) { + setAbortFailure("create-table", "abort requested"); + } else { + super.setNextState(state); + } + } + + @Override + public TableName getTableName() { + return hTableDescriptor.getTableName(); + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.CREATE; + } + + @Override + public boolean abort(final MasterProcedureEnv env) { + aborted.set(true); + return true; + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(getTableName()); + sb.append(") user="); + sb.append(user); + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProcedureProtos.CreateTableStateData.Builder state = + MasterProcedureProtos.CreateTableStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user)) + .setTableSchema(hTableDescriptor.convert()); + if (newRegions != null) { + for (HRegionInfo hri: newRegions) { + state.addRegionInfo(HRegionInfo.convert(hri)); + } + } + state.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProcedureProtos.CreateTableStateData state = + MasterProcedureProtos.CreateTableStateData.parseDelimitedFrom(stream); + user = 
MasterProcedureUtil.toUserInfo(state.getUserInfo()); + hTableDescriptor = HTableDescriptor.convert(state.getTableSchema()); + if (state.getRegionInfoCount() == 0) { + newRegions = null; + } else { + newRegions = new ArrayList(state.getRegionInfoCount()); + for (HBaseProtos.RegionInfo hri: state.getRegionInfoList()) { + newRegions.add(HRegionInfo.convert(hri)); + } + } + } + + @Override + protected boolean acquireLock(final MasterProcedureEnv env) { + return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "create table"); + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureQueue().releaseTableWrite(getTableName()); + } + + private boolean prepareCreate(final MasterProcedureEnv env) throws IOException { + final TableName tableName = getTableName(); + if (MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) { + setFailure("master-create-table", new TableExistsException(getTableName())); + return false; + } + return true; + } + + private void preCreate(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + final HRegionInfo[] regions = newRegions == null ? null : + newRegions.toArray(new HRegionInfo[newRegions.size()]); + user.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + cpHost.preCreateTableHandler(hTableDescriptor, regions); + return null; + } + }); + } + } + + private void postCreate(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + final HRegionInfo[] regions = (newRegions == null) ? 
null : + newRegions.toArray(new HRegionInfo[newRegions.size()]); + user.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + cpHost.postCreateTableHandler(hTableDescriptor, regions); + return null; + } + }); + } + } + + protected interface CreateHdfsRegions { + List createHdfsRegions(final MasterProcedureEnv env, + final Path tableRootDir, final TableName tableName, + final List newRegions) throws IOException; + } + + protected static List createFsLayout(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, final List newRegions) + throws IOException { + return createFsLayout(env, hTableDescriptor, newRegions, new CreateHdfsRegions() { + @Override + public List createHdfsRegions(final MasterProcedureEnv env, + final Path tableRootDir, final TableName tableName, + final List newRegions) throws IOException { + HRegionInfo[] regions = newRegions != null ? + newRegions.toArray(new HRegionInfo[newRegions.size()]) : null; + return ModifyRegionUtils.createRegions(env.getMasterConfiguration(), + tableRootDir, hTableDescriptor, regions, null); + } + }); + } + + protected static List createFsLayout(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, List newRegions, + final CreateHdfsRegions hdfsRegionHandler) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final Path tempdir = mfs.getTempDir(); + + // 1. Create Table Descriptor + // using a copy of descriptor, table will be created enabling first + TableDescriptor underConstruction = new TableDescriptor(hTableDescriptor); + final Path tempTableDir = FSUtils.getTableDir(tempdir, hTableDescriptor.getTableName()); + ((FSTableDescriptors)(env.getMasterServices().getTableDescriptors())) + .createTableDescriptorForTableDirectory( + tempTableDir, underConstruction, false); + + // 2. 
Create Regions + newRegions = hdfsRegionHandler.createHdfsRegions(env, tempdir, + hTableDescriptor.getTableName(), newRegions); + + // 3. Move Table temp directory to the hbase root location + final Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), hTableDescriptor.getTableName()); + FileSystem fs = mfs.getFileSystem(); + if (!fs.delete(tableDir, true) && fs.exists(tableDir)) { + throw new IOException("Couldn't delete " + tableDir); + } + if (!fs.rename(tempTableDir, tableDir)) { + throw new IOException("Unable to move table from temp=" + tempTableDir + + " to hbase root=" + tableDir); + } + return newRegions; + } + + protected static List addTableToMeta(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, + final List regions) throws IOException { + if (regions != null && regions.size() > 0) { + ProcedureSyncWait.waitMetaRegions(env); + + // Add regions to META + addRegionsToMeta(env, hTableDescriptor, regions); + // Add replicas if needed + List newRegions = addReplicas(env, hTableDescriptor, regions); + + // Setup replication for region replicas if needed + if (hTableDescriptor.getRegionReplication() > 1) { + ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); + } + return newRegions; + } + return regions; + } + + /** + * Create any replicas for the regions (the default replicas that was + * already created is passed to the method) + * @param hTableDescriptor descriptor to use + * @param regions default replicas + * @return the combined list of default and non-default replicas + */ + private static List addReplicas(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, + final List regions) { + int numRegionReplicas = hTableDescriptor.getRegionReplication() - 1; + if (numRegionReplicas <= 0) { + return regions; + } + List hRegionInfos = + new ArrayList((numRegionReplicas+1)*regions.size()); + for (int i = 0; i < regions.size(); i++) { + for (int j = 1; j <= numRegionReplicas; j++) { + 
hRegionInfos.add(RegionReplicaUtil.getRegionInfoForReplica(regions.get(i), j)); + } + } + hRegionInfos.addAll(regions); + return hRegionInfos; + } + + protected static void assignRegions(final MasterProcedureEnv env, + final TableName tableName, final List regions) throws IOException { + ProcedureSyncWait.waitRegionServers(env); + + // Trigger immediate assignment of the regions in round-robin fashion + final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager(); + ModifyRegionUtils.assignRegions(assignmentManager, regions); + + // Enable table + assignmentManager.getTableStateManager() + .setTableState(tableName, TableState.State.ENABLED); + } + + /** + * Add the specified set of regions to the hbase:meta table. + */ + protected static void addRegionsToMeta(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, + final List regionInfos) throws IOException { + MetaTableAccessor.addRegionsToMeta(env.getMasterServices().getConnection(), + regionInfos, hTableDescriptor.getRegionReplication()); + } + + protected static void updateTableDescCache(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + env.getMasterServices().getTableDescriptors().get(tableName); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java new file mode 100644 index 0000000..ad5e671 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.InputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.exceptions.HBaseException; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.master.AssignmentManager; 
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.quotas.MasterQuotaManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.security.UserGroupInformation; + +@InterfaceAudience.Private +public class DeleteTableProcedure + extends StateMachineProcedure + implements TableProcedureInterface { + private static final Log LOG = LogFactory.getLog(DeleteTableProcedure.class); + + private List regions; + private UserGroupInformation user; + private TableName tableName; + + // used for compatibility with old clients + private final ProcedurePrepareLatch syncLatch; + + public DeleteTableProcedure() { + // Required by the Procedure framework to create the procedure on replay + syncLatch = null; + } + + public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName) + throws IOException { + this(env, tableName, null); + } + + public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName, + final ProcedurePrepareLatch syncLatch) throws IOException { + this.tableName = tableName; + this.user = env.getRequestUser().getUGI(); + + // used for compatibility with clients without procedures + // they need a sync TableNotFoundException, TableNotDisabledException, ... 
+ this.syncLatch = syncLatch; + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, DeleteTableState state) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + try { + switch (state) { + case DELETE_TABLE_PRE_OPERATION: + // Verify if we can delete the table + boolean deletable = prepareDelete(env); + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + if (!deletable) { + assert isFailed() : "the delete should have an exception here"; + return Flow.NO_MORE_STATE; + } + + preDelete(env); + + // TODO: Move out... in the acquireLock() + LOG.debug("waiting for '" + getTableName() + "' regions in transition"); + regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); + assert regions != null && !regions.isEmpty() : "unexpected 0 regions"; + ProcedureSyncWait.waitRegionInTransition(env, regions); + + setNextState(DeleteTableState.DELETE_TABLE_REMOVE_FROM_META); + break; + case DELETE_TABLE_REMOVE_FROM_META: + LOG.debug("delete '" + getTableName() + "' regions from META"); + DeleteTableProcedure.deleteFromMeta(env, getTableName(), regions); + setNextState(DeleteTableState.DELETE_TABLE_CLEAR_FS_LAYOUT); + break; + case DELETE_TABLE_CLEAR_FS_LAYOUT: + LOG.debug("delete '" + getTableName() + "' from filesystem"); + DeleteTableProcedure.deleteFromFs(env, getTableName(), regions, true); + setNextState(DeleteTableState.DELETE_TABLE_UPDATE_DESC_CACHE); + break; + case DELETE_TABLE_UPDATE_DESC_CACHE: + LOG.debug("delete '" + getTableName() + "' descriptor"); + DeleteTableProcedure.deleteTableDescriptorCache(env, getTableName()); + setNextState(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS); + break; + case DELETE_TABLE_UNASSIGN_REGIONS: + LOG.debug("delete '" + getTableName() + "' assignment state"); + DeleteTableProcedure.deleteAssignmentState(env, getTableName()); + setNextState(DeleteTableState.DELETE_TABLE_POST_OPERATION); + break; + case DELETE_TABLE_POST_OPERATION: + postDelete(env); + 
LOG.debug("delete '" + getTableName() + "' completed"); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (HBaseException|IOException e) { + LOG.warn("Retriable error trying to delete table=" + getTableName() + " state=" + state, e); + } catch (InterruptedException e) { + // if the interrupt is real, the executor will be stopped. + LOG.warn("Interrupted trying to delete table=" + getTableName() + " state=" + state, e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final DeleteTableState state) { + if (state == DeleteTableState.DELETE_TABLE_PRE_OPERATION) { + // nothing to rollback, pre-delete is just table-state checks. + // We can fail if the table does not exist or is not disabled. + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + return; + } + + // The delete doesn't have a rollback. The execution will succeed, at some point. + throw new UnsupportedOperationException("unhandled state=" + state); + } + + @Override + protected DeleteTableState getState(final int stateId) { + return DeleteTableState.valueOf(stateId); + } + + @Override + protected int getStateId(final DeleteTableState state) { + return state.getNumber(); + } + + @Override + protected DeleteTableState getInitialState() { + return DeleteTableState.DELETE_TABLE_PRE_OPERATION; + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.DELETE; + } + + @Override + public boolean abort(final MasterProcedureEnv env) { + // TODO: We may be able to abort if the procedure is not started yet. 
+ return false; + } + + @Override + protected boolean acquireLock(final MasterProcedureEnv env) { + if (!env.isInitialized()) return false; + return env.getProcedureQueue().tryAcquireTableWrite(getTableName(), "delete table"); + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureQueue().releaseTableWrite(getTableName()); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (table="); + sb.append(getTableName()); + sb.append(") user="); + sb.append(user); + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProcedureProtos.DeleteTableStateData.Builder state = + MasterProcedureProtos.DeleteTableStateData.newBuilder() + .setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user)) + .setTableName(ProtobufUtil.toProtoTableName(tableName)); + if (regions != null) { + for (HRegionInfo hri: regions) { + state.addRegionInfo(HRegionInfo.convert(hri)); + } + } + state.build().writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProcedureProtos.DeleteTableStateData state = + MasterProcedureProtos.DeleteTableStateData.parseDelimitedFrom(stream); + user = MasterProcedureUtil.toUserInfo(state.getUserInfo()); + tableName = ProtobufUtil.toTableName(state.getTableName()); + if (state.getRegionInfoCount() == 0) { + regions = null; + } else { + regions = new ArrayList(state.getRegionInfoCount()); + for (HBaseProtos.RegionInfo hri: state.getRegionInfoList()) { + regions.add(HRegionInfo.convert(hri)); + } + } + } + + private boolean prepareDelete(final MasterProcedureEnv env) throws IOException { + try { + env.getMasterServices().checkTableModifiable(tableName); + } catch (TableNotFoundException|TableNotDisabledException e) { + setFailure("master-delete-table", 
e); + return false; + } + return true; + } + + private boolean preDelete(final MasterProcedureEnv env) + throws IOException, InterruptedException { + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + final TableName tableName = this.tableName; + user.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + cpHost.preDeleteTableHandler(tableName); + return null; + } + }); + } + return true; + } + + private void postDelete(final MasterProcedureEnv env) + throws IOException, InterruptedException { + deleteTableStates(env, tableName); + + final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); + if (cpHost != null) { + final TableName tableName = this.tableName; + user.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + cpHost.postDeleteTableHandler(tableName); + return null; + } + }); + } + } + + protected static void deleteFromFs(final MasterProcedureEnv env, + final TableName tableName, final List regions, + final boolean archive) throws IOException { + final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); + final FileSystem fs = mfs.getFileSystem(); + final Path tempdir = mfs.getTempDir(); + + final Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName); + final Path tempTableDir = FSUtils.getTableDir(tempdir, tableName); + + if (fs.exists(tableDir)) { + // Ensure temp exists + if (!fs.exists(tempdir) && !fs.mkdirs(tempdir)) { + throw new IOException("HBase temp directory '" + tempdir + "' creation failure."); + } + + // Ensure parent exists + if (!fs.exists(tempTableDir.getParent()) && !fs.mkdirs(tempTableDir.getParent())) { + throw new IOException("HBase temp directory '" + tempdir + "' creation failure."); + } + + // Move the table in /hbase/.tmp + if (!fs.rename(tableDir, tempTableDir)) { + if (fs.exists(tempTableDir)) { + // TODO + // what's in this dir? something old? 
probably something manual from the user... + // let's get rid of this stuff... + FileStatus[] files = fs.listStatus(tempdir); + if (files != null && files.length > 0) { + for (int i = 0; i < files.length; ++i) { + if (!files[i].isDir()) continue; + HFileArchiver.archiveRegion(fs, mfs.getRootDir(), tempTableDir, files[i].getPath()); + } + } + fs.delete(tempdir, true); + } + throw new IOException("Unable to move '" + tableDir + "' to temp '" + tempTableDir + "'"); + } + } + + // Archive regions from FS (temp directory) + if (archive) { + for (HRegionInfo hri : regions) { + LOG.debug("Archiving region " + hri.getRegionNameAsString() + " from FS"); + HFileArchiver.archiveRegion(fs, mfs.getRootDir(), + tempTableDir, HRegion.getRegionDir(tempTableDir, hri.getEncodedName())); + } + LOG.debug("Table '" + tableName + "' archived!"); + } + + // Delete table directory from FS (temp directory) + if (!fs.delete(tempTableDir, true) && fs.exists(tempTableDir)) { + throw new IOException("Couldn't delete " + tempTableDir); + } + } + + /** + * There may be items for this table still up in hbase:meta in the case where the + * info:regioninfo column was empty because of some write error. Remove ALL rows from hbase:meta + * that have to do with this table. See HBASE-12980. 
+ * @throws IOException + */ + private static void cleanAnyRemainingRows(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + ClusterConnection connection = env.getMasterServices().getConnection(); + Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName); + try (Table metaTable = + connection.getTable(TableName.META_TABLE_NAME)) { + List deletes = new ArrayList(); + try (ResultScanner resScanner = metaTable.getScanner(tableScan)) { + for (Result result : resScanner) { + deletes.add(new Delete(result.getRow())); + } + } + if (!deletes.isEmpty()) { + LOG.warn("Deleting some vestigal " + deletes.size() + " rows of " + tableName + + " from " + TableName.META_TABLE_NAME); + metaTable.delete(deletes); + } + } + } + + protected static void deleteFromMeta(final MasterProcedureEnv env, + final TableName tableName, List regions) throws IOException { + MetaTableAccessor.deleteRegions(env.getMasterServices().getConnection(), regions); + + // Clean any remaining rows for this table. + cleanAnyRemainingRows(env, tableName); + } + + protected static void deleteAssignmentState(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + + // Clean up regions of the table in RegionStates. + LOG.debug("Removing '" + tableName + "' from region states."); + am.getRegionStates().tableDeleted(tableName); + + // If entry for this table states, remove it. 
+ LOG.debug("Marking '" + tableName + "' as deleted."); + am.getTableStateManager().setDeletedTable(tableName); + } + + protected static void deleteTableDescriptorCache(final MasterProcedureEnv env, + final TableName tableName) throws IOException { + LOG.debug("Removing '" + tableName + "' descriptor."); + env.getMasterServices().getTableDescriptors().remove(tableName); + } + + protected static void deleteTableStates(final MasterProcedureEnv env, final TableName tableName) + throws IOException { + getMasterQuotaManager(env).removeTableFromNamespaceQuota(tableName); + } + + private static MasterQuotaManager getMasterQuotaManager(final MasterProcedureEnv env) + throws IOException { + return ProcedureSyncWait.waitFor(env, "quota manager to be available", + new ProcedureSyncWait.Predicate() { + @Override + public MasterQuotaManager evaluate() throws IOException { + return env.getMasterServices().getMasterQuotaManager(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java new file mode 100644 index 0000000..90ed4ee --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.Private +public final class MasterProcedureConstants { + private MasterProcedureConstants() {} + + public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs"; + + public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; + public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java new file mode 100644 index 0000000..0a33cd4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterProcedureEnv { + private static final Log LOG = LogFactory.getLog(MasterProcedureEnv.class); + + @InterfaceAudience.Private + public static class WALStoreLeaseRecovery implements WALProcedureStore.LeaseRecovery { + private final HMaster master; + + public WALStoreLeaseRecovery(final HMaster master) { + this.master = master; + } + + @Override + public void recoverFileLease(final FileSystem fs, final Path path) throws IOException { + final Configuration conf = master.getConfiguration(); + final FSUtils 
fsUtils = FSUtils.getInstance(fs, conf); + fsUtils.recoverFileLease(fs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("Recover Procedure Store log lease: " + path); + return master.isActiveMaster(); + } + }); + } + } + + @InterfaceAudience.Private + public static class MasterProcedureStoreListener + implements ProcedureStore.ProcedureStoreListener { + private final HMaster master; + + public MasterProcedureStoreListener(final HMaster master) { + this.master = master; + } + + @Override + public void abortProcess() { + master.abort("The Procedure Store lost the lease"); + } + } + + private final MasterProcedureQueue procQueue; + private final MasterServices master; + + public MasterProcedureEnv(final MasterServices master) { + this.master = master; + this.procQueue = new MasterProcedureQueue(master.getConfiguration(), + master.getTableLockManager()); + } + + public User getRequestUser() throws IOException { + User user = RpcServer.getRequestUser(); + if (user == null) { + user = UserProvider.instantiate(getMasterConfiguration()).getCurrent(); + } + return user; + } + + public MasterServices getMasterServices() { + return master; + } + + public Configuration getMasterConfiguration() { + return master.getConfiguration(); + } + + public MasterCoprocessorHost getMasterCoprocessorHost() { + return master.getMasterCoprocessorHost(); + } + + public MasterProcedureQueue getProcedureQueue() { + return procQueue; + } + + public boolean isRunning() { + return master.getMasterProcedureExecutor().isRunning(); + } + + public boolean isInitialized() { + return master.isInitialized(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java new file mode 100644 index 0000000..0dd0c3d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues; +import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; + +/** + * ProcedureRunnableSet for the Master Procedures. + * This RunnableSet tries to provide to the ProcedureExecutor procedures + * that can be executed without having to wait on a lock. + * Most of the master operations can be executed concurrently, if they + * are operating on different tables (e.g. two create table operations can be + * performed at the same time, assuming table A and table B). + * + * Each procedure should implement an interface providing information for this queue. + * For example, table-related procedures should implement TableProcedureInterface. + * Each procedure will be pushed in its own queue, and based on the operation type + * we may take smarter decisions, e.g. we can abort all the operations preceding + * a delete table, or similar. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterProcedureQueue implements ProcedureRunnableSet { + private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class); + + private final ProcedureFairRunQueues fairq; + private final ReentrantLock lock = new ReentrantLock(); + private final Condition waitCond = lock.newCondition(); + private final TableLockManager lockManager; + + private final int metaTablePriority; + private final int userTablePriority; + private final int sysTablePriority; + + private int queueSize; + + public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) { + this.fairq = new ProcedureFairRunQueues(1); + this.lockManager = lockManager; + + // TODO: should this be part of the HTD? + metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); + sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); + userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + } + + @Override + public void addFront(final Procedure proc) { + lock.lock(); + try { + getRunQueueOrCreate(proc).addFront(proc); + queueSize++; + waitCond.signal(); + } finally { + lock.unlock(); + } + } + + @Override + public void addBack(final Procedure proc) { + lock.lock(); + try { + getRunQueueOrCreate(proc).addBack(proc); + queueSize++; + waitCond.signal(); + } finally { + lock.unlock(); + } + } + + @Override + public void yield(final Procedure proc) { + addFront(proc); + } + + @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + public Long poll() { + lock.lock(); + try { + if (queueSize == 0) { + waitCond.await(); + if (queueSize == 0) { + return null; + } + } + + RunQueue queue = fairq.poll(); + if (queue != null && queue.isAvailable()) { + queueSize--; + return queue.poll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } finally 
{ + lock.unlock(); + } + return null; + } + + @Override + public void signalAll() { + lock.lock(); + try { + waitCond.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public void clear() { + lock.lock(); + try { + fairq.clear(); + queueSize = 0; + } finally { + lock.unlock(); + } + } + + @Override + public int size() { + lock.lock(); + try { + return queueSize; + } finally { + lock.unlock(); + } + } + + @Override + public String toString() { + lock.lock(); + try { + return "MasterProcedureQueue size=" + queueSize + ": " + fairq; + } finally { + lock.unlock(); + } + } + + @Override + public void completionCleanup(Procedure proc) { + if (proc instanceof TableProcedureInterface) { + TableProcedureInterface iProcTable = (TableProcedureInterface)proc; + boolean tableDeleted; + if (proc.hasException()) { + IOException procEx = proc.getException().unwrapRemoteException(); + if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { + // create failed because the table already exist + tableDeleted = !(procEx instanceof TableExistsException); + } else { + // the operation failed because the table does not exist + tableDeleted = (procEx instanceof TableNotFoundException); + } + } else { + // the table was deleted + tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); + } + if (tableDeleted) { + markTableAsDeleted(iProcTable.getTableName()); + } + } + } + + private RunQueue getRunQueueOrCreate(final Procedure proc) { + if (proc instanceof TableProcedureInterface) { + final TableName table = ((TableProcedureInterface)proc).getTableName(); + return getRunQueueOrCreate(table); + } + // TODO: at the moment we only have Table procedures + // if you are implementing a non-table procedure, you have two option create + // a group for all the non-table procedures or try to find a key for your + // non-table procedure and implement something similar to the TableRunQueue. 
+ throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet"); + } + + private TableRunQueue getRunQueueOrCreate(final TableName table) { + final TableRunQueue queue = getRunQueue(table); + if (queue != null) return queue; + return (TableRunQueue)fairq.add(table, createTableRunQueue(table)); + } + + private TableRunQueue createTableRunQueue(final TableName table) { + int priority = userTablePriority; + if (table.equals(TableName.META_TABLE_NAME)) { + priority = metaTablePriority; + } else if (table.isSystemTable()) { + priority = sysTablePriority; + } + return new TableRunQueue(priority); + } + + private TableRunQueue getRunQueue(final TableName table) { + return (TableRunQueue)fairq.get(table); + } + + /** + * Try to acquire the read lock on the specified table. + * other read operations in the table-queue may be executed concurrently, + * otherwise they have to wait until all the read-locks are released. + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableRead(final TableName table, final String purpose) { + return getRunQueueOrCreate(table).tryRead(lockManager, table, purpose); + } + + /** + * Release the read lock taken with tryAcquireTableRead() + * @param table the name of the table that has the read lock + */ + public void releaseTableRead(final TableName table) { + getRunQueue(table).releaseRead(lockManager, table); + } + + /** + * Try to acquire the write lock on the specified table. + * other operations in the table-queue will be executed after the lock is released. + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. 
+ */ + public boolean tryAcquireTableWrite(final TableName table, final String purpose) { + return getRunQueueOrCreate(table).tryWrite(lockManager, table, purpose); + } + + /** + * Release the write lock taken with tryAcquireTableWrite() + * @param table the name of the table that has the write lock + */ + public void releaseTableWrite(final TableName table) { + getRunQueue(table).releaseWrite(lockManager, table); + } + + /** + * Tries to remove the queue and the table-lock of the specified table. + * If there are new operations pending (e.g. a new create), + * the remove will not be performed. + * @param table the name of the table that should be marked as deleted + * @return true if deletion succeeded, false otherwise meaning that there are + * other new operations pending for that table (e.g. a new create). + */ + protected boolean markTableAsDeleted(final TableName table) { + TableRunQueue queue = getRunQueue(table); + if (queue != null) { + lock.lock(); + try { + if (queue.isEmpty() && !queue.isLocked()) { + fairq.remove(table); + + // Remove the table lock + try { + lockManager.tableDeleted(table); + } catch (IOException e) { + LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical + } + } else { + // TODO: If there are no create, we can drop all the other ops + return false; + } + } finally { + lock.unlock(); + } + } + return true; + } + + private interface RunQueue extends ProcedureFairRunQueues.FairObject { + void addFront(Procedure proc); + void addBack(Procedure proc); + Long poll(); + boolean isLocked(); + } + + /** + * Run Queue for a Table. It contains a read-write lock that is used by the + * MasterProcedureQueue to decide if we should fetch an item from this queue + * or skip to another one which will be able to run without waiting for locks. 
+ */ + private static class TableRunQueue implements RunQueue { + private final Deque runnables = new ArrayDeque(); + private final int priority; + + private TableLock tableLock = null; + private boolean wlock = false; + private int rlock = 0; + + public TableRunQueue(int priority) { + this.priority = priority; + } + + @Override + public void addFront(final Procedure proc) { + runnables.addFirst(proc.getProcId()); + } + + // TODO: Improve run-queue push with TableProcedureInterface.getType() + // we can take smart decisions based on the type of the operation (e.g. create/delete) + @Override + public void addBack(final Procedure proc) { + runnables.addLast(proc.getProcId()); + } + + @Override + public Long poll() { + return runnables.poll(); + } + + @Override + public boolean isAvailable() { + synchronized (this) { + return !wlock && !runnables.isEmpty(); + } + } + + public boolean isEmpty() { + return runnables.isEmpty(); + } + + @Override + public boolean isLocked() { + synchronized (this) { + return wlock || rlock > 0; + } + } + + public boolean tryRead(final TableLockManager lockManager, + final TableName tableName, final String purpose) { + synchronized (this) { + if (wlock) { + return false; + } + + // Take zk-read-lock + tableLock = lockManager.readLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire read lock on " + tableName, e); + tableLock = null; + return false; + } + + rlock++; + } + return true; + } + + public void releaseRead(final TableLockManager lockManager, + final TableName tableName) { + synchronized (this) { + releaseTableLock(lockManager, rlock == 1); + rlock--; + } + } + + public boolean tryWrite(final TableLockManager lockManager, + final TableName tableName, final String purpose) { + synchronized (this) { + if (wlock || rlock > 0) { + return false; + } + + // Take zk-write-lock + tableLock = lockManager.writeLock(tableName, purpose); + try { + tableLock.acquire(); + } catch 
(IOException e) { + LOG.error("failed acquire write lock on " + tableName, e); + tableLock = null; + return false; + } + wlock = true; + } + return true; + } + + public void releaseWrite(final TableLockManager lockManager, + final TableName tableName) { + synchronized (this) { + releaseTableLock(lockManager, true); + wlock = false; + } + } + + private void releaseTableLock(final TableLockManager lockManager, boolean reset) { + for (int i = 0; i < 3; ++i) { + try { + tableLock.release(); + if (reset) { + tableLock = null; + } + break; + } catch (IOException e) { + LOG.warn("Could not release the table write-lock", e); + } + } + } + + @Override + public int getPriority() { + return priority; + } + + @Override + public String toString() { + return runnables.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java new file mode 100644 index 0000000..d7c0b92 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.security.UserGroupInformation; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class MasterProcedureUtil { + private static final Log LOG = LogFactory.getLog(MasterProcedureUtil.class); + + private MasterProcedureUtil() {} + + public static UserInformation toProtoUserInfo(UserGroupInformation ugi) { + UserInformation.Builder userInfoPB = UserInformation.newBuilder(); + userInfoPB.setEffectiveUser(ugi.getUserName()); + if (ugi.getRealUser() != null) { + userInfoPB.setRealUser(ugi.getRealUser().getUserName()); + } + return userInfoPB.build(); + } + + public static UserGroupInformation toUserInfo(UserInformation userInfoProto) { + if (userInfoProto.hasEffectiveUser()) { + String effectiveUser = userInfoProto.getEffectiveUser(); + if (userInfoProto.hasRealUser()) { + String realUser = userInfoProto.getRealUser(); + UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser); + return UserGroupInformation.createProxyUser(effectiveUser, realUserUgi); + } + return UserGroupInformation.createRemoteUser(effectiveUser); + } + return null; + } +} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/9d684ac3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java new file mode 100644 index 0000000..2a1abca --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.VersionInfo; + +/** + * Latch used by the Master to have the prepare() sync behaviour for old + * clients, that can only get exceptions in a synchronous way. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class ProcedurePrepareLatch { + private static final NoopLatch noopLatch = new NoopLatch(); + + public static ProcedurePrepareLatch createLatch() { + // don't use the latch if we have procedure support + return hasProcedureSupport() ? noopLatch : new CompatibilityLatch(); + } + + public static boolean hasProcedureSupport() { + return currentClientHasMinimumVersion(1, 1); + } + + private static boolean currentClientHasMinimumVersion(int major, int minor) { + RpcCallContext call = RpcServer.getCurrentCall(); + VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null; + if (versionInfo != null) { + String[] components = versionInfo.getVersion().split("\\."); + + int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0; + if (clientMajor != major) { + return clientMajor > major; + } + + int clientMinor = components.length > 1 ? 
Integer.parseInt(components[1]) : 0; + return clientMinor >= minor; + } + return false; + } + + protected abstract void countDown(final Procedure proc); + public abstract void await() throws IOException; + + protected static void releaseLatch(final ProcedurePrepareLatch latch, final Procedure proc) { + if (latch != null) { + latch.countDown(proc); + } + } + + private static class NoopLatch extends ProcedurePrepareLatch { + protected void countDown(final Procedure proc) {} + public void await() throws IOException {} + } + + protected static class CompatibilityLatch extends ProcedurePrepareLatch { + private final CountDownLatch latch = new CountDownLatch(1); + + private IOException exception = null; + + protected void countDown(final Procedure proc) { + if (proc.hasException()) { + exception = proc.getException().unwrapRemoteException(); + } + latch.countDown(); + } + + public void await() throws IOException { + try { + latch.await(); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + + if (exception != null) { + throw exception; + } + } + } +}