Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7F2DBD789 for ; Fri, 22 Feb 2013 00:16:30 +0000 (UTC) Received: (qmail 29184 invoked by uid 500); 22 Feb 2013 00:16:30 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 29138 invoked by uid 500); 22 Feb 2013 00:16:30 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 29127 invoked by uid 99); 22 Feb 2013 00:16:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Feb 2013 00:16:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_STOCK2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Feb 2013 00:16:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6793E23888E7; Fri, 22 Feb 2013 00:15:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1448867 [1/3] - in /hbase/trunk: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hb... 
Date: Fri, 22 Feb 2013 00:15:53 -0000 To: commits@hbase.apache.org From: enis@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130222001555.6793E23888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: enis Date: Fri Feb 22 00:15:52 2013 New Revision: 1448867 URL: http://svn.apache.org/r1448867 Log: HBASE-7305. ZK based Read/Write locks for table operations Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/DeletionListener.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessLockBase.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadLock.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessReadWriteLock.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/lock/ZKInterProcessWriteLock.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java hbase/trunk/hbase-protocol/src/main/protobuf/ZooKeeper.proto hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/CloneSnapshotHandler.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java hbase/trunk/hbase-server/src/main/resources/hbase-default.xml hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MultithreadedTestUtil.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java Modified: 
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (original) +++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java Fri Feb 22 00:15:52 2013 @@ -4902,6 +4902,807 @@ public final class ZooKeeperProtos { // @@protoc_insertion_point(class_scope:ReplicationLock) } + public interface TableLockOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required bytes tableName = 1; + boolean hasTableName(); + com.google.protobuf.ByteString getTableName(); + + // required .ServerName lockOwner = 2; + boolean hasLockOwner(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner(); + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder(); + + // required int64 threadId = 3; + boolean hasThreadId(); + long getThreadId(); + + // required bool isShared = 4; + boolean hasIsShared(); + boolean getIsShared(); + + // optional string purpose = 5; + boolean hasPurpose(); + String getPurpose(); + } + public static final class TableLock extends + com.google.protobuf.GeneratedMessage + implements TableLockOrBuilder { + // Use TableLock.newBuilder() to construct. 
+ private TableLock(Builder builder) { + super(builder); + } + private TableLock(boolean noInit) {} + + private static final TableLock defaultInstance; + public static TableLock getDefaultInstance() { + return defaultInstance; + } + + public TableLock getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_fieldAccessorTable; + } + + private int bitField0_; + // required bytes tableName = 1; + public static final int TABLENAME_FIELD_NUMBER = 1; + private com.google.protobuf.ByteString tableName_; + public boolean hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + + // required .ServerName lockOwner = 2; + public static final int LOCKOWNER_FIELD_NUMBER = 2; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName lockOwner_; + public boolean hasLockOwner() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner() { + return lockOwner_; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder() { + return lockOwner_; + } + + // required int64 threadId = 3; + public static final int THREADID_FIELD_NUMBER = 3; + private long threadId_; + public boolean hasThreadId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getThreadId() { + return threadId_; + } + + // required bool isShared = 4; + public static final int ISSHARED_FIELD_NUMBER = 4; + private boolean isShared_; + public boolean 
hasIsShared() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getIsShared() { + return isShared_; + } + + // optional string purpose = 5; + public static final int PURPOSE_FIELD_NUMBER = 5; + private java.lang.Object purpose_; + public boolean hasPurpose() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getPurpose() { + java.lang.Object ref = purpose_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + purpose_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getPurposeBytes() { + java.lang.Object ref = purpose_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + purpose_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + tableName_ = com.google.protobuf.ByteString.EMPTY; + lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + threadId_ = 0L; + isShared_ = false; + purpose_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasTableName()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasLockOwner()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasThreadId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasIsShared()) { + memoizedIsInitialized = 0; + return false; + } + if (!getLockOwner().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if 
(((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, lockOwner_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, threadId_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(4, isShared_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, getPurposeBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, tableName_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, lockOwner_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(3, threadId_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, isShared_); + } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, getPurposeBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock other = 
(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock) obj; + + boolean result = true; + result = result && (hasTableName() == other.hasTableName()); + if (hasTableName()) { + result = result && getTableName() + .equals(other.getTableName()); + } + result = result && (hasLockOwner() == other.hasLockOwner()); + if (hasLockOwner()) { + result = result && getLockOwner() + .equals(other.getLockOwner()); + } + result = result && (hasThreadId() == other.hasThreadId()); + if (hasThreadId()) { + result = result && (getThreadId() + == other.getThreadId()); + } + result = result && (hasIsShared() == other.hasIsShared()); + if (hasIsShared()) { + result = result && (getIsShared() + == other.getIsShared()); + } + result = result && (hasPurpose() == other.hasPurpose()); + if (hasPurpose()) { + result = result && getPurpose() + .equals(other.getPurpose()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + @java.lang.Override + public int hashCode() { + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasTableName()) { + hash = (37 * hash) + TABLENAME_FIELD_NUMBER; + hash = (53 * hash) + getTableName().hashCode(); + } + if (hasLockOwner()) { + hash = (37 * hash) + LOCKOWNER_FIELD_NUMBER; + hash = (53 * hash) + getLockOwner().hashCode(); + } + if (hasThreadId()) { + hash = (37 * hash) + THREADID_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getThreadId()); + } + if (hasIsShared()) { + hash = (37 * hash) + ISSHARED_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsShared()); + } + if (hasPurpose()) { + hash = (37 * hash) + PURPOSE_FIELD_NUMBER; + hash = (53 * hash) + getPurpose().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + return hash; + } + + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return 
newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = 
newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLockOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_fieldAccessorTable; + } + + // Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + 
} + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getLockOwnerFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + tableName_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + if (lockOwnerBuilder_ == null) { + lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + } else { + lockOwnerBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + threadId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + isShared_ = false; + bitField0_ = (bitField0_ & ~0x00000008); + purpose_ = ""; + bitField0_ = (bitField0_ & ~0x00000010); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDescriptor(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock getDefaultInstanceForType() { + return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock build() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + 
result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock buildPartial() { + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.tableName_ = tableName_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (lockOwnerBuilder_ == null) { + result.lockOwner_ = lockOwner_; + } else { + result.lockOwner_ = lockOwnerBuilder_.build(); + } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.threadId_ = threadId_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.isShared_ = isShared_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.purpose_ = purpose_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock) { + return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock other) { + if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDefaultInstance()) return this; + if (other.hasTableName()) { + setTableName(other.getTableName()); + } + if (other.hasLockOwner()) { + mergeLockOwner(other.getLockOwner()); + } + if (other.hasThreadId()) { + setThreadId(other.getThreadId()); + } + if (other.hasIsShared()) { + setIsShared(other.getIsShared()); + 
} + if (other.hasPurpose()) { + setPurpose(other.getPurpose()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasTableName()) { + + return false; + } + if (!hasLockOwner()) { + + return false; + } + if (!hasThreadId()) { + + return false; + } + if (!hasIsShared()) { + + return false; + } + if (!getLockOwner().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + tableName_ = input.readBytes(); + break; + } + case 18: { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(); + if (hasLockOwner()) { + subBuilder.mergeFrom(getLockOwner()); + } + input.readMessage(subBuilder, extensionRegistry); + setLockOwner(subBuilder.buildPartial()); + break; + } + case 24: { + bitField0_ |= 0x00000004; + threadId_ = input.readInt64(); + break; + } + case 32: { + bitField0_ |= 0x00000008; + isShared_ = input.readBool(); + break; + } + case 42: { + bitField0_ |= 0x00000010; + purpose_ = input.readBytes(); + break; + } + } + } + } + + private int bitField0_; + + // required bytes tableName = 1; + private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY; + public boolean 
hasTableName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public com.google.protobuf.ByteString getTableName() { + return tableName_; + } + public Builder setTableName(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + tableName_ = value; + onChanged(); + return this; + } + public Builder clearTableName() { + bitField0_ = (bitField0_ & ~0x00000001); + tableName_ = getDefaultInstance().getTableName(); + onChanged(); + return this; + } + + // required .ServerName lockOwner = 2; + private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> lockOwnerBuilder_; + public boolean hasLockOwner() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner() { + if (lockOwnerBuilder_ == null) { + return lockOwner_; + } else { + return lockOwnerBuilder_.getMessage(); + } + } + public Builder setLockOwner(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (lockOwnerBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + lockOwner_ = value; + onChanged(); + } else { + lockOwnerBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder setLockOwner( + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) { + if (lockOwnerBuilder_ == null) { + lockOwner_ = builderForValue.build(); + onChanged(); + } else { + lockOwnerBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 
0x00000002; + return this; + } + public Builder mergeLockOwner(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) { + if (lockOwnerBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + lockOwner_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) { + lockOwner_ = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(lockOwner_).mergeFrom(value).buildPartial(); + } else { + lockOwner_ = value; + } + onChanged(); + } else { + lockOwnerBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + public Builder clearLockOwner() { + if (lockOwnerBuilder_ == null) { + lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + onChanged(); + } else { + lockOwnerBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getLockOwnerBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getLockOwnerFieldBuilder().getBuilder(); + } + public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder() { + if (lockOwnerBuilder_ != null) { + return lockOwnerBuilder_.getMessageOrBuilder(); + } else { + return lockOwner_; + } + } + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> + getLockOwnerFieldBuilder() { + if (lockOwnerBuilder_ == null) { + lockOwnerBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>( + lockOwner_, + 
getParentForChildren(), + isClean()); + lockOwner_ = null; + } + return lockOwnerBuilder_; + } + + // required int64 threadId = 3; + private long threadId_ ; + public boolean hasThreadId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getThreadId() { + return threadId_; + } + public Builder setThreadId(long value) { + bitField0_ |= 0x00000004; + threadId_ = value; + onChanged(); + return this; + } + public Builder clearThreadId() { + bitField0_ = (bitField0_ & ~0x00000004); + threadId_ = 0L; + onChanged(); + return this; + } + + // required bool isShared = 4; + private boolean isShared_ ; + public boolean hasIsShared() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getIsShared() { + return isShared_; + } + public Builder setIsShared(boolean value) { + bitField0_ |= 0x00000008; + isShared_ = value; + onChanged(); + return this; + } + public Builder clearIsShared() { + bitField0_ = (bitField0_ & ~0x00000008); + isShared_ = false; + onChanged(); + return this; + } + + // optional string purpose = 5; + private java.lang.Object purpose_ = ""; + public boolean hasPurpose() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public String getPurpose() { + java.lang.Object ref = purpose_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + purpose_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setPurpose(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + purpose_ = value; + onChanged(); + return this; + } + public Builder clearPurpose() { + bitField0_ = (bitField0_ & ~0x00000010); + purpose_ = getDefaultInstance().getPurpose(); + onChanged(); + return this; + } + void setPurpose(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000010; + purpose_ = value; + onChanged(); + } + + // @@protoc_insertion_point(builder_scope:TableLock) + } + + static { + defaultInstance 
= new TableLock(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:TableLock) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_RootRegionServer_descriptor; private static @@ -4952,6 +5753,11 @@ public final class ZooKeeperProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_ReplicationLock_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_TableLock_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_TableLock_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -4980,8 +5786,11 @@ public final class ZooKeeperProtos { "tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" + "BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" + "ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" + - "ner\030\001 \002(\tBE\n*org.apache.hadoop.hbase.pro", - "tobuf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" + "ner\030\001 \002(\t\"s\n\tTableLock\022\021\n\ttableName\030\001 \002(", + "\014\022\036\n\tlockOwner\030\002 \002(\0132\013.ServerName\022\020\n\010thr" + + "eadId\030\003 \002(\003\022\020\n\010isShared\030\004 \002(\010\022\017\n\007purpose" + + "\030\005 \001(\tBE\n*org.apache.hadoop.hbase.protob" + + "uf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5068,6 +5877,14 @@ public final class ZooKeeperProtos { new java.lang.String[] { "LockOwner", }, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationLock.class, org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationLock.Builder.class); + internal_static_TableLock_descriptor = + 
getDescriptor().getMessageTypes().get(10); + internal_static_TableLock_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_TableLock_descriptor, + new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", }, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.class, + org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.Builder.class); return null; } }; Modified: hbase/trunk/hbase-protocol/src/main/protobuf/ZooKeeper.proto URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/ZooKeeper.proto?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-protocol/src/main/protobuf/ZooKeeper.proto (original) +++ hbase/trunk/hbase-protocol/src/main/protobuf/ZooKeeper.proto Fri Feb 22 00:15:52 2013 @@ -133,3 +133,14 @@ message ReplicationHLogPosition { message ReplicationLock { required string lockOwner = 1; } + +/** + * Metadata associated with a table lock in zookeeper + */ +message TableLock { + optional bytes tableName = 1; + optional ServerName lockOwner = 2; + optional int64 threadId = 3; + optional bool isShared = 4; + optional string purpose = 5; +} Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java?rev=1448867&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessLock.java Fri Feb 22 00:15:52 2013 @@ -0,0 +1,86 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * An interface for an application-specific lock. + */ +@InterfaceAudience.Private +public interface InterProcessLock { + + /** + * Acquire the lock, waiting indefinitely until the lock is released or + * the thread is interrupted. + * @throws IOException If there is an unrecoverable error acquiring the lock + * @throws InterruptedException If current thread is interrupted while + * waiting for the lock + */ + public void acquire() throws IOException, InterruptedException; + + /** + * Acquire the lock within a wait time. + * @param timeoutMs The maximum time (in milliseconds) to wait for the lock, + * -1 to wait indefinitely + * @return True if the lock was acquired, false if waiting time elapsed + * before the lock was acquired + * @throws IOException If there is an unrecoverable error + * (e.g., when talking to a lock service) when acquiring + * the lock + * @throws InterruptedException If the thread is interrupted while waiting to + * acquire the lock + */ + public boolean tryAcquire(long timeoutMs) + throws IOException, InterruptedException; + + /** + * Release the lock. 
+ * @throws IOException If there is an unrecoverable error releasing the lock + * @throws InterruptedException If the thread is interrupted while releasing + * the lock + */ + public void release() throws IOException, InterruptedException; + + /** + * If supported, attempts to reap all the locks of this type by forcefully + * deleting the locks. Lock reaping is different than coordinated lock revocation + * in that, there is no coordination, and the behavior is undefined if the + * lock holder is still alive. + * @throws IOException If there is an unrecoverable error reaping the locks + */ + public void reapAllLocks() throws IOException; + + /** + * An interface for objects that process lock metadata. + */ + public static interface MetadataHandler { + + /** + * Called after lock metadata is successfully read from a distributed + * lock service. This method may contain any procedures for, e.g., + * printing the metadata in a humanly-readable format. + * @param metadata The metadata + */ + public void handleMetadata(byte[] metadata); + } +} Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java?rev=1448867&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/InterProcessReadWriteLock.java Fri Feb 22 00:15:52 2013 @@ -0,0 +1,45 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * An interface for a distributed reader-writer lock. + */ +@InterfaceAudience.Private +public interface InterProcessReadWriteLock { + + /** + * Obtain a reader lock containing given metadata. + * @param metadata Serialized lock metadata (this may contain information + * such as the process owning the lock or the purpose for + * which the lock was acquired). Must not be null. + * @return An instantiated InterProcessLock instance + */ + public InterProcessLock readLock(byte[] metadata); + + /** + * Obtain a writer lock containing given metadata. 
+ * @param metadata See documentation of metadata parameter in readLock() + * @return An instantiated InterProcessLock instance + */ + public InterProcessLock writeLock(byte[] metadata); +} Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java?rev=1448867&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/LockTimeoutException.java Fri Feb 22 00:15:52 2013 @@ -0,0 +1,37 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +public class LockTimeoutException extends IOException { + + private static final long serialVersionUID = -1770764924258999825L; + + /** Default constructor */ + public LockTimeoutException() { + super(); + } + + public LockTimeoutException(String s) { + super(s); + } + +} Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Fri Feb 22 00:15:52 2013 @@ -33,10 +33,10 @@ import org.cloudera.htrace.Trace; /** * Abstract base class for all HBase event handlers. Subclasses should - * implement the {@link #process()} method. Subclasses should also do all - * necessary checks up in their constructor if possible -- check table exists, - * is disabled, etc. -- so they fail fast rather than later when process is - * running. Do it this way because process be invoked directly but event + * implement the {@link #process()} and {@link #prepare()} methods. Subclasses + * should also do all necessary checks up in their prepare() if possible -- check + * table exists, is disabled, etc. -- so they fail fast rather than later when process + * is running. Do it this way because process be invoked directly but event * handlers are also * run in an executor context -- i.e. asynchronously -- and in this case, * exceptions thrown at process time will not be seen by the invoker, not till @@ -102,7 +102,7 @@ public abstract class EventHandler imple * originated and then where its destined -- e.g. 
RS2ZK_ prefix means the * event came from a regionserver destined for zookeeper -- and then what * the even is; e.g. REGION_OPENING. - * + * *

We give the enums indices so we can add types later and keep them * grouped together rather than have to add them always to the end as we * would have to if we used raw enum ordinals. @@ -202,6 +202,19 @@ public abstract class EventHandler imple } } + /** + * Event handlers should do all the necessary checks in this method (rather than + * in the constructor, or in process()) so that the caller, which is mostly executed + * in the ipc context can fail fast. Process is executed async from the client ipc, + * so this method gives a quick chance to do some basic checks. + * Should be called after constructing the EventHandler, and before process(). + * @return the instance of this class + * @throws Exception when something goes wrong + */ + public EventHandler prepare() throws Exception { + return this; + } + public void run() { Span chunk = Trace.startSpan(Thread.currentThread().getName(), parent, Sampler.ALWAYS); @@ -282,7 +295,7 @@ public abstract class EventHandler imple public synchronized void setListener(EventHandlerListener listener) { this.listener = listener; } - + @Override public String toString() { return "Event #" + getSeqid() + Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Fri Feb 22 00:15:52 2013 @@ -112,6 +112,8 @@ public class AssignmentManager extends Z private LoadBalancer balancer; + private final TableLockManager tableLockManager; + final private KeyLocker locker = new KeyLocker(); /** @@ -192,7 +194,8 @@ public class AssignmentManager extends 
Z */ public AssignmentManager(Server server, ServerManager serverManager, CatalogTracker catalogTracker, final LoadBalancer balancer, - final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException { + final ExecutorService service, MetricsMaster metricsMaster, + final TableLockManager tableLockManager) throws KeeperException, IOException { super(server.getZooKeeper()); this.server = server; this.serverManager = serverManager; @@ -228,6 +231,7 @@ public class AssignmentManager extends Z ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker"); zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L, TimeUnit.SECONDS, threadFactory); + this.tableLockManager = tableLockManager; } void startTimeOutMonitor() { @@ -301,7 +305,7 @@ public class AssignmentManager extends Z public Pair getReopenStatus(byte[] tableName) throws IOException { List hris = - MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); + MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true); Integer pending = 0; for (HRegionInfo hri : hris) { String name = hri.getEncodedName(); @@ -1258,7 +1262,7 @@ public class AssignmentManager extends Z */ public void regionOffline(final HRegionInfo regionInfo) { regionStates.regionOffline(regionInfo); - + removeClosedRegion(regionInfo); // remove the region plan as well just in case. clearRegionPlan(regionInfo); } @@ -2408,8 +2412,8 @@ public class AssignmentManager extends Z LOG.info("The table " + tableName + " is in DISABLING state. 
Hence recovering by moving the table" + " to DISABLED state."); - new DisableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker, + this, tableLockManager, true).prepare().process(); } } } @@ -2434,7 +2438,7 @@ public class AssignmentManager extends Z // enableTable in sync way during master startup, // no need to invoke coprocessor new EnableTableHandler(this.server, tableName.getBytes(), - catalogTracker, this, true).process(); + catalogTracker, this, tableLockManager, true).prepare().process(); } } } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Feb 22 00:15:52 2013 @@ -316,6 +316,9 @@ Server { private TableDescriptors tableDescriptors; + // Table level lock manager for schema changes + private TableLockManager tableLockManager; + // Time stamps for when a hmaster was started and when it became active private long masterStartTime; private long masterActiveTime; @@ -566,7 +569,8 @@ Server { this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this); this.loadBalancerTracker.start(); this.assignmentManager = new AssignmentManager(this, serverManager, - this.catalogTracker, this.balancer, this.executorService, this.metricsMaster); + this.catalogTracker, this.balancer, this.executorService, this.metricsMaster, + this.tableLockManager); zooKeeper.registerListenerFirst(assignmentManager); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, @@ -703,6 +707,13 @@ 
Server { startServiceThreads(); } + //Initialize table lock manager, and ensure that all write locks held previously + //are invalidated + this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName); + if (!masterRecovery) { + this.tableLockManager.reapAllTableWriteLocks(); + } + // Wait for region servers to report in. this.serverManager.waitForRegionServers(status); // Check zk for region servers that are up but didn't register @@ -1508,7 +1519,7 @@ Server { this.executorService.submit(new CreateTableHandler(this, this.fileSystemManager, hTableDescriptor, conf, - newRegions, catalogTracker, assignmentManager)); + newRegions, this).prepare()); if (cpHost != null) { cpHost.postCreateTable(hTableDescriptor, newRegions); } @@ -1575,7 +1586,7 @@ Server { if (cpHost != null) { cpHost.preDeleteTable(tableName); } - this.executorService.submit(new DeleteTableHandler(tableName, this, this)); + this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare()); if (cpHost != null) { cpHost.postDeleteTable(tableName); } @@ -1629,7 +1640,9 @@ Server { return; } } - new TableAddFamilyHandler(tableName, column, this, this).process(); + //TODO: we should process this (and some others) in an executor + new TableAddFamilyHandler(tableName, column, this, this) + .prepare().process(); if (cpHost != null) { cpHost.postAddColumn(tableName, column); } @@ -1657,7 +1670,8 @@ Server { return; } } - new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); + new TableModifyFamilyHandler(tableName, descriptor, this, this) + .prepare().process(); if (cpHost != null) { cpHost.postModifyColumn(tableName, descriptor); } @@ -1684,7 +1698,7 @@ Server { return; } } - new TableDeleteFamilyHandler(tableName, columnName, this, this).process(); + new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process(); if (cpHost != null) { cpHost.postDeleteColumn(tableName, columnName); } @@ -1708,7 +1722,7 @@ Server { 
cpHost.preEnableTable(tableName); } this.executorService.submit(new EnableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false).prepare()); if (cpHost != null) { cpHost.postEnableTable(tableName); } @@ -1732,7 +1746,7 @@ Server { cpHost.preDisableTable(tableName); } this.executorService.submit(new DisableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, tableLockManager, false).prepare()); if (cpHost != null) { cpHost.postDisableTable(tableName); } @@ -1792,8 +1806,7 @@ Server { if (cpHost != null) { cpHost.preModifyTable(tableName, descriptor); } - new ModifyTableHandler(tableName, descriptor, this, this).process(); - + new ModifyTableHandler(tableName, descriptor, this, this).prepare().process(); if (cpHost != null) { cpHost.postModifyTable(tableName, descriptor); } @@ -2056,12 +2069,17 @@ Server { return this.assignmentManager; } + @Override + public TableLockManager getTableLockManager() { + return this.tableLockManager; + } + public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() { return rsFatals; } public void shutdown() throws IOException { - if (spanReceiverHost != null) { + if (spanReceiverHost != null) { spanReceiverHost.closeReceivers(); } if (cpHost != null) { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Fri Feb 22 00:15:52 2013 @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import 
java.io.IOException; -import com.google.protobuf.Service; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.TableNotD import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.ExecutorService; +import com.google.protobuf.Service; + /** * Services Master supplies */ @@ -56,6 +57,11 @@ public interface MasterServices extends public ExecutorService getExecutorService(); /** + * @return Master's instance of {@link TableLockManager} + */ + public TableLockManager getTableLockManager(); + + /** * @return Master's instance of {@link MasterCoprocessorHost} */ public MasterCoprocessorHost getCoprocessorHost(); Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java?rev=1448867&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java Fri Feb 22 00:15:52 2013 @@ -0,0 +1,387 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.InterProcessLock; +import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler; +import org.apache.hadoop.hbase.InterProcessReadWriteLock; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.LockTimeoutException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock; +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * A manager for distributed table level locks. 
+ */ +@InterfaceAudience.Private +public abstract class TableLockManager { + + private static final Log LOG = LogFactory.getLog(TableLockManager.class); + + /** Configuration key for enabling table-level locks for schema changes */ + public static final String TABLE_LOCK_ENABLE = + "hbase.table.lock.enable"; + + /** by default we should enable table-level locks for schema changes */ + private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true; + + /** Configuration key for time out for trying to acquire table locks */ + protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS = + "hbase.table.write.lock.timeout.ms"; + + /** Configuration key for time out for trying to acquire table locks */ + protected static final String TABLE_READ_LOCK_TIMEOUT_MS = + "hbase.table.read.lock.timeout.ms"; + + protected static final int DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS = + 600 * 1000; //10 min default + + protected static final int DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS = + 600 * 1000; //10 min default + + /** + * A distributed lock for a table. + */ + @InterfaceAudience.Private + public static interface TableLock { + /** + * Acquire the lock, with the configured lock timeout. + * @throws LockTimeoutException If unable to acquire a lock within a specified + * time period (if any) + * @throws IOException If unrecoverable error occurs + */ + public void acquire() throws IOException; + + /** + * Release the lock already held. 
+ * @throws IOException If there is an unrecoverable error releasing the lock + */ + public void release() throws IOException; + } + + /** + * Returns a TableLock for locking the table for exclusive access + * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @return A new TableLock object for acquiring a write lock + */ + public abstract TableLock writeLock(byte[] tableName, String purpose); + + /** + * Returns a TableLock for locking the table for shared access among read-lock holders + * @param tableName Table to lock + * @param purpose Human readable reason for locking the table + * @return A new TableLock object for acquiring a read lock + */ + public abstract TableLock readLock(byte[] tableName, String purpose); + + /** + * Force releases all table write locks and lock attempts even if this thread does + * not own the lock. The behavior of the lock holders still thinking that they + * have the lock is undefined. This should be used carefully and only when + * we can ensure that all write-lock holders have died. For example if only + * the master can hold write locks, then we can reap its locks when the backup + * master starts. + */ + public abstract void reapAllTableWriteLocks() throws IOException; + + /** + * Called after a table has been deleted, and after the table lock is released. + * TableLockManager should do cleanup for the table state. + * @param tableName name of the table + * @throws IOException If there is an unrecoverable error releasing the lock + */ + public abstract void tableDeleted(byte[] tableName) + throws IOException; + + /** + * Creates and returns a TableLockManager according to the configuration + */ + public static TableLockManager createTableLockManager(Configuration conf, + ZooKeeperWatcher zkWatcher, ServerName serverName) { + // Initialize table level lock manager for schema changes, if enabled. 
+ if (conf.getBoolean(TABLE_LOCK_ENABLE, + DEFAULT_TABLE_LOCK_ENABLE)) { + int writeLockTimeoutMs = conf.getInt(TABLE_WRITE_LOCK_TIMEOUT_MS, + DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS); + int readLockTimeoutMs = conf.getInt(TABLE_READ_LOCK_TIMEOUT_MS, + DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS); + return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs); + } + + return new NullTableLockManager(); + } + + /** + * A null implementation + */ + @InterfaceAudience.Private + static class NullTableLockManager extends TableLockManager { + static class NullTableLock implements TableLock { + @Override + public void acquire() throws IOException { + } + @Override + public void release() throws IOException { + } + } + @Override + public TableLock writeLock(byte[] tableName, String purpose) { + return new NullTableLock(); + } + @Override + public TableLock readLock(byte[] tableName, String purpose) { + return new NullTableLock(); + } + @Override + public void reapAllTableWriteLocks() throws IOException { + } + @Override + public void tableDeleted(byte[] tableName) throws IOException { + } + } + + /** + * ZooKeeper based TableLockManager + */ + @InterfaceAudience.Private + private static class ZKTableLockManager extends TableLockManager { + + private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() { + @Override + public void handleMetadata(byte[] ownerMetadata) { + if (!LOG.isDebugEnabled()) { + return; + } + ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata); + if (data == null) { + return; + } + LOG.debug("Table is locked by: " + + String.format("[tableName=%s, lockOwner=%s, threadId=%s, " + + "purpose=%s, isShared=%s]", Bytes.toString(data.getTableName().toByteArray()), + ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(), + data.getPurpose(), data.getIsShared())); + } + }; + + private static class TableLockImpl implements TableLock { + long lockTimeoutMs; + byte[] tableName; + String tableNameStr; + 
InterProcessLock lock; + boolean isShared; + ZooKeeperWatcher zkWatcher; + ServerName serverName; + String purpose; + + public TableLockImpl(byte[] tableName, ZooKeeperWatcher zkWatcher, + ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) { + this.tableName = tableName; + tableNameStr = Bytes.toString(tableName); + this.zkWatcher = zkWatcher; + this.serverName = serverName; + this.lockTimeoutMs = lockTimeoutMs; + this.isShared = isShared; + this.purpose = purpose; + } + + @Override + public void acquire() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempt to acquire table " + (isShared ? "read" : "write") + + " lock on :" + tableNameStr + " for:" + purpose); + } + + lock = createTableLock(); + try { + if (lockTimeoutMs == -1) { + // Wait indefinitely + lock.acquire(); + } else { + if (!lock.tryAcquire(lockTimeoutMs)) { + throw new LockTimeoutException("Timed out acquiring " + + (isShared ? "read" : "write") + "lock for table:" + tableNameStr + + "for:" + purpose + " after " + lockTimeoutMs + " ms."); + } + } + } catch (InterruptedException e) { + LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e); + Thread.currentThread().interrupt(); + throw new InterruptedIOException("Interrupted acquiring a lock"); + } + LOG.debug("Acquired table " + (isShared ? "read" : "write") + + " lock on :" + tableNameStr + " for:" + purpose); + } + + @Override + public void release() throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Attempt to release table " + (isShared ? 
"read" : "write") + + " lock on :" + tableNameStr); + } + if (lock == null) { + throw new IllegalStateException("Table " + tableNameStr + + " is not locked!"); + } + + try { + lock.release(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while releasing a lock for " + tableNameStr); + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Released table lock on :" + tableNameStr); + } + } + + private InterProcessLock createTableLock() { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr); + + ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder() + .setTableName(ByteString.copyFrom(tableName)) + .setLockOwner(ProtobufUtil.toServerName(serverName)) + .setThreadId(Thread.currentThread().getId()) + .setPurpose(purpose) + .setIsShared(isShared).build(); + byte[] lockMetadata = toBytes(data); + + InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode, + METADATA_HANDLER); + return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata); + } + } + + private static byte[] toBytes(ZooKeeperProtos.TableLock data) { + return ProtobufUtil.prependPBMagic(data.toByteArray()); + } + + private static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) { + int pblen = ProtobufUtil.lengthOfPBMagic(); + if (bytes == null || bytes.length < pblen) { + return null; + } + try { + ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom( + bytes, pblen, bytes.length - pblen).build(); + return data; + } catch (InvalidProtocolBufferException ex) { + LOG.warn("Exception in deserialization", ex); + } + return null; + } + + private final ServerName serverName; + private final ZooKeeperWatcher zkWatcher; + private final long writeLockTimeoutMs; + private final long readLockTimeoutMs; + + /** + * Initialize a new manager for table-level locks. 
+ * @param zkWatcher + * @param serverName Address of the server responsible for acquiring and + * releasing the table-level locks + * @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a + * given table, or -1 for no timeout + * @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a + * given table, or -1 for no timeout + */ + public ZKTableLockManager(ZooKeeperWatcher zkWatcher, + ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs) { + this.zkWatcher = zkWatcher; + this.serverName = serverName; + this.writeLockTimeoutMs = writeLockTimeoutMs; + this.readLockTimeoutMs = readLockTimeoutMs; + } + + @Override + public TableLock writeLock(byte[] tableName, String purpose) { + return new TableLockImpl(tableName, zkWatcher, + serverName, writeLockTimeoutMs, false, purpose); + } + + public TableLock readLock(byte[] tableName, String purpose) { + return new TableLockImpl(tableName, zkWatcher, + serverName, readLockTimeoutMs, true, purpose); + } + + @Override + public void reapAllTableWriteLocks() throws IOException { + //get the table names + try { + List tableNames; + try { + tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode); + } catch (KeeperException e) { + LOG.error("Unexpected ZooKeeper error when listing children", e); + throw new IOException("Unexpected ZooKeeper exception", e); + } + + for (String tableName : tableNames) { + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName); + ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock( + zkWatcher, tableLockZNode, null); + lock.writeLock(null).reapAllLocks(); + } + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + LOG.warn("Caught exception while reaping table write locks", ex); + } + } + + @Override + public void tableDeleted(byte[] tableName) throws IOException { + //table write lock from DeleteHandler is already released, just delete the parent znode + 
String tableNameStr = Bytes.toString(tableName); + String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr); + try { + ZKUtil.deleteNode(zkWatcher, tableLockZNode); + } catch (KeeperException ex) { + if (ex.code() == KeeperException.Code.NOTEMPTY) { + //we might get this in rare occasions where a CREATE table or some other table operation + //is waiting to acquire the lock. In this case, parent znode won't be deleted. + LOG.warn("Could not delete the znode for table locks because NOTEMPTY: " + + tableLockZNode); + return; + } + throw new IOException(ex); + } + } + } +} Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Fri Feb 22 00:15:52 2013 @@ -50,6 +50,9 @@ import org.apache.hadoop.hbase.master.As import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.Threads; @@ -66,21 +69,29 @@ public class CreateTableHandler extends protected final Configuration conf; private final AssignmentManager assignmentManager; private final CatalogTracker catalogTracker; + private final TableLockManager 
tableLockManager; private final HRegionInfo [] newRegions; + private final TableLock tableLock; public CreateTableHandler(Server server, MasterFileSystem fileSystemManager, HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions, - CatalogTracker catalogTracker, AssignmentManager assignmentManager) - throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { + MasterServices masterServices) { super(server, EventType.C_M_CREATE_TABLE); this.fileSystemManager = fileSystemManager; this.hTableDescriptor = hTableDescriptor; this.conf = conf; this.newRegions = newRegions; - this.catalogTracker = catalogTracker; - this.assignmentManager = assignmentManager; + this.catalogTracker = masterServices.getCatalogTracker(); + this.assignmentManager = masterServices.getAssignmentManager(); + this.tableLockManager = masterServices.getTableLockManager(); + this.tableLock = this.tableLockManager.writeLock(this.hTableDescriptor.getName() + , EventType.C_M_CREATE_TABLE.toString()); + } + + public CreateTableHandler prepare() + throws NotAllMetaRegionsOnlineException, TableExistsException, IOException { int timeout = conf.getInt("hbase.client.catalog.timeout", 10000); // Need META availability to create a table try { @@ -94,28 +105,40 @@ public class CreateTableHandler extends throw ie; } - String tableName = this.hTableDescriptor.getNameAsString(); - if (MetaReader.tableExists(catalogTracker, tableName)) { - throw new TableExistsException(tableName); - } - - // If we have multiple client threads trying to create the table at the - // same time, given the async nature of the operation, the table - // could be in a state where .META. table hasn't been updated yet in - // the process() function. - // Use enabling state to tell if there is already a request for the same - // table in progress. This will introduce a new zookeeper call. Given - // createTable isn't a frequent operation, that should be ok. + //acquire the table write lock, blocking. 
Make sure that it is released. + this.tableLock.acquire(); + boolean success = false; try { - if (!this.assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) + String tableName = this.hTableDescriptor.getNameAsString(); + if (MetaReader.tableExists(catalogTracker, tableName)) { throw new TableExistsException(tableName); - } catch (KeeperException e) { - throw new IOException("Unable to ensure that the table will be" + - " enabling because of a ZooKeeper issue", e); + } + + // If we have multiple client threads trying to create the table at the + // same time, given the async nature of the operation, the table + // could be in a state where .META. table hasn't been updated yet in + // the process() function. + // Use enabling state to tell if there is already a request for the same + // table in progress. This will introduce a new zookeeper call. Given + // createTable isn't a frequent operation, that should be ok. + //TODO: now that we have table locks, re-evaluate above + try { + if (!this.assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) { + throw new TableExistsException(tableName); + } + } catch (KeeperException e) { + throw new IOException("Unable to ensure that the table will be" + + " enabling because of a ZooKeeper issue", e); + } + success = true; + } finally { + if (!success) { + releaseTableLock(); + } } + return this; } - @Override public String toString() { String name = "UnknownServerName"; @@ -129,8 +152,9 @@ public class CreateTableHandler extends @Override public void process() { String tableName = this.hTableDescriptor.getNameAsString(); + LOG.info("Attempting to create the table " + tableName); + try { - LOG.info("Attempting to create the table " + tableName); MasterCoprocessorHost cpHost = ((HMaster) this.server).getCoprocessorHost(); if (cpHost != null) { cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions); @@ -207,6 +231,18 @@ public class CreateTableHandler extends } catch (KeeperException e) { 
throw new IOException("Unable to ensure that " + tableName + " will be" + " enabled because of a ZooKeeper issue", e); + } finally { + releaseTableLock(); + } + } + + private void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + } } } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java Fri Feb 22 00:15:52 2013 @@ -44,9 +44,12 @@ public class DeleteTableHandler extends private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); public DeleteTableHandler(byte [] tableName, Server server, - final MasterServices masterServices) - throws IOException { + final MasterServices masterServices) { super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices); + } + + @Override + protected void prepareWithTableLock() throws IOException { // The next call fails if no such table. 
getTableDescriptor(); } @@ -114,6 +117,16 @@ public class DeleteTableHandler extends } @Override + protected void releaseTableLock() { + super.releaseTableLock(); + try { + masterServices.getTableLockManager().tableDeleted(tableName); + } catch (IOException ex) { + LOG.warn("Received exception from TableLockManager.tableDeleted:", ex); //not critical + } + } + + @Override public String toString() { String name = "UnknownServerName"; if(server != null && server.getServerName() != null) { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java?rev=1448867&r1=1448866&r2=1448867&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java Fri Feb 22 00:15:52 2013 @@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.master.Bu import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; import org.cloudera.htrace.Trace; @@ -50,40 +52,62 @@ public class DisableTableHandler extends private final byte [] tableName; private final String tableNameStr; private final AssignmentManager assignmentManager; + private final TableLockManager tableLockManager; + private final CatalogTracker catalogTracker; + private final boolean skipTableStateCheck; + private TableLock tableLock; public DisableTableHandler(Server server, byte [] tableName, 
CatalogTracker catalogTracker, AssignmentManager assignmentManager, - boolean skipTableStateCheck) - throws TableNotFoundException, TableNotEnabledException, IOException { + TableLockManager tableLockManager, boolean skipTableStateCheck) { super(server, EventType.C_M_DISABLE_TABLE); this.tableName = tableName; this.tableNameStr = Bytes.toString(this.tableName); this.assignmentManager = assignmentManager; - // Check if table exists - // TODO: do we want to keep this in-memory as well? i guess this is - // part of old master rewrite, schema to zk to check for table - // existence and such - if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { - throw new TableNotFoundException(this.tableNameStr); - } + this.catalogTracker = catalogTracker; + this.tableLockManager = tableLockManager; + this.skipTableStateCheck = skipTableStateCheck; + } - // There could be multiple client requests trying to disable or enable - // the table at the same time. Ensure only the first request is honored - // After that, no other requests can be accepted until the table reaches - // DISABLED or ENABLED. - if (!skipTableStateCheck) - { - try { - if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable - (this.tableNameStr)) { - LOG.info("Table " + tableNameStr + " isn't enabled; skipping disable"); - throw new TableNotEnabledException(this.tableNameStr); + public DisableTableHandler prepare() + throws TableNotFoundException, TableNotEnabledException, IOException { + //acquire the table write lock, blocking + this.tableLock = this.tableLockManager.writeLock(tableName, + EventType.C_M_DISABLE_TABLE.toString()); + this.tableLock.acquire(); + + boolean success = false; + try { + // Check if table exists + if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) { + throw new TableNotFoundException(this.tableNameStr); + } + + // There could be multiple client requests trying to disable or enable + // the table at the same time. 
Ensure only the first request is honored + // After that, no other requests can be accepted until the table reaches + // DISABLED or ENABLED. + //TODO: reevaluate this since we have table locks now + if (!skipTableStateCheck) { + try { + if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable + (this.tableNameStr)) { + LOG.info("Table " + tableNameStr + " isn't enabled; skipping disable"); + throw new TableNotEnabledException(this.tableNameStr); + } + } catch (KeeperException e) { + throw new IOException("Unable to ensure that the table will be" + + " disabling because of a ZooKeeper issue", e); } - } catch (KeeperException e) { - throw new IOException("Unable to ensure that the table will be" + - " disabling because of a ZooKeeper issue", e); + } + success = true; + } finally { + if (!success) { + releaseTableLock(); } } + + return this; } @Override @@ -113,6 +137,18 @@ public class DisableTableHandler extends LOG.error("Error trying to disable table " + this.tableNameStr, e); } catch (KeeperException e) { LOG.error("Error trying to disable table " + this.tableNameStr, e); + } finally { + releaseTableLock(); + } + } + + private void releaseTableLock() { + if (this.tableLock != null) { + try { + this.tableLock.release(); + } catch (IOException ex) { + LOG.warn("Could not release the table lock", ex); + } } }