hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject svn commit: r1448867 [3/3] - in /hbase/trunk: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hb...
Date Fri, 22 Feb 2013 00:15:53 GMT
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java?rev=1448867&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java Fri Feb 22 00:15:52 2013
@@ -0,0 +1,294 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.LockTimeoutException;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the default table lock manager
+ */
+@Category(MediumTests.class)
+public class TestTableLockManager {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestTableLockManager.class);
+
+  // Name of the table that prepareMiniCluster() creates for each test.
+  private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks");
+
+  // Column family the table is created with; testLockTimeoutException deletes it.
+  private static final byte[] FAMILY = Bytes.toBytes("f1");
+
+  // Column family that the tests add through HBaseAdmin.addColumn().
+  private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
+
+  // Instance field (not static): every test method starts its own cluster through it
+  // and tearDown() shuts that cluster down again.
+  private final HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+  // Counted down by TestLockTimeoutExceptionMasterObserver.preDeleteColumnHandler and
+  // awaited by testLockTimeoutException. Static and never re-created, so it opens once.
+  private static final CountDownLatch deleteColumn = new CountDownLatch(1);
+  // Counted down by TestAlterAndDisableMasterObserver.preAddColumnHandler and awaited by
+  // that observer's preDisableTable. Static and never re-created, so it opens once.
+  private static final CountDownLatch addColumn = new CountDownLatch(1);
+
+  /**
+   * Enables "hbase.online.schema.update.enable", starts the mini cluster (passing
+   * {@code 2} to startMiniCluster) and creates {@link #TABLE_NAME} with the single
+   * column family {@link #FAMILY}.
+   *
+   * @throws Exception if the cluster cannot be started or the table cannot be created
+   */
+  public void prepareMiniCluster() throws Exception {
+    final Configuration clusterConf = TEST_UTIL.getConfiguration();
+    clusterConf.setBoolean("hbase.online.schema.update.enable", true);
+    TEST_UTIL.startMiniCluster(2);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+  }
+
+  /**
+   * Starts only the mini ZooKeeper cluster (passing {@code 1} to startMiniZKCluster);
+   * this method does not start an HBase cluster.
+   *
+   * @throws Exception if the ZooKeeper cluster cannot be started
+   */
+  public void prepareMiniZkCluster() throws Exception {
+    final int zooKeeperServerNum = 1;
+    TEST_UTIL.startMiniZKCluster(zooKeeperServerNum);
+  }
+
+  /**
+   * Runs after every test method and calls shutdownMiniCluster() on this instance's
+   * testing utility.
+   */
+  @After
+  public void tearDown() throws Exception {
+    final HBaseTestingUtility util = TEST_UTIL;
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Checks that a schema change fails with {@link LockTimeoutException} when another
+   * schema change is still in progress on the same table.
+   *
+   * The table write-lock timeout is set to 3000 ms. A background thread issues
+   * deleteColumn; once the observer signals preDeleteColumnHandler through the
+   * {@code deleteColumn} latch, this thread issues addColumn and expects it to time out.
+   * The observer sleeps 10000 ms in postDeleteColumnHandler, presumably while the delete
+   * still holds the table lock (not visible from this file). Finally the test waits for
+   * the deleteColumn call to complete.
+   */
+  @Test(timeout = 600000)
+  public void testLockTimeoutException() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 3000);
+    prepareMiniCluster();
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class,
+        0, TEST_UTIL.getConfiguration());
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<Object> shouldFinish = executor.submit(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+          admin.deleteColumn(TABLE_NAME, FAMILY);
+          return null;
+        }
+      });
+
+      // Do not compete for the lock until the delete has reached the master handler.
+      deleteColumn.await();
+
+      try {
+        HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+        admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+        fail("Was expecting LockTimeoutException");
+      } catch (LockTimeoutException ex) {
+        //expected
+      }
+      shouldFinish.get();
+    } finally {
+      // Always release the worker thread, including when an assertion above fails.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Master observer loaded by testLockTimeoutException. It signals when a deleteColumn
+   * request reaches its handler, then delays that handler's completion, and fails if an
+   * addColumn request ever reaches its handler.
+   */
+  public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
+    /** Opens the {@code deleteColumn} latch so the test thread can issue addColumn. */
+    @Override
+    public void preDeleteColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, byte[] c) throws IOException {
+      deleteColumn.countDown();
+    }
+
+    /** Sleeps 10000 ms, well past the 3000 ms lock timeout the test configures. */
+    @Override
+    public void postDeleteColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, byte[] c) throws IOException {
+      Threads.sleep(10000);
+    }
+
+    /** Reaching this hook means addColumn was not stopped by the lock timeout. */
+    @Override
+    public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      fail("Add column should have timed out while acquiring the table lock");
+    }
+  }
+
+  /**
+   * Runs an alter (addColumn) and a disable-then-delete of the same table from two
+   * threads at once. TestAlterAndDisableMasterObserver orders the two requests and
+   * sleeps inside the handlers. Assertion errors raised in either worker are unwrapped
+   * and rethrown so that JUnit reports them as test failures.
+   */
+  @Test(timeout = 600000)
+  public void testAlterAndDisable() throws Exception {
+    prepareMiniCluster();
+    // Send a request to alter a table, then sleep during
+    // the alteration phase. In the mean time, from another
+    // thread, send a request to disable, and then delete a table.
+
+    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+    master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
+        0, TEST_UTIL.getConfiguration());
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+          admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+          LOG.info("Added new column family");
+          HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
+          assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
+          return null;
+        }
+      });
+      Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
+        @Override
+        public Object call() throws Exception {
+          HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+          admin.disableTable(TABLE_NAME);
+          assertTrue(admin.isTableDisabled(TABLE_NAME));
+          admin.deleteTable(TABLE_NAME);
+          assertFalse(admin.tableExists(TABLE_NAME));
+          return null;
+        }
+      });
+
+      try {
+        disableTableFuture.get();
+        alterTableFuture.get();
+      } catch (ExecutionException e) {
+        // Surface worker-side assertion failures as assertion failures of this test.
+        if (e.getCause() instanceof AssertionError) {
+          throw (AssertionError) e.getCause();
+        }
+        throw e;
+      }
+    } finally {
+      // Always release both worker threads, including when a future fails.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Master observer loaded by testAlterAndDisable. It holds back the disable request
+   * until an addColumn request has reached its handler, and after the addColumn handler
+   * has run it checks that the table still reports as not disabled.
+   */
+  public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
+    /** Opens the {@code addColumn} latch that preDisableTable waits on. */
+    @Override
+    public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      LOG.debug("addColumn called");
+      addColumn.countDown();
+    }
+
+    /**
+     * Sleeps 6000 ms and then expects checkTableModifiable to throw
+     * {@link TableNotDisabledException}; any other outcome fails.
+     */
+    @Override
+    public void postAddColumnHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName, HColumnDescriptor column) throws IOException {
+      Threads.sleep(6000);
+      try {
+        ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
+      } catch(TableNotDisabledException expected) {
+        //pass
+        return;
+      } catch(IOException ex) {
+        // Still falls through to fail() below, but record why the check went wrong.
+        LOG.warn("checkTableModifiable threw an unexpected exception", ex);
+      }
+      fail("was expecting the table to be enabled");
+    }
+
+    /** Blocks the disable request until the {@code addColumn} latch has opened. */
+    @Override
+    public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName) throws IOException {
+      try {
+        LOG.debug("Waiting for addColumn to be processed first");
+        //wait for addColumn to be processed first
+        addColumn.await();
+        LOG.debug("addColumn started, we can continue");
+      } catch (InterruptedException ex) {
+        LOG.warn("Sleep interrupted while waiting for addColumn countdown");
+        // Restore the interrupt flag so that callers can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    /** Sleeps 3000 ms after the disable handler has run. */
+    @Override
+    public void postDisableTableHandler(
+        ObserverContext<MasterCoprocessorEnvironment> ctx,
+        byte[] tableName) throws IOException {
+      Threads.sleep(3000);
+    }
+  }
+
+  /**
+   * Disables and deletes the test table, then asserts that ZKUtil.checkExists returns
+   * a negative value for the table's znode under the table-lock parent znode.
+   */
+  @Test(timeout = 600000)
+  public void testDelete() throws Exception {
+    prepareMiniCluster();
+
+    final HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+    hbaseAdmin.disableTable(TABLE_NAME);
+    hbaseAdmin.deleteTable(TABLE_NAME);
+
+    // The lock znode of the deleted table is expected to be gone as well.
+    final ZooKeeperWatcher watcher = TEST_UTIL.getZooKeeperWatcher();
+    final String tableLockNode =
+        ZKUtil.joinZNode(watcher.tableLockZNode, Bytes.toString(TABLE_NAME));
+    final boolean lockNodeAbsent = ZKUtil.checkExists(watcher, tableLockNode) < 0;
+
+    assertTrue(lockNodeAbsent);
+  }
+
+
+  /**
+   * Submits i+1 write-lock requests for table i (10 requests over 4 tables), waits
+   * until 4 locks have been obtained and all 10 requests have started, reaps all table
+   * write locks, and then checks that a lock manager configured with a zero timeout
+   * can acquire the write lock on the last table.
+   */
+  @Test(timeout = 600000)
+  public void testReapAllTableLocks() throws Exception {
+    prepareMiniZkCluster();
+    ServerName serverName = new ServerName("localhost:10000", 0);
+    final TableLockManager lockManager = TableLockManager.createTableLockManager(
+        TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
+
+    String[] tables = {"table1", "table2", "table3", "table4"};
+    ExecutorService executor = Executors.newFixedThreadPool(6);
+    try {
+      final CountDownLatch writeLocksObtained = new CountDownLatch(4);
+      final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
+      //TODO: read lock tables
+
+      //6 threads will be stuck waiting for the table lock
+      for (int i = 0; i < tables.length; i++) {
+        final String table = tables[i];
+        for (int j = 0; j < i+1; j++) { //i+1 write locks attempted for table[i]
+          executor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              writeLocksAttempted.countDown();
+              lockManager.writeLock(Bytes.toBytes(table), "testReapAllTableLocks").acquire();
+              writeLocksObtained.countDown();
+              return null;
+            }
+          });
+        }
+      }
+
+      writeLocksObtained.await();
+      writeLocksAttempted.await();
+
+      //now reap all table locks
+      lockManager.reapAllTableWriteLocks();
+
+      TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
+      TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
+            TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
+
+      //should not throw table lock timeout exception
+      zeroTimeoutLockManager.writeLock(Bytes.toBytes(tables[tables.length -1]), "zero timeout")
+        .acquire();
+    } finally {
+      // Interrupt the workers still blocked in acquire(), even if the test failed above.
+      executor.shutdownNow();
+    }
+  }
+
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java?rev=1448867&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java Fri Feb 22 00:15:52 2013
@@ -0,0 +1,359 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InterProcessLock;
+import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(MediumTests.class)
+public class TestZKInterProcessReadWriteLock {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestZKInterProcessReadWriteLock.class);
+
+  // Shared by all tests in this class; the mini ZooKeeper cluster is started once in
+  // beforeAllTests() and stopped in afterAllTests().
+  private static final HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  // Size of the worker pool and number of concurrent lockers in the multi-thread tests.
+  private static final int NUM_THREADS = 10;
+
+  // Assigned from TEST_UTIL.getConfiguration() in beforeAllTests().
+  private static Configuration conf;
+
+  // Flag the worker callables flip while they hold a lock, used to detect overlap.
+  private final AtomicBoolean isLockHeld = new AtomicBoolean(false);
+  // Per-test-instance worker pool; tearDown() calls shutdown() on it.
+  private final ExecutorService executor =
+      Executors.newFixedThreadPool(NUM_THREADS,
+          new DaemonThreadFactory("TestZKInterProcessReadWriteLock-"));
+
+  /**
+   * One-time setup: captures the shared configuration, starts the mini ZooKeeper
+   * cluster, sets the ZooKeeper session timeout setting to 1000 and creates the
+   * table-lock parent znode (with parents).
+   */
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniZKCluster();
+    conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
+
+    final ZooKeeperWatcher setupWatcher = getZooKeeperWatcher("setup");
+    final String lockParentNode = setupWatcher.tableLockZNode;
+    ZKUtil.createWithParents(setupWatcher, lockParentNode);
+  }
+
+  /** One-time teardown: calls shutdownMiniZKCluster() on the shared testing utility. */
+  @AfterClass
+  public static void afterAllTests() throws Exception {
+    final HBaseTestingUtility util = TEST_UTIL;
+    util.shutdownMiniZKCluster();
+  }
+
+  /** Runs after every test method and calls shutdown() on this instance's worker pool. */
+  @After
+  public void tearDown() {
+    final ExecutorService pool = executor;
+    pool.shutdown();
+  }
+
+  /**
+   * Returns the watcher obtained from the shared testing utility.
+   *
+   * @param desc description supplied by the caller; currently not used by this method
+   * @return the result of {@code TEST_UTIL.getZooKeeperWatcher()}
+   * @throws IOException if the watcher cannot be obtained
+   */
+  private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
+      throws IOException {
+    final ZooKeeperWatcher watcher = TEST_UTIL.getZooKeeperWatcher();
+    return watcher;
+  }
+
+
+  /**
+   * Starts NUM_THREADS workers that each take the write lock on the same znode, and
+   * uses the {@code isLockHeld} flag to assert that no two of them hold it at once.
+   */
+  @Test(timeout = 30000)
+  public void testWriteLockExcludesWriters() throws Exception {
+    final String testName = "testWriteLockExcludesWriters";
+    final ZKInterProcessReadWriteLock lockUnderTest = getReadWriteLock(testName);
+    final List<Future<Void>> pending = Lists.newArrayList();
+
+    for (int worker = 0; worker < NUM_THREADS; worker++) {
+      final String workerName = testName + worker;
+      final Callable<Void> writer = new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          final ZKInterProcessWriteLock exclusive =
+              lockUnderTest.writeLock(Bytes.toBytes(workerName));
+          try {
+            exclusive.acquire();
+            try {
+              // The flag must be clear: nobody else may be inside the lock.
+              assertTrue(isLockHeld.compareAndSet(false, true));
+              Thread.sleep(1000);
+              // The flag must still be set: nobody else may have cleared it.
+              assertTrue(isLockHeld.compareAndSet(true, false));
+            } finally {
+              isLockHeld.set(false);
+              exclusive.release();
+            }
+          } catch (InterruptedException e) {
+            LOG.warn(workerName + " interrupted", e);
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+          }
+          return null;
+        }
+      };
+      pending.add(executor.submit(writer));
+    }
+
+    MultithreadedTestUtil.assertOnFutures(pending);
+  }
+
+  /**
+   * Starts NUM_THREADS workers that each take the read lock on the same znode and hold
+   * it for 1000 ms. Once every worker has reported acquisition through the latch, the
+   * test asserts that all NUM_THREADS read locks are held at the same time.
+   */
+  @Test(timeout = 30000)
+  public void testReadLockDoesNotExcludeReaders() throws Exception {
+    final String testName = "testReadLockDoesNotExcludeReaders";
+    final ZKInterProcessReadWriteLock readWriteLock =
+        getReadWriteLock(testName);
+    final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
+    final AtomicInteger locksHeld = new AtomicInteger(0);
+    List<Future<Void>> results = Lists.newArrayList();
+    for (int i = 0; i < NUM_THREADS; ++i) {
+      final String threadDesc = testName + i;
+      results.add(executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          ZKInterProcessReadLock readLock =
+              readWriteLock.readLock(Bytes.toBytes(threadDesc));
+          readLock.acquire();
+          try {
+            // Count first, then signal, so the latch never opens ahead of the counter.
+            locksHeld.incrementAndGet();
+            locksAcquiredLatch.countDown();
+            Thread.sleep(1000);
+          } finally {
+            readLock.release();
+            locksHeld.decrementAndGet();
+          }
+          return null;
+        }
+      }));
+    }
+    locksAcquiredLatch.await();
+    // JUnit's argument order is (expected, actual); it determines the failure message.
+    assertEquals(NUM_THREADS, locksHeld.get());
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  /**
+   * A reader takes the read lock and holds it for 1000 ms. A writer waits until the
+   * reader has the lock, then requests the write lock and asserts that the reader's
+   * flag has been cleared by the time the write lock is granted.
+   *
+   * The timeout matches the 30000 ms used by the sibling lock tests; the reader alone
+   * sleeps for 1000 ms, so a 3000 ms budget left little room for the remaining work.
+   */
+  @Test(timeout = 30000)
+  public void testReadLockExcludesWriters() throws Exception {
+    // Submit a read lock request first
+    // Submit a write lock request second
+    final String testName = "testReadLockExcludesWriters";
+    List<Future<Void>> results = Lists.newArrayList();
+    final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
+    Callable<Void> acquireReadLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireReadLock";
+        ZKInterProcessReadLock readLock =
+            getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
+        readLock.acquire();
+        try {
+          assertTrue(isLockHeld.compareAndSet(false, true));
+          readLockAcquiredLatch.countDown();
+          Thread.sleep(1000);
+        } finally {
+          // Clear the flag before releasing so the writer never sees a stale "held".
+          isLockHeld.set(false);
+          readLock.release();
+        }
+        return null;
+      }
+    };
+    Callable<Void> acquireWriteLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireWriteLock";
+        ZKInterProcessWriteLock writeLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        readLockAcquiredLatch.await();
+        assertTrue(isLockHeld.get());
+        writeLock.acquire();
+        try {
+          assertFalse(isLockHeld.get());
+        } finally {
+          writeLock.release();
+        }
+        return null;
+      }
+    };
+    results.add(executor.submit(acquireReadLock));
+    results.add(executor.submit(acquireWriteLock));
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  /**
+   * Builds a read/write lock on the znode named {@code testName} under the table-lock
+   * parent znode, with a metadata handler that logs the owner metadata it receives.
+   *
+   * @param testName name of the child znode; also passed on to getZooKeeperWatcher
+   * @return a new lock object for that znode
+   * @throws IOException if the ZooKeeper watcher cannot be obtained
+   */
+  private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
+      throws IOException {
+    final ZooKeeperWatcher watcher = getZooKeeperWatcher(testName);
+    final String lockNode = ZKUtil.joinZNode(watcher.tableLockZNode, testName);
+
+    final MetadataHandler loggingHandler = new MetadataHandler() {
+      @Override
+      public void handleMetadata(byte[] ownerMetadata) {
+        LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
+      }
+    };
+
+    return new ZKInterProcessReadWriteLock(watcher, lockNode, loggingHandler);
+  }
+
+  /**
+   * A writer takes the write lock and holds it for 1000 ms. A reader waits until the
+   * writer has the lock, then requests the read lock and asserts that the writer's
+   * flag has been cleared by the time the read lock is granted.
+   *
+   * Uses its own znode name so that it does not share lock state with
+   * testReadLockExcludesWriters.
+   */
+  @Test(timeout = 30000)
+  public void testWriteLockExcludesReaders() throws Exception {
+    // Submit a write lock request first
+    // Submit a read lock request second
+    final String testName = "testWriteLockExcludesReaders";
+    List<Future<Void>> results = Lists.newArrayList();
+    final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
+    Callable<Void> acquireWriteLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireWriteLock";
+        ZKInterProcessWriteLock writeLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        writeLock.acquire();
+        try {
+          writeLockAcquiredLatch.countDown();
+          assertTrue(isLockHeld.compareAndSet(false, true));
+          Thread.sleep(1000);
+        } finally {
+          // Clear the flag before releasing so the reader never sees a stale "held".
+          isLockHeld.set(false);
+          writeLock.release();
+        }
+        return null;
+      }
+    };
+    Callable<Void> acquireReadLock = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-acquireReadLock";
+        ZKInterProcessReadLock readLock =
+            getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
+        writeLockAcquiredLatch.await();
+        readLock.acquire();
+        try {
+          assertFalse(isLockHeld.get());
+        } finally {
+          readLock.release();
+        }
+        return null;
+      }
+    };
+    results.add(executor.submit(acquireWriteLock));
+    results.add(executor.submit(acquireReadLock));
+    MultithreadedTestUtil.assertOnFutures(results);
+  }
+
+  /**
+   * Runs three workers against one write lock: one acquires it and holds it for
+   * 10000 ms, one expects tryAcquire(5000) to return false, and one expects
+   * tryAcquire(30000) to return true. The two waiting workers start only after the
+   * first one has the lock.
+   */
+  @Test(timeout = 60000)
+  public void testTimeout() throws Exception {
+    final String testName = "testTimeout";
+    final CountDownLatch hogHasLock = new CountDownLatch(1);
+    final List<Future<Void>> outcomes = Lists.newArrayList();
+
+    // Takes the lock and sits on it for 10 seconds.
+    final Callable<Void> hogTask = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-shouldHog";
+        final ZKInterProcessWriteLock hogLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        hogLock.acquire();
+        hogHasLock.countDown();
+        Thread.sleep(10000);
+        hogLock.release();
+        return null;
+      }
+    };
+
+    // Gives up after 5 seconds, before the first worker releases.
+    final Callable<Void> timeoutTask = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-shouldTimeout";
+        final ZKInterProcessWriteLock impatientLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        hogHasLock.await();
+        final boolean acquired = impatientLock.tryAcquire(5000);
+        assertFalse(acquired);
+        return null;
+      }
+    };
+
+    // Waits up to 30 seconds, long enough to outlast the first worker.
+    final Callable<Void> patientTask = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final String threadDesc = testName + "-shouldAcquireLock";
+        final ZKInterProcessWriteLock patientLock =
+            getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+        hogHasLock.await();
+        final boolean acquired = patientLock.tryAcquire(30000);
+        assertTrue(acquired);
+        patientLock.release();
+        return null;
+      }
+    };
+
+    outcomes.add(executor.submit(hogTask));
+    outcomes.add(executor.submit(timeoutTask));
+    outcomes.add(executor.submit(patientTask));
+    MultithreadedTestUtil.assertOnFutures(outcomes);
+  }
+
+  /**
+   * Uses the lock from two separately constructed ZooKeeperWatchers. Client 1 takes a
+   * read lock; client 2's write-lock attempt must time out while that read lock is
+   * held and must succeed once it has been released. Both watchers are closed even
+   * when the test fails part-way through.
+   */
+  @Test(timeout = 60000)
+  public void testMultipleClients() throws Exception {
+    //tests lock usage from multiple zookeeper clients with different sessions.
+    //acquire one read lock, then one write lock
+    final String testName = "testMultipleClients";
+
+    //different zookeeper sessions with separate identifiers
+    ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null);
+    try {
+      ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null);
+      try {
+        String znode = ZKUtil.joinZNode(zkWatcher1.tableLockZNode, testName);
+
+        ZKInterProcessReadWriteLock clientLock1
+          = new ZKInterProcessReadWriteLock(zkWatcher1, znode, null);
+        ZKInterProcessReadWriteLock clientLock2
+          = new ZKInterProcessReadWriteLock(zkWatcher2, znode, null);
+
+        InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1"));
+        lock1.acquire();
+
+        //try to acquire, but it will timeout. We are testing whether this will cause any
+        //problems due to the read lock being from another client
+        InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2"));
+        assertFalse(lock2.tryAcquire(1000));
+
+        lock1.release();
+
+        //this time it will acquire
+        assertTrue(lock2.tryAcquire(5000));
+        lock2.release();
+      } finally {
+        zkWatcher2.close();
+      }
+    } finally {
+      zkWatcher1.close();
+    }
+  }
+}



Mime
View raw message