hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1334557 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/util/ main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/m...
Date Sun, 06 May 2012 04:35:52 GMT
Author: mbautin
Date: Sun May  6 04:35:51 2012
New Revision: 1334557

URL: http://svn.apache.org/viewvc?rev=1334557&view=rev
Log:
[jira] [HBASE-5494] [89-fb] Table-level locks for schema changing operations.

Author: avf

Summary:

Since concurrent modifications (e.g., disabling and dropping a table that is
still being created) could leave a cluster in an inconsistent state, we need
table-level locks for schema-changing operations.

A ZooKeeper-based distributed lock has been implemented that
attempts to create a persistent ZNode (one ZNode per entity being locked, i.e.,
one per table) if one does not exist. Currently, if a master crashes while
holding the lock, the lock must be removed manually using the ZooKeeper command
line (locks are stored under "/hbase/tableLock/").
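
For illustration only, the create-if-absent idea can be sketched without
ZooKeeper. In the sketch below an in-memory map stands in for the ZNode
namespace, and the class name TableLockSketch, the polling loop and the 10 ms
poll interval are assumptions made for the example, not part of this commit
(the real DistributedLock talks to ZooKeeper through ZooKeeperWrapper).

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TableLockSketch {
  // Stand-in for the ZooKeeper namespace: node path -> owner metadata.
  public static final ConcurrentMap<String, byte[]> ZNODES =
      new ConcurrentHashMap<>();
  public static final String LOCK_PARENT = "/hbase/tableLock/";

  /**
   * Try to create the lock node for a table, polling until timeoutMs elapses.
   * Returns true if the node was created (lock acquired), false on timeout
   * or interruption.
   */
  public static boolean tryAcquire(String table, String owner, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    byte[] metadata = owner.getBytes(StandardCharsets.UTF_8);
    while (true) {
      // putIfAbsent is atomic, like creating a node that must not exist yet.
      if (ZNODES.putIfAbsent(LOCK_PARENT + table, metadata) == null) {
        return true;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  /** Delete the lock node. A crashed holder never gets to call this. */
  public static void release(String table) {
    ZNODES.remove(LOCK_PARENT + table);
  }

  public static void main(String[] args) {
    System.out.println(tryAcquire("t1", "master-a", 100)); // true
    System.out.println(tryAcquire("t1", "master-b", 100)); // false (timed out)
    release("t1");
    System.out.println(tryAcquire("t1", "master-b", 100)); // true
  }
}
```

Because the node is persistent, an entry that is never released stays in
place until someone deletes it, which is the manual-cleanup case described
above. The same owner calling tryAcquire twice also blocks, matching a lock
that is not re-entrant.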

The locks implemented are not fair or re-entrant. RecoverableZooKeeper is used
to correctly handle connection loss.

To test the locks, InjectionHandler and InjectionEvent have been introduced,
allowing for injection of arbitrary events; in this case they add delays
during schema-changing operations so as to induce a race condition.
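
A minimal, self-contained sketch of the delay-injection idea follows. The
names InjectionSketch, Event, Handler and DelayHandler are hypothetical
stand-ins chosen for the example; they only mirror the shape of
InjectionHandler and InjectionEvent and are not the classes added by this
commit.

```java
import java.util.EnumMap;
import java.util.Map;

public class InjectionSketch {
  public enum Event { CREATE_TABLE, DELETE_TABLE }

  /** Default handler: does nothing, as production code would expect. */
  public static class Handler {
    private static volatile Handler current = new Handler();

    protected void onEvent(Event e) {
      // no-op by default
    }

    public static void set(Handler h) { current = h; }
    public static void clear() { current = new Handler(); }
    public static void processEvent(Event e) { current.onEvent(e); }
  }

  /** Test handler: sleeps for a configured time when an event fires. */
  public static class DelayHandler extends Handler {
    private final Map<Event, Long> delaysMs = new EnumMap<>(Event.class);

    public void setDelay(Event e, long ms) { delaysMs.put(e, ms); }

    @Override
    protected void onEvent(Event e) {
      Long ms = delaysMs.get(e);
      if (ms == null) {
        return;
      }
      try {
        Thread.sleep(ms);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Fire an event and report how long the handler held the caller up. */
  public static long timedEventMs(Event e) {
    long start = System.nanoTime();
    Handler.processEvent(e);
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    DelayHandler delays = new DelayHandler();
    delays.setDelay(Event.CREATE_TABLE, 200);
    Handler.set(delays);
    System.out.println("create delayed by ~" + timedEventMs(Event.CREATE_TABLE) + " ms");
    System.out.println("delete delayed by ~" + timedEventMs(Event.DELETE_TABLE) + " ms");
    Handler.clear();
  }
}
```

A test would install the delaying handler, start a slow "create" on one
thread, issue a conflicting operation on another, and check that the second
one waits or times out instead of interleaving.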

Future work involves automatically deleting stale lock ZNodes upon server
recovery (provided the attempted operations are not resumed) and adding
metrics around locks (e.g., listing all locks held).

Task ID: #766044

Blame Rev:

Reviewers: kannan, mbautin, liyintang, JIRA

CC:

Test Plan:

1) Unit tests:

To verify that the locks themselves work, run the TestDistributedLock unit
test. To verify that the locks prevent race conditions in schema modification,
run the TestSchemaModificationLocks unit test.

2) Stress test:

This can also be stress tested on a realistic cluster by launching many
concurrent modifications, verifying that they are serialized, and profiling
the wait times for different locks.
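
As a rough illustration of the "verify that they are serialized" check, the
sketch below counts how many workers are inside the critical section at once.
It is an assumption-laden stand-in: a local Semaphore plays the role of the
table lock, and the class and method names are invented for the example. A
real stress test would issue schema changes against a cluster instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializationCheck {

  /**
   * Run threads * opsPerThread guarded operations and return the largest
   * number of workers ever observed inside the guarded section at once.
   * A value of 1 means the operations were serialized.
   */
  public static int maxConcurrentHolders(int threads, int opsPerThread) {
    Semaphore tableLock = new Semaphore(1); // stand-in for the table lock
    AtomicInteger inside = new AtomicInteger();
    AtomicInteger maxInside = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(() -> {
        for (int i = 0; i < opsPerThread; i++) {
          tableLock.acquireUninterruptibly();
          try {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            Thread.yield(); // give other workers a chance to interleave
            inside.decrementAndGet();
          } finally {
            tableLock.release();
          }
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return maxInside.get();
  }

  public static void main(String[] args) {
    System.out.println("max concurrent holders: " + maxConcurrentHolders(8, 200));
  }
}
```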

Revert Plan:

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/DistributedLock.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/zookeeper/TestDistributedLock.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1334557&r1=1334556&r2=1334557&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sun May  6 04:35:51 2012
@@ -93,6 +93,20 @@ public final class HConstants {
   /** by default every master is a possible primary master unless the conf explicitly overrides it */
   public static final boolean DEFAULT_MASTER_TYPE_BACKUP = false;
 
+  /** Configuration key for enabling table-level locks for schema changes */
+  public static final String MASTER_SCHEMA_CHANGES_LOCK_ENABLE =
+    "hbase.master.schemaChanges.lock.enable";
+
+  /** by default we should enable table-level locks for schema changes */
+  public static final boolean DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_ENABLE = true;
+
+  /** Configuration key for time out for schema modification locks */
+  public static final String MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS =
+    "hbase.master.schemaChanges.lock.timeout.ms";
+
+  public static final int DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS =
+    60 * 1000;
+
   /** Name of ZooKeeper quorum configuration parameter. */
   public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
 

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/TableLockTimeoutException.java Sun May  6 04:35:51 2012
@@ -0,0 +1,37 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+public class TableLockTimeoutException extends IOException {
+
+  private static final long serialVersionUID = -1770764924258999825L;
+
+  /** Default constructor */
+  public TableLockTimeoutException() {
+    super();
+  }
+
+  public TableLockTimeoutException(String s) {
+    super(s);
+  }
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1334557&r1=1334556&r2=1334557&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sun May  6 04:35:51 2012
@@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.MasterNot
 import org.apache.hadoop.hbase.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.StopStatus;
-import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -107,6 +106,8 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RuntimeHaltAbortStrategy;
 import org.apache.hadoop.hbase.util.Sleeper;
@@ -141,6 +142,8 @@ public class HMaster extends Thread impl
   //instance into web context.
   public static final String MASTER = "master";
   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
+
+
   private static final String LOCALITY_SNAPSHOT_FILE_NAME = "regionLocality-snapshot";
 
   /**
@@ -179,6 +182,8 @@ public class HMaster extends Thread impl
   private ZooKeeperWrapper zooKeeperWrapper;
   // Watcher for master address and for cluster shutdown.
   private final ZKMasterAddressWatcher zkMasterAddressWatcher;
+  // Table level lock manager for schema changes
+  private final TableLockManager tableLockManager;
   // A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo.
   private final Sleeper sleeper;
   // Keep around for convenience.
@@ -346,6 +351,18 @@ public class HMaster extends Thread impl
         RegionManager.AssignmentLoadBalancer.class);
     LOG.debug("Whether to read the favoredNodes from meta: " +
         (shouldAssignRegionsWithFavoredNodes ? "Yes" : "No"));
+
+    // Initialize table level lock manager for schema changes, if enabled.
+    if (conf.getBoolean(HConstants.MASTER_SCHEMA_CHANGES_LOCK_ENABLE,
+      HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_ENABLE)) {
+      int schemaChangeLockTimeoutMs = conf.getInt(
+        HConstants.MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS,
+        HConstants.DEFAULT_MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS);
+      tableLockManager = new TableLockManager(zooKeeperWrapper,
+        address, schemaChangeLockTimeoutMs);
+    } else {
+      tableLockManager = null;
+    }
   }
 
   public boolean shouldAssignRegionsWithFavoredNodes() {
@@ -1294,6 +1311,24 @@ public class HMaster extends Thread impl
     requestClusterShutdown();
   }
 
+  private boolean isTableLockEnabled() {
+    return tableLockManager != null;
+  }
+
+  protected void lockTable(byte[] tableName, String purpose)
+  throws IOException {
+    if (isTableLockEnabled()) {
+      tableLockManager.lockTable(tableName, purpose);
+    }
+  }
+
+  protected void unlockTable(byte[] tableName)
+  throws IOException {
+    if (isTableLockEnabled()) {
+      tableLockManager.unlockTable(tableName);
+    }
+  }
+
   @Override
   public void createTable(HTableDescriptor desc, byte [][] splitKeys)
   throws IOException {
@@ -1334,16 +1369,9 @@ public class HMaster extends Thread impl
     }
   }
 
-  private synchronized void createTable(final HRegionInfo [] newRegions)
+  private static boolean tableExists(HRegionInterface srvr,
+    byte[] metaRegionName, String tableName)
   throws IOException {
-    String tableName = newRegions[0].getTableDesc().getNameAsString();
-    // 1. Check to see if table already exists. Get meta region where
-    // table would sit should it exist. Open scanner on it. If a region
-    // for the table we want to create already exists, then table already
-    // created. Throw already-exists exception.
-    MetaRegion m = regionManager.getFirstMetaRegionForRegion(newRegions[0]);
-    byte [] metaRegionName = m.getRegionName();
-    HRegionInterface srvr = this.connection.getHRegionConnection(m.getServer());
     byte[] firstRowInTable = Bytes.toBytes(tableName + ",,");
     Scan scan = new Scan(firstRowInTable);
     scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
@@ -1356,41 +1384,68 @@ public class HMaster extends Thread impl
               HConstants.REGIONINFO_QUALIFIER));
         if (info.getTableDesc().getNameAsString().equals(tableName)) {
           // A region for this table already exists. Ergo table exists.
-          throw new TableExistsException(tableName);
+          return true;
         }
       }
     } finally {
       srvr.close(scannerid);
     }
+    return false;
+  }
 
-    AssignmentPlan assignmentPlan = null;
-    if (this.shouldAssignRegionsWithFavoredNodes) {
-      // Get the assignment domain for this table
-      AssignmentDomain domain = this.getAssignmentDomain(tableName);
-      // Get the assignment plan for the new regions
-      assignmentPlan =
-        regionPlacement.getNewAssignmentPlan(newRegions, domain);
-    }
-
-    if (assignmentPlan == null) {
-      LOG.info("Generated the assignment plan for new table " + tableName);
-    } else {
-      LOG.info("NO assignment plan for new table " + tableName);
+  private synchronized void createTable(final HRegionInfo [] newRegions)
+  throws IOException {
+    String tableName = newRegions[0].getTableDesc().getNameAsString();
+    // 1. Check to see if table already exists. Get meta region where
+    // table would sit should it exist. Open scanner on it. If a region
+    // for the table we want to create already exists, then table already
+    // created. Throw already-exists exception.
+    MetaRegion m = regionManager.getFirstMetaRegionForRegion(newRegions[0]);
+    byte [] metaRegionName = m.getRegionName();
+    HRegionInterface srvr = this.connection.getHRegionConnection(m.getServer());
+    if (tableExists(srvr, metaRegionName, tableName)) {
+      throw new TableExistsException(tableName);
     }
+    byte [] tableNameBytes = Bytes.toBytes(tableName);
+    lockTable(tableNameBytes, "create");
+    try {
+      InjectionHandler.processEvent(InjectionEvent.HMASTER_CREATE_TABLE);
+      // After acquiring the lock, verify again that the table does not
+      // exist.
+      if (tableExists(srvr, metaRegionName, tableName)) {
+        throw new TableExistsException(tableName);
+      }
+      AssignmentPlan assignmentPlan = null;
+      if (this.shouldAssignRegionsWithFavoredNodes) {
+        // Get the assignment domain for this table
+        AssignmentDomain domain = this.getAssignmentDomain(tableName);
+        // Get the assignment plan for the new regions
+        assignmentPlan =
+          regionPlacement.getNewAssignmentPlan(newRegions, domain);
+      }
 
-    for(HRegionInfo newRegion : newRegions) {
-      if (assignmentPlan != null) {
-        // create the region with favorite nodes.
-        List<HServerAddress> favoredNodes =
-          assignmentPlan.getAssignment(newRegion);
-        regionManager.createRegion(newRegion, srvr, metaRegionName,
-            favoredNodes);
+      if (assignmentPlan == null) {
+        LOG.info("Generated the assignment plan for new table " + tableName);
       } else {
-        regionManager.createRegion(newRegion, srvr, metaRegionName);
+        LOG.info("NO assignment plan for new table " + tableName);
       }
+
+      for(HRegionInfo newRegion : newRegions) {
+        if (assignmentPlan != null) {
+          // create the region with favorite nodes.
+          List<HServerAddress> favoredNodes =
+            assignmentPlan.getAssignment(newRegion);
+          regionManager.createRegion(newRegion, srvr, metaRegionName,
+              favoredNodes);
+        } else {
+          regionManager.createRegion(newRegion, srvr, metaRegionName);
+        }
+      }
+      // kick off a meta scan right away to assign the newly created regions
+      regionManager.metaScannerThread.triggerNow();
+    } finally {
+      unlockTable(tableNameBytes);
     }
-    // kick off a meta scan right away to assign the newly created regions
-    regionManager.metaScannerThread.triggerNow();
   }
 
   /**
@@ -1420,10 +1475,16 @@ public class HMaster extends Thread impl
 
   @Override
   public void deleteTable(final byte [] tableName) throws IOException {
-    if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
-      throw new IOException("Can't delete root table");
+    lockTable(tableName, "delete");
+    try {
+      InjectionHandler.processEvent(InjectionEvent.HMASTER_DELETE_TABLE);
+      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+        throw new IOException("Can't delete root table");
+      }
+      new TableDelete(this, tableName).process();
+    } finally {
+      unlockTable(tableName);
     }
-    new TableDelete(this, tableName).process();
     LOG.info("deleted table: " + Bytes.toString(tableName));
   }
 
@@ -1432,12 +1493,18 @@ public class HMaster extends Thread impl
       List<HColumnDescriptor> columnAdditions,
       List<Pair<byte[], HColumnDescriptor>> columnModifications,
       List<byte[]> columnDeletions) throws IOException {
-    ThrottledRegionReopener reopener = this.regionManager.
-            createThrottledReopener(Bytes.toString(tableName));
-    // Regions are added to the reopener in MultiColumnOperation
-    new MultiColumnOperation(this, tableName, columnAdditions,
-        columnModifications, columnDeletions).process();
-    reopener.reOpenRegionsThrottle();
+    lockTable(tableName, "alter");
+    try {
+      InjectionHandler.processEvent(InjectionEvent.HMASTER_ALTER_TABLE);
+      ThrottledRegionReopener reopener = this.regionManager.
+        createThrottledReopener(Bytes.toString(tableName));
+      // Regions are added to the reopener in MultiColumnOperation
+      new MultiColumnOperation(this, tableName, columnAdditions,
+                               columnModifications, columnDeletions).process();
+      reopener.reOpenRegionsThrottle();
+    } finally {
+      unlockTable(tableName);
+    }
   }
 
   public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
@@ -1475,18 +1542,30 @@ public class HMaster extends Thread impl
 
   @Override
   public void enableTable(final byte [] tableName) throws IOException {
-    if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
-      throw new IOException("Can't enable root table");
+    lockTable(tableName, "enable");
+    try {
+      InjectionHandler.processEvent(InjectionEvent.HMASTER_ENABLE_TABLE);
+      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+        throw new IOException("Can't enable root table");
+      }
+      new ChangeTableState(this, tableName, true).process();
+    } finally {
+      unlockTable(tableName);
     }
-    new ChangeTableState(this, tableName, true).process();
   }
 
   @Override
   public void disableTable(final byte [] tableName) throws IOException {
-    if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
-      throw new IOException("Can't disable root table");
+    lockTable(tableName, "disable");
+    try {
+      InjectionHandler.processEvent(InjectionEvent.HMASTER_DISABLE_TABLE);
+      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
+        throw new IOException("Can't disable root table");
+      }
+      new ChangeTableState(this, tableName, false).process();
+    } finally {
+      unlockTable(tableName);
     }
-    new ChangeTableState(this, tableName, false).process();
   }
 
   /**

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/TableLockManager.java Sun May  6 04:35:51 2012
@@ -0,0 +1,146 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.TableLockTimeoutException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.DistributedLock;
+import org.apache.hadoop.hbase.zookeeper.DistributedLock.OwnerMetadataHandler;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A manager for distributed table level locks.
+ */
+public class TableLockManager {
+
+  private static final Log LOG = LogFactory.getLog(TableLockManager.class);
+
+  private static final OwnerMetadataHandler METADATA_HANDLER =
+    new OwnerMetadataHandler() {
+      @Override
+      public void printOwnerMetadata(byte[] ownerMetadata) {
+        LOG.info("Table is locked: " + Bytes.toString(ownerMetadata));
+      }
+    };
+
+  /**
+   * Tables that are currently locked by this instance. Allows locks to
+   * be released by table name.
+   */
+  private final ConcurrentMap<String, DistributedLock> acquiredTableLocks;
+
+  private final HServerAddress serverAddress;
+
+  private final ZooKeeperWrapper zkWrapper;
+
+  private final int lockTimeoutMs;
+
+  /**
+   * Initialize a new manager for table-level locks.
+   * @param zkWrapper
+   * @param serverAddress Address of the server responsible for acquiring and
+   *                      releasing the table-level locks
+   * @param lockTimeoutMs Timeout (in milliseconds) for acquiring a lock for a
+   *                      given table, or -1 for no timeout
+   */
+  public TableLockManager(ZooKeeperWrapper zkWrapper,
+    HServerAddress serverAddress, int lockTimeoutMs) {
+    this.zkWrapper = zkWrapper;
+    this.serverAddress = serverAddress;
+    this.lockTimeoutMs = lockTimeoutMs;
+    this.acquiredTableLocks = new ConcurrentHashMap<String, DistributedLock>();
+  }
+
+  /**
+   * Lock a table, given a purpose.
+   * @param tableName Table to lock
+   * @param purpose Human readable reason for locking the table
+   * @throws TableLockTimeoutException If unable to acquire a lock within a
+   *                                   specified time period (if any)
+   * @throws IOException If unrecoverable ZooKeeper error occurs
+   */
+  public void lockTable(byte[] tableName, String purpose)
+  throws IOException {
+    String tableNameStr = Bytes.toString(tableName);
+    DistributedLock lock = createTableLock(tableNameStr, purpose);
+    try {
+      if (lockTimeoutMs == -1) {
+        // Wait indefinitely
+        lock.acquire();
+      } else {
+        if (!lock.tryAcquire(lockTimeoutMs)) {
+          throw new TableLockTimeoutException("Timed out acquiring " +
+            "lock for " + tableNameStr + " after " + lockTimeoutMs + " ms.");
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e);
+      Thread.currentThread().interrupt();
+      throw new InterruptedIOException("Interrupted acquiring a lock");
+    }
+
+    if (acquiredTableLocks.putIfAbsent(tableNameStr, lock) != null) {
+      // This should never execute if DistributedLock is implemented
+      // correctly.
+      LOG.error("Lock for " + tableNameStr + " acquired by multiple owners!");
+      LOG.error("Currently held locks: " + acquiredTableLocks);
+      throw new IllegalStateException("Lock for " + tableNameStr +
+        " was acquired by multiple owners!");
+    }
+  }
+
+  private DistributedLock createTableLock(String tableName, String purpose) {
+    String tableLockZNode = zkWrapper.getZNode(zkWrapper.tableLockZNode,
+      tableName);
+    byte[] lockMetadata = Bytes.toBytes("[Table = " + tableName +
+      "\nOwner server address = " + serverAddress +
+      "\nOwner thread id = " + Thread.currentThread().getId() +
+      "\nPurpose = " + purpose + "]");
+    return new DistributedLock(zkWrapper, tableLockZNode, lockMetadata,
+      METADATA_HANDLER);
+  }
+
+  public void unlockTable(byte[] tableName)
+  throws IOException {
+    String tableNameStr = Bytes.toString(tableName);
+    DistributedLock lock = acquiredTableLocks.get(tableNameStr);
+    if (lock == null) {
+      throw new IllegalStateException("Table " + tableNameStr +
+        " is not locked!");
+    }
+
+    try {
+      acquiredTableLocks.remove(tableNameStr);
+      lock.release();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while releasing a lock for " + tableNameStr);
+      Thread.currentThread().interrupt();
+      throw new InterruptedIOException();
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Sun May  6 04:35:51 2012
@@ -0,0 +1,35 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Enumeration of all injection events.
+ * When defining new events, please PREFIX the name
+ * with the supervised class.
+ *
+ * Please see InjectionHandler.
+ */
+public enum InjectionEvent {
+  HMASTER_CREATE_TABLE,
+  HMASTER_DELETE_TABLE,
+  HMASTER_ALTER_TABLE,
+  HMASTER_ENABLE_TABLE,
+  HMASTER_DISABLE_TABLE
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java Sun May  6 04:35:51 2012
@@ -0,0 +1,171 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The InjectionHandler is an object provided to a class,
+ * which can perform custom actions for JUnit testing.
+ * JUnit test can implement custom version of the handler.
+ * For example, let's say we want to supervise FSImage object:
+ *
+ * <code>
+ * // JUnit test code
+ * class MyInjectionHandler extends InjectionHandler {
+ *   protected void _processEvent(InjectionEvent event,
+ *       Object... args) {
+ *     if (event == InjectionEvent.MY_EVENT) {
+ *       LOG.info("Handling my event for fsImage: "
+ *         + args[0].toString());
+ *     }
+ *   }
+ * }
+ *
+ * public void testMyEvent() {
+ *   InjectionHandler ih = new MyInjectionHandler();
+ *   InjectionHandler.set(ih);
+ *   ...
+ *
+ *   InjectionHandler.clear();
+ * }
+ *
+ * // supervised code example
+ *
+ * class FSImage {
+ *
+ *   private void doSomething() {
+ *     ...
+ *     if (condition1 && InjectionHandler.trueCondition(MY_EVENT1)) {
+ *       ...
+ *     }
+ *     if (condition2 || condition3
+ *       || InjectionHandler.falseCondition(MY_EVENT1)) {
+ *       ...
+ *     }
+ *     ...
+ *     InjectionHandler.processEvent(MY_EVENT2, this);
+ *     ...
+ *     try {
+ *       read();
+ *       InjectionHandler.processEventIO(MY_EVENT3, this, object);
+ *       // might throw an exception when testing
+ *     } catch (IOException e) {
+ *       LOG.info("Exception");
+ *     }
+ *     ...
+ *   }
+ *   ...
+ * }
+ * </code>
+ *
+ * Each unit test should use a unique event type.
+ * The types can be defined by adding them to
+ * InjectionEvent class.
+ *
+ * methods:
+ *
+ * // simulate actions
+ * void processEvent()
+ * // simulate exceptions
+ * void processEventIO() throws IOException
+ *
+ * // simulate conditions
+ * boolean trueCondition()
+ * boolean falseCondition()
+ *
+ * The class implementing InjectionHandler must
+ * override respective protected methods
+ * _processEvent()
+ * _processEventIO()
+ * _trueCondition()
+ * _falseCondition()
+ */
+public class InjectionHandler {
+
+  private static final Log LOG = LogFactory.getLog(InjectionHandler.class);
+
+  // the only handler to which everyone reports
+  private static InjectionHandler handler = new InjectionHandler();
+
+  // can not be instantiated outside, unless a testcase extends it
+  protected InjectionHandler() {}
+
+  // METHODS FOR PRODUCTION CODE
+
+  protected void _processEvent(InjectionEvent event, Object... args) {
+    // by default do nothing
+  }
+
+  protected void _processEventIO(InjectionEvent event, Object... args) throws IOException{
+    // by default do nothing
+  }
+
+  protected boolean _trueCondition(InjectionEvent event, Object... args) {
+    return true; // neutral in conjunction
+  }
+
+  protected boolean _falseCondition(InjectionEvent event, Object... args) {
+    return false; // neutral in alternative
+  }
+
+  ////////////////////////////////////////////////////////////
+
+  /**
+   * Set to the empty/production implementation.
+   */
+  public static void clear() {
+    handler = new InjectionHandler();
+  }
+
+  /**
+   * Set custom implementation of the handler.
+   */
+  public static void set(InjectionHandler custom) {
+    LOG.warn("WARNING: SETTING INJECTION HANDLER" +
+      " - THIS SHOULD NOT BE USED IN PRODUCTION !!!");
+    handler = custom;
+  }
+
+  /*
+  * Static methods for reporting to the handler
+  */
+
+  public static void processEvent(InjectionEvent event, Object... args) {
+    handler._processEvent(event, args);
+  }
+
+  public static void processEventIO(InjectionEvent event, Object... args)
+    throws IOException {
+    handler._processEventIO(event, args);
+  }
+
+  public static boolean trueCondition(InjectionEvent event, Object... args) {
+    return handler._trueCondition(event, args);
+  }
+
+  public static boolean falseCondition(InjectionEvent event, Object... args) {
+    return handler._falseCondition(event, args);
+  }
+}
+
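Note (illustrative sketch, not part of this commit): the handler above is a static dispatch point with a no-op default, which a test can swap for a subclass. The stand-alone example below mirrors that shape under simplified assumptions. The names InjectionSketch, Event, Handler and RecordingHandler are hypothetical, and only two of the four hooks are reproduced.

```java
import java.util.ArrayList;
import java.util.List;

final class InjectionSketch {

  // Hypothetical stand-in for InjectionEvent.
  enum Event { ALTER_TABLE, DISABLE_TABLE }

  // Hypothetical, reduced stand-in for InjectionHandler.
  static class Handler {
    // The single handler every call site reports to; a no-op by default.
    private static volatile Handler handler = new Handler();

    protected Handler() {}

    protected void _processEvent(Event event, Object... args) {
      // by default do nothing
    }

    protected boolean _falseCondition(Event event, Object... args) {
      return false;
    }

    static void set(Handler custom) {
      handler = custom;
    }

    static void clear() {
      handler = new Handler();
    }

    // Static entry points that production code would call.
    static void processEvent(Event event, Object... args) {
      handler._processEvent(event, args);
    }

    static boolean falseCondition(Event event, Object... args) {
      return handler._falseCondition(event, args);
    }
  }

  // Test-only subclass that records which events were reported.
  static class RecordingHandler extends Handler {
    final List<Event> seen = new ArrayList<>();

    @Override
    protected synchronized void _processEvent(Event event, Object... args) {
      seen.add(event);
    }
  }

  public static void main(String[] args) {
    RecordingHandler recorder = new RecordingHandler();
    Handler.set(recorder);
    Handler.processEvent(Event.ALTER_TABLE);   // recorded
    Handler.clear();
    Handler.processEvent(Event.DISABLE_TABLE); // ignored by the no-op default
    System.out.println("Recorded events: " + recorder.seen);
  }
}
```

Because the default implementation does nothing, call sites can stay in production code, and a test restores the default with clear() when it is done.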

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/DistributedLock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/DistributedLock.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/DistributedLock.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/DistributedLock.java Sun May  6 04:35:51 2012
@@ -0,0 +1,314 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper based distributed lock. This lock is <b>not</b> re-entrant.
+ */
+public class DistributedLock {
+
+  private static final Log LOG = LogFactory.getLog(DistributedLock.class);
+
+  private final ZooKeeperWrapper zkWrapper;
+
+  private final String lockZNode;
+
+  private final byte[] lockMeta;
+
+  private final String fullyQualifiedZNode;
+
+  private final OwnerMetadataHandler metadataHandler;
+
+  private volatile int lockZNodeVersion;
+
+  /**
+   * Create a distributed lock instance.
+   * @param zkWrapper
+   * @param lockZNode (Non-fully qualified) ZNode path for the lock
+   * @param lockMeta  Some metadata about the node. Preferably, a human
+   *                  readable string. Must not be null.
+   * @param metadataHandler If not null, use this to parse lockMeta
+   */
+  public DistributedLock(ZooKeeperWrapper zkWrapper, String lockZNode,
+    byte[] lockMeta, OwnerMetadataHandler metadataHandler) {
+    Preconditions.checkNotNull(lockMeta);
+
+    this.zkWrapper = zkWrapper;
+    this.lockZNode = lockZNode;
+    this.lockMeta = lockMeta;
+    this.metadataHandler = metadataHandler;
+    this.fullyQualifiedZNode = zkWrapper.getZNode(zkWrapper.getParentZNode(),
+      lockZNode);
+    this.lockZNodeVersion = -1;
+  }
+
+
+  /**
+   * Acquires the lock, waiting indefinitely until the lock is released, or
+   * the thread is interrupted.
+   * @throws IOException
+   * @throws InterruptedException If current thread is interrupted while
+   *                              waiting for the lock
+   */
+  public void acquire()
+  throws IOException, InterruptedException {
+    if (!tryAcquire(-1)) {
+      // Should either throw an exception or acquire lock
+      throw new IllegalStateException("tryAcquire() should either wait" +
+        " indefinitely, acquire the lock, or throw an exception");
+    }
+  }
+
+  /**
+   * Acquires the lock within a given wait time.
+   * @param timeoutMs The maximum time (in milliseconds) to wait for the lock,
+   *                  -1 to wait indefinitely.
+   * @return True if the lock was acquired, false if waiting time elapsed
+   *         before the lock was acquired
+   * @throws IOException
+   * @throws InterruptedException If thread is interrupted while waiting to
+   *                              acquire the lock
+   */
+  public boolean tryAcquire(long timeoutMs)
+  throws IOException, InterruptedException {
+    if (isAcquired()) {
+      throw new IllegalStateException("Lock " + fullyQualifiedZNode +
+        " has already been acquired");
+    }
+
+    boolean acquiredLock = waitForLock(timeoutMs);
+    if (acquiredLock) {
+      lockZNodeVersion = getZNodeVersionOrThrow();
+      LOG.debug("Acquired lock " + fullyQualifiedZNode + ", version " +
+        lockZNodeVersion);
+    } else {
+      LOG.error("Unable to acquire lock " + fullyQualifiedZNode + " in " +
+        timeoutMs + " ms");
+    }
+    return acquiredLock;
+  }
+
+  /**
+   * Main loop: if a ZNode already exists for the lock, set a watcher, to
+   * await the ZNode's deletion. Once the ZNode is deleted, try creating
+   * the ZNode again. Continue this loop until either the current thread
+   * is interrupted or maximum waiting time (if set) elapses.
+   * @param timeoutMs Maximum waiting time, -1 to wait indefinitely.
+   * @return False if maximum waiting time (if set) has elapsed
+   */
+  private boolean waitForLock(long timeoutMs)
+  throws IOException, InterruptedException {
+    boolean hasTimeout = timeoutMs != -1;
+    long waitUntilMs =
+      hasTimeout ? EnvironmentEdgeManager.currentTimeMillis() + timeoutMs : -1;
+    boolean acquiredLock = false;
+    boolean shouldCreate = true;
+    while (!acquiredLock && shouldCreate) {
+      CountDownLatch watcherFinished = new CountDownLatch(1);
+      DeletionWatcher watcher = new DeletionWatcher(watcherFinished);
+
+      // It's possible for another thread to acquire a znode after our
+      // watcher has fired
+      try {
+        acquiredLock = tryCreateZNode(watcher);
+      } catch (KeeperException.NoNodeException e) {
+        // If the znode is deleted before the watcher could be set,
+        // simulate the watcher firing; this means we will still
+        // check if the specified waiting time has elapsed before
+        // trying to create the znode again.
+        watcherFinished.countDown();
+      }
+      try {
+        if (!acquiredLock) {
+          printLockMetadata();
+          if (hasTimeout) {
+            long remainingMs =
+              waitUntilMs - EnvironmentEdgeManager.currentTimeMillis();
+            if (remainingMs < 0) {
+              shouldCreate = false;
+            } else {
+              shouldCreate = watcherFinished.await(remainingMs,
+                TimeUnit.MILLISECONDS);
+            }
+          } else {
+            // If a timeout is not set, await indefinitely
+            watcherFinished.await();
+          }
+          // Check if the watcher thread encountered an exception
+          // when re-setting the watch
+          if (watcher.hasException()) {
+            Throwable t = watcher.getException();
+            LOG.error("Exception in the watcher ", t);
+            throw new IOException("Exception in the watcher", t);
+          }
+        }
+      } finally {
+        zkWrapper.unregisterListener(watcher);
+      }
+    }
+    return acquiredLock;
+  }
+
+  /**
+   * Check if the lock is already acquired.
+   */
+  public boolean isAcquired() {
+    return lockZNodeVersion != -1;
+  }
+
+  private int getZNodeVersionOrThrow()
+  throws IOException {
+    Stat stat = new Stat();
+    if (zkWrapper.readZNode(fullyQualifiedZNode, stat) == null) {
+      throw new IllegalStateException("ZNode " + fullyQualifiedZNode +
+        " no longer exists!");
+    }
+    return stat.getVersion();
+  }
+
+  private boolean tryCreateZNode(Watcher watcher)
+  throws IOException, InterruptedException, KeeperException.NoNodeException {
+    return zkWrapper.checkExistsAndCreate(lockZNode, lockMeta,
+      CreateMode.PERSISTENT, watcher);
+  }
+
+  private boolean printLockMetadata()
+  throws IOException, InterruptedException {
+    byte[] ownerMetadata = null;
+    Stat stat = new Stat();
+    try {
+      ownerMetadata = zkWrapper.readZNodeIfExists(lockZNode, stat);
+      if (ownerMetadata == null) {
+        return false;
+      }
+      LOG.trace("Lock ZNode statistics " + stat);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted reading ZNode " + fullyQualifiedZNode, e);
+      throw e;
+    }
+    if (metadataHandler != null) {
+      metadataHandler.printOwnerMetadata(ownerMetadata);
+    }
+    return true;
+  }
+
+  /**
+   * Release the lock.
+   * @throws IOException If there is an unrecoverable ZooKeeper error
+   * @throws InterruptedException If the current thread is interrupted while
+   *                              releasing the lock
+   */
+  public void release()
+  throws IOException, InterruptedException {
+    if (!isAcquired()) {
+      throw new IllegalStateException("Lock " + fullyQualifiedZNode +
+        " is not held by this thread");
+    }
+    try {
+      if (zkWrapper.checkExists(fullyQualifiedZNode) != -1) {
+        zkWrapper.deleteZNode(fullyQualifiedZNode, false, lockZNodeVersion);
+        lockZNodeVersion = -1;
+        LOG.debug("Released lock " + fullyQualifiedZNode);
+      } else {
+        throw new IllegalStateException("ZNode " + fullyQualifiedZNode +
+          " does not exist in ZooKeeper");
+      }
+    } catch (KeeperException.BadVersionException e) {
+      LOG.error("Attempted to delete a version we do not hold", e);
+      throw new IllegalStateException("ZNode " + fullyQualifiedZNode +
+        " has been modified since acquiring the lock");
+    } catch (KeeperException e) {
+      LOG.error("Unrecoverable ZooKeeper error releasing lock for " +
+                  fullyQualifiedZNode, e);
+      throw new IOException("Unrecoverable ZooKeeper error", e);
+    }
+  }
+
+  /**
+   * Callback interface to parse and print metadata associated with locks
+   * (e.g., owner, purpose) stored within ZNodes.
+   */
+  public static interface OwnerMetadataHandler {
+    /**
+     * Called after the metadata associated with the lock is read.
+     * @param ownerMetadata
+     */
+    public void printOwnerMetadata(byte[] ownerMetadata);
+  }
+
+  private class DeletionWatcher implements Watcher {
+    private final CountDownLatch watcherFinished;
+
+    private volatile Throwable exception = null;
+
+    DeletionWatcher(CountDownLatch watcherFinished) {
+      this.watcherFinished = watcherFinished;
+    }
+
+    public Throwable getException() {
+      return exception;
+    }
+
+    public boolean hasException() {
+      return exception != null;
+    }
+
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      if (watchedEvent.getPath().equals(fullyQualifiedZNode)) {
+        if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
+          watcherFinished.countDown();
+        } else {
+          try {
+            zkWrapper.watchAndCheckExists(fullyQualifiedZNode);
+          } catch (Throwable t) {
+            // If an exception occurs when we try to re-set the watcher,
+            // we should return control back to waitForLock(): otherwise,
+            // waitForLock() would continue to wait for countDown on
+            // watcherFinished, which would never happen as the exception
+            // prevented the watcher from being re-set. When waitForLock()
+            // finishes waiting for the latch, it will use hasException()
+            // and getException() to check whether the watcher has encountered
+            // an exception and (if an exception was encountered), re-throw
+            // the exception.
+            exception = t;
+            watcherFinished.countDown();
+            LOG.error("Error when re-setting the watch on " +
+              fullyQualifiedZNode, t);
+          }
+        }
+      }
+    }
+  }
+}
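Note (illustrative sketch, not part of this commit): the waitForLock() loop above amounts to "create the node if absent, otherwise wait for its deletion and retry until the deadline". The example below reproduces only that control flow against an in-memory map. It is an assumption-laden stand-in: LockLoopSketch is a hypothetical name, a Java monitor (wait/notifyAll) plays the role of the ZooKeeper watch, and no ZooKeeper API is involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

final class LockLoopSketch {

  // path -> owner; stands in for the persistent lock ZNodes.
  private final Map<String, String> nodes = new HashMap<>();

  /**
   * Tries to take the lock at the given path.
   * A timeoutMs of -1 waits indefinitely, mirroring tryAcquire() in the diff.
   */
  synchronized boolean tryAcquire(String path, String owner, long timeoutMs)
      throws InterruptedException {
    boolean hasTimeout = timeoutMs != -1;
    long deadlineNanos = System.nanoTime()
        + TimeUnit.MILLISECONDS.toNanos(Math.max(timeoutMs, 0));
    // Re-check after every wake-up: another waiter may have re-created
    // the node before this thread got to run.
    while (nodes.putIfAbsent(path, owner) != null) {
      if (!hasTimeout) {
        wait();
        continue;
      }
      long remainingMs =
          TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
      if (remainingMs <= 0) {
        return false; // waiting time elapsed before the lock was acquired
      }
      wait(remainingMs);
    }
    return true;
  }

  synchronized void release(String path, String owner) {
    if (!owner.equals(nodes.get(path))) {
      throw new IllegalStateException(path + " is not held by " + owner);
    }
    nodes.remove(path);
    notifyAll(); // plays the role of the NodeDeleted watch event
  }

  public static void main(String[] args) throws InterruptedException {
    LockLoopSketch locks = new LockLoopSketch();
    System.out.println("master-a acquired: "
        + locks.tryAcquire("tableLock/t1", "master-a", 100));
    System.out.println("master-b acquired while held: "
        + locks.tryAcquire("tableLock/t1", "master-b", 50));
    locks.release("tableLock/t1", "master-a");
    System.out.println("master-b acquired after release: "
        + locks.tryAcquire("tableLock/t1", "master-b", 50));
  }
}
```

Like the lock in this commit, the sketch is neither fair nor re-entrant: whichever waiter re-creates the entry first wins.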

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1334557&r1=1334556&r2=1334557&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Sun May  6 04:35:51 2012
@@ -148,6 +148,10 @@ public class ZooKeeperWrapper implements
    */
   public final String splitLogZNode;
   /*
+   * ZNode used for table-level schema modification locks
+   */
+  public final String tableLockZNode;
+  /*
    * List of ZNodes in the unassgined region that are already being watched
    */
   private Set<String> unassignedZNodesWatched = new HashSet<String>();
@@ -263,12 +267,13 @@ public class ZooKeeperWrapper implements
     String stateZNodeName      = conf.get("zookeeper.znode.state", "shutdown");
     String regionsInTransitZNodeName = conf.get("zookeeper.znode.regionInTransition", "UNASSIGNED");
     String splitLogZNodeName   = conf.get("zookeeper.znode.splitlog", "splitlog");
-
+    String tableLockZNodeName  = conf.get("zookeeper.znode.tableLock", "tableLock");
     rootRegionZNode     = getZNode(parentZNode, rootServerZNodeName);
     rsZNode             = getZNode(parentZNode, rsZNodeName);
     rgnsInTransitZNode  = getZNode(parentZNode, regionsInTransitZNodeName);
     masterElectionZNode = getZNode(parentZNode, masterAddressZNodeName);
     clusterStateZNode   = getZNode(parentZNode, stateZNodeName);
+    tableLockZNode      = getZNode(parentZNode, tableLockZNodeName);
     int retryNum = conf.getInt(HConstants.ZOOKEEPER_CONNECTION_RETRY_NUM, 6);
     int retryFreq = conf.getInt("zookeeper.connection.retry.freq", 1000);
     zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
@@ -775,6 +780,20 @@ public class ZooKeeperWrapper implements
    * @throws InterruptedException
    */
   public void deleteZNode(String znode, boolean recursive)
+  throws KeeperException, InterruptedException {
+    deleteZNode(znode, recursive, -1);
+  }
+
+  /**
+   * Atomically delete a ZNode if the ZNode's version matches
+   * the expected version.
+   * @param znode Fully qualified path to the ZNode
+   * @param recursive If true, will recursively delete ZNode's children
+   * @param version Expected version, as obtained from a Stat object
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public void deleteZNode(String znode, boolean recursive, int version)
     throws KeeperException, InterruptedException {
     if (recursive) {
       LOG.info("<" + instanceName + ">" + "deleteZNode get children for " + znode);
@@ -783,11 +802,11 @@ public class ZooKeeperWrapper implements
         for (String child : znodes) {
           String childFullPath = getZNode(znode, child);
           LOG.info("<" + instanceName + ">" + "deleteZNode recursive call " + childFullPath);
-          this.deleteZNode(childFullPath, true);
+          this.deleteZNode(childFullPath, true, -1);
         }
       }
     }
-    this.recoverableZK.delete(znode, -1);
+    this.recoverableZK.delete(znode, version);
     LOG.debug("<" + instanceName + ">" + "Deleted ZNode " + znode);
   }
 
@@ -1271,6 +1290,60 @@ public class ZooKeeperWrapper implements
     return fullyQualifiedZNodeName;
   }
 
+  /**
+   * Atomically create a ZNode, if and only if the ZNode doesn't exist.
+   * If watcher parameter is not null, and no exception is thrown, the
+   * supplied watcher will be set (to be triggered by the ZNode's deletion).
+   * @param zNodeName
+   * @param data
+   * @param createMode
+   * @param watcher Watcher to be set, or null to not set a watcher
+   * @throws IOException If there is an unrecoverable ZooKeeper error
+   * @throws NoNodeException If the node is deleted before watch can be set
+   * @return False if the ZNode already exists, true otherwise
+   */
+  public boolean checkExistsAndCreate(String zNodeName, byte[] data,
+    CreateMode createMode, Watcher watcher)
+  throws IOException, InterruptedException, NoNodeException {
+    String fullyQualifiedZNodeName = getZNode(parentZNode, zNodeName);
+    if (!ensureParentExists(fullyQualifiedZNodeName)) {
+      throw new IOException("Parent for " + zNodeName +
+        " does not exist and could not be created");
+    }
+    try {
+      if (watcher != null) {
+        registerListener(watcher);
+      }
+      recoverableZK.create(fullyQualifiedZNodeName, data, Ids.OPEN_ACL_UNSAFE,
+        createMode);
+    } catch (KeeperException.NodeExistsException e) {
+      if (watcher != null) {
+        boolean exists;
+        try {
+          exists = watchAndCheckExists(fullyQualifiedZNodeName);
+        } catch (KeeperException ke) {
+          LOG.error("Unrecoverable ZooKeeper error setting the watcher on " +
+          fullyQualifiedZNodeName, ke);
+          throw new IOException("Unrecoverable ZooKeeper error setting watcher",
+            ke);
+        }
+        if (!exists) {
+          throw new NoNodeException(fullyQualifiedZNodeName +
+            " deleted before watch could be set");
+        }
+      }
+      return false;
+    } catch (KeeperException e) {
+      LOG.error("Unrecoverable ZooKeeper error encountered on " +
+        fullyQualifiedZNodeName, e);
+      throw new IOException("Unrecoverable ZooKeeper error", e);
+    } catch(InterruptedException e) {
+      LOG.warn("Interrupted waiting for " + fullyQualifiedZNodeName, e);
+      throw e;
+    }
+    return true;
+  }
+
   public byte[] readZNode(String znodeName, Stat stat) throws IOException {
     byte[] data;
     try {
@@ -1284,6 +1357,23 @@ public class ZooKeeperWrapper implements
     return data;
   }
 
+  public byte[] readZNodeIfExists(String znodeName, Stat stat)
+    throws IOException, InterruptedException {
+    byte[] data;
+    String fullyQualifiedZNodeName = getZNode(parentZNode, znodeName);
+    try {
+      data = recoverableZK.getData(fullyQualifiedZNodeName, this, stat);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted reading ZNode " + fullyQualifiedZNodeName, e);
+      throw e;
+    } catch (NoNodeException e) {
+      LOG.warn(fullyQualifiedZNodeName + " no longer exists");
+      return null;
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
+    return data;
+  }
   // TODO: perhaps return the version number from this write?
   public boolean writeZNode(String znodeName, byte[] data, int version, boolean watch) throws IOException {
       try {
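Note (illustrative sketch, not part of this commit): deleteZNode(znode, recursive, version) relies on version-checked deletion, where a delete succeeds only if the node still has the version the caller observed, and -1 matches any version. The in-memory example below shows those semantics. VersionedDeleteSketch and its methods are hypothetical, and it returns false where ZooKeeper would raise BadVersionException or NoNodeException.

```java
import java.util.HashMap;
import java.util.Map;

final class VersionedDeleteSketch {

  private static final class Node {
    byte[] data;
    int version; // starts at 0 and is bumped on every data change

    Node(byte[] data) {
      this.data = data;
    }
  }

  private final Map<String, Node> nodes = new HashMap<>();

  /** Creates the node; returns false if it already exists. */
  synchronized boolean create(String path, byte[] data) {
    return nodes.putIfAbsent(path, new Node(data)) == null;
  }

  synchronized int version(String path) {
    return require(path).version;
  }

  synchronized void setData(String path, byte[] data) {
    Node node = require(path);
    node.data = data;
    node.version++;
  }

  /**
   * Deletes the node only if expectedVersion matches its current version.
   * An expectedVersion of -1 matches any version.
   */
  synchronized boolean delete(String path, int expectedVersion) {
    Node node = nodes.get(path);
    if (node == null) {
      return false;
    }
    if (expectedVersion != -1 && node.version != expectedVersion) {
      return false;
    }
    nodes.remove(path);
    return true;
  }

  private Node require(String path) {
    Node node = nodes.get(path);
    if (node == null) {
      throw new IllegalStateException(path + " does not exist");
    }
    return node;
  }

  public static void main(String[] args) {
    VersionedDeleteSketch store = new VersionedDeleteSketch();
    store.create("/hbase/tableLock/t1", new byte[] {1});
    int seen = store.version("/hbase/tableLock/t1");
    store.setData("/hbase/tableLock/t1", new byte[] {2}); // someone else wrote
    System.out.println("delete with stale version: "
        + store.delete("/hbase/tableLock/t1", seen));
    System.out.println("delete with current version: "
        + store.delete("/hbase/tableLock/t1",
            store.version("/hbase/tableLock/t1")));
  }
}
```

This is why DistributedLock records the ZNode version at acquisition time: a release issued against a node that was modified or re-created in the meantime is refused instead of silently removing someone else's lock.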

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSchemaModificationLocks.java Sun May  6 04:35:51 2012
@@ -0,0 +1,164 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableLockTimeoutException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DelayInducingInjectionHandler;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class TestSchemaModificationLocks {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestSchemaModificationLocks.class);
+
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks");
+
+  private static final byte[] FAMILY = Bytes.toBytes("f1");
+
+  private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
+
+  private final HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+
+  private HMaster getMaster() {
+    return TEST_UTIL.getMiniHBaseCluster().getMaster();
+  }
+
+  public void prepareMiniCluster() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test(timeout = 60000)
+  public void testLockTimeoutException() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HConstants.MASTER_SCHEMA_CHANGES_LOCK_TIMEOUT_MS, 3000);
+    prepareMiniCluster();
+    DelayInducingInjectionHandler delayer =
+      new DelayInducingInjectionHandler();
+    delayer.setEventDelay(InjectionEvent.HMASTER_ALTER_TABLE, 10000);
+    InjectionHandler.set(delayer);
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<Object> shouldFinish = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.deleteColumn(TABLE_NAME, FAMILY);
+        return null;
+      }
+    });
+
+    delayer.awaitEvent(InjectionEvent.HMASTER_ALTER_TABLE);
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    boolean caughtTimeoutException = false;
+    try {
+      admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+      fail();
+    } catch (TableLockTimeoutException e) {
+      caughtTimeoutException = true;
+    }
+    assertTrue(caughtTimeoutException);
+    shouldFinish.get();
+  }
+
+  @Test(timeout = 60000)
+  public void testAlterAndDisable() throws Exception {
+    prepareMiniCluster();
+    // Send a request to alter a table, then sleep during
+    // the alteration phase. In the meantime, from another
+    // thread, send a request to disable, and then delete, the table.
+
+    // The requests should be serialized
+    final DelayInducingInjectionHandler delayer =
+      new DelayInducingInjectionHandler();
+    delayer.setEventDelay(InjectionEvent.HMASTER_ALTER_TABLE, 6000);
+    delayer.setEventDelay(InjectionEvent.HMASTER_DISABLE_TABLE, 3000);
+    InjectionHandler.set(delayer);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        HMaster master = getMaster();
+        master.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+        LOG.info("Added new column family");
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        assertTrue(admin.isTableEnabled(TABLE_NAME));
+        HTableDescriptor tableDesc = master.getTableDescriptor(TABLE_NAME);
+        assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
+        return null;
+      }
+    });
+    Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        delayer.awaitEvent(InjectionEvent.HMASTER_ALTER_TABLE);
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.disableTable(TABLE_NAME);
+        assertTrue(admin.isTableDisabled(TABLE_NAME));
+        admin.deleteTable(TABLE_NAME);
+        assertFalse(admin.tableExists(TABLE_NAME));
+        return null;
+      }
+    });
+
+    try {
+      disableTableFuture.get();
+      alterTableFuture.get();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof AssertionError) {
+        throw (AssertionError) e.getCause();
+      }
+      throw e;
+    }
+  }
+
+}
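Note (illustrative sketch, not part of this commit): testAlterAndDisable() unwraps ExecutionException so that an assertion failing on a worker thread is reported as a test failure rather than as an opaque execution error. The stand-alone example below isolates that step. FutureUnwrapSketch and getOrRethrowAssertion are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureUnwrapSketch {

  /**
   * Waits for the future. If the task failed with an AssertionError,
   * rethrows that error directly; other failures stay wrapped.
   */
  static <T> T getOrRethrowAssertion(Future<T> future)
      throws InterruptedException, ExecutionException {
    try {
      return future.get();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof AssertionError) {
        throw (AssertionError) e.getCause();
      }
      throw e;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // A task whose assertion fails on the worker thread.
      Callable<Object> failing = () -> {
        throw new AssertionError("table should have been disabled");
      };
      try {
        getOrRethrowAssertion(executor.submit(failing));
      } catch (AssertionError e) {
        System.out.println("Surfaced on caller thread: " + e.getMessage());
      }
    } finally {
      executor.shutdownNow();
    }
  }
}
```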

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/DelayInducingInjectionHandler.java Sun May  6 04:35:51 2012
@@ -0,0 +1,93 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * An Injection handler intended to introduce configurable delays
+ * for events. <b>Not for production use.</b>
+ */
+public class DelayInducingInjectionHandler extends InjectionHandler {
+
+  private static final Log LOG =
+    LogFactory.getLog(DelayInducingInjectionHandler.class);
+
+  private final ConcurrentMap<InjectionEvent, Long> eventToDelayTimeMs =
+    new ConcurrentHashMap<InjectionEvent, Long>();
+
+  private final ConcurrentMap<InjectionEvent, CountDownLatch> eventsToWaitFor =
+    new ConcurrentHashMap<InjectionEvent, CountDownLatch>();
+
+  public DelayInducingInjectionHandler() {
+    LOG.warn("DelayInducingInjectionHandler initialized. Do not use this in" +
+      " production");
+  }
+
+  public void setEventDelay(InjectionEvent event, long delayTimeMs) {
+    LOG.warn("Setting delay of " + delayTimeMs + " ms for " + event);
+    eventToDelayTimeMs.put(event, delayTimeMs);
+  }
+
+  public void awaitEvent(InjectionEvent event)
+  throws InterruptedException {
+    if (!eventToDelayTimeMs.containsKey(event)) {
+      throw new IllegalArgumentException("No delay set for " + event + "!");
+    }
+    if (!eventsToWaitFor.containsKey(event)) {
+      eventsToWaitFor.putIfAbsent(event, new CountDownLatch(1));
+    }
+    eventsToWaitFor.get(event).await();
+  }
+
+  @Override
+  protected void _processEvent(InjectionEvent event, Object... args) {
+    notifyAndSleep(event);
+  }
+
+  @Override
+  protected void _processEventIO(InjectionEvent event, Object... args)
+    throws IOException {
+    notifyAndSleep(event);
+  }
+
+  private void notifyAndSleep(InjectionEvent event) {
+    if (eventToDelayTimeMs.containsKey(event)) {
+      if (!eventsToWaitFor.containsKey(event)) {
+        eventsToWaitFor.putIfAbsent(event, new CountDownLatch(1));
+      }
+      eventsToWaitFor.get(event).countDown();
+      long delayTimeMs = eventToDelayTimeMs.get(event);
+      LOG.warn("Sleeping for " + delayTimeMs + " ms for " + event);
+      try {
+        Thread.sleep(delayTimeMs);
+      } catch (InterruptedException e) {
+        LOG.warn("Sleep interrupted for " + event, e);
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
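Note (illustrative sketch, not part of this commit): awaitEvent() and notifyAndSleep() above share one CountDownLatch per event, created lazily by whichever side arrives first. Assuming Java 8 or later (the code in this commit predates it), the same idea can be written with ConcurrentMap.computeIfAbsent. EventLatchSketch and its method names are hypothetical, and events are plain strings here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class EventLatchSketch {

  private final ConcurrentMap<String, CountDownLatch> latches =
      new ConcurrentHashMap<>();

  // Both sides go through this method, so the signaller and the waiter
  // always see the same latch regardless of who arrives first.
  private CountDownLatch latchFor(String event) {
    return latches.computeIfAbsent(event, e -> new CountDownLatch(1));
  }

  /** Called from the instrumented code path when the event occurs. */
  void signal(String event) {
    latchFor(event).countDown();
  }

  /** Called from the test; returns false if the event was not seen in time. */
  boolean await(String event, long timeoutMs) throws InterruptedException {
    return latchFor(event).await(timeoutMs, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    EventLatchSketch events = new EventLatchSketch();
    events.signal("ALTER_TABLE");
    System.out.println("ALTER_TABLE seen: "
        + events.await("ALTER_TABLE", 100));
    System.out.println("DISABLE_TABLE seen: "
        + events.await("DISABLE_TABLE", 20));
  }
}
```

A latch that has already been counted down stays open, so a waiter that arrives after the event still returns immediately.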

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/zookeeper/TestDistributedLock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/zookeeper/TestDistributedLock.java?rev=1334557&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/zookeeper/TestDistributedLock.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/zookeeper/TestDistributedLock.java Sun May  6 04:35:51 2012
@@ -0,0 +1,183 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RuntimeExceptionAbortStrategy;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class TestDistributedLock {
+
+  private static final Log LOG = LogFactory.getLog(TestDistributedLock.class);
+
+  private static final HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+  private static final int NUM_THREADS = 10;
+
+  private static Configuration conf;
+
+  private final CountDownLatch lockHeldLatch = new CountDownLatch(1);
+
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniZKCluster();
+    conf.setInt(HConstants.ZOOKEEPER_SESSION_TIMEOUT, 1000);
+  }
+
+  @AfterClass
+  public static void afterAllTests() throws IOException {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Test(timeout = 30000)
+  public void testTimeout() throws Exception {
+    Callable<Object> shouldHog = new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf,
+          "hogThread", new RuntimeExceptionAbortStrategy());
+        DistributedLock lock = new DistributedLock(zkw, "testTimeout",
+          Bytes.toBytes("hogThread"), null);
+        lock.acquire();
+        lockHeldLatch.countDown();
+        Thread.sleep(10000);
+        lock.release();
+        return null;
+      }
+    };
+    Callable<Object> shouldTimeout = new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf,
+          "threadShouldTimeout", new RuntimeExceptionAbortStrategy());
+        DistributedLock lock = new DistributedLock(zkw, "testTimeout",
+          Bytes.toBytes("threadShouldTimeout"), null);
+        lockHeldLatch.await();
+        assertFalse(lock.tryAcquire(5000));
+        return null;
+      }
+    };
+    Callable<Object> shouldAcquireLock = new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf,
+          "threadShouldAcquireLock", new RuntimeExceptionAbortStrategy());
+        DistributedLock lock = new DistributedLock(zkw, "testTimeout",
+          Bytes.toBytes("threadShouldAcquireLock"), null);
+        lockHeldLatch.await();
+        assertTrue(lock.tryAcquire(30000));
+        lock.release();
+        return null;
+      }
+    };
+    ExecutorService executor = Executors.newFixedThreadPool(3);
+    List<Future<Object>> threadResults = new ArrayList<Future<Object>>();
+    threadResults.add(executor.submit(shouldHog));
+    threadResults.add(executor.submit(shouldAcquireLock));
+    threadResults.add(executor.submit(shouldTimeout));
+    executor.shutdown();
+    assertOnFutures(threadResults);
+  }
+
+  @Test(timeout = 30000)
+  public void testLockAndRelease() throws Exception {
+    final AtomicBoolean isLockHeld = new AtomicBoolean(false);
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+    List<Future<Object>> threadResults = new ArrayList<Future<Object>>();
+    for (int i = 0; i < NUM_THREADS; ++i) {
+      final String threadDesc = "testLockAndRelease" + i;
+      Future<Object> threadResult =
+        executor.submit(new Callable<Object>() {
+          @Override
+          public Object call() throws IOException {
+            try {
+              ZooKeeperWrapper zkw = ZooKeeperWrapper.createInstance(conf,
+                threadDesc, new RuntimeExceptionAbortStrategy());
+              DistributedLock lock =
+                new DistributedLock(zkw, "testLockAndRelease",
+                  Bytes.toBytes(threadDesc), null);
+              lock.acquire();
+              try {
+                // No one else should be holding the lock
+                assertTrue(isLockHeld.compareAndSet(false, true));
+                Thread.sleep(1000);
+                // No one else should have released the lock
+                assertTrue(isLockHeld.compareAndSet(true, false));
+              } finally {
+                isLockHeld.set(false);
+                lock.release();
+              }
+            } catch (IOException e) {
+              LOG.error("Exception acquiring lock", e);
+              throw e;
+            } catch (InterruptedException e) {
+              LOG.warn(threadDesc + " interrupted", e);
+              Thread.currentThread().interrupt();
+              throw new InterruptedIOException();
+            }
+            return null;
+          }
+        });
+      threadResults.add(threadResult);
+    }
+    assertOnFutures(threadResults);
+  }
+
+  private static void assertOnFutures(List<Future<Object>> threadResults)
+    throws InterruptedException, ExecutionException {
+    for (Future<Object> threadResult : threadResults) {
+      try {
+        threadResult.get();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause();
+        }
+        throw e;
+      }
+    }
+  }
+}


