helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-268] Tests for the Atomic API
Date Thu, 10 Oct 2013 00:43:08 GMT
Updated Branches:
  refs/heads/helix-logical-model 558b42c61 -> 101fe1e9a


[HELIX-268] Tests for the Atomic API


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/101fe1e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/101fe1e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/101fe1e9

Branch: refs/heads/helix-logical-model
Commit: 101fe1e9a4c7d102643445d533dc0c59a245b4cc
Parents: 558b42c
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Oct 9 17:42:27 2013 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Oct 9 17:42:27 2013 -0700

----------------------------------------------------------------------
 .../helix/api/accessor/ClusterAccessor.java     |  14 +-
 .../helix/api/accessor/ParticipantAccessor.java |   3 +-
 .../java/org/apache/helix/lock/HelixLock.java   |   6 +
 .../org/apache/helix/lock/zk/ZKHelixLock.java   |  19 ++
 .../api/accessor/TestAccessorRecreate.java      | 162 +++++++++++++++
 .../helix/api/accessor/TestAtomicAccessors.java | 200 +++++++++++++++++++
 .../apache/helix/lock/zk/TestZKHelixLock.java   | 117 +++++++++++
 7 files changed, 512 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index f283f74..ea7dbb9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -118,20 +118,19 @@ public class ClusterAccessor {
     }
     _accessor.createProperty(_keyBuilder.constraints(), null);
     for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
-      _accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
-          constraints);
+      _accessor.setProperty(_keyBuilder.constraint(constraints.getType().toString()), constraints);
     }
     ClusterConfiguration clusterConfig = ClusterConfiguration.from(cluster.getUserConfig());
     if (cluster.autoJoinAllowed()) {
       clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
     }
     if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
-      _accessor.createProperty(_keyBuilder.persistantStat(), cluster.getStats());
+      _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
     }
     if (cluster.isPaused()) {
       pauseCluster();
     }
-    _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig);
+    _accessor.setProperty(_keyBuilder.clusterConfig(), clusterConfig);
 
     return true;
   }
@@ -653,7 +652,7 @@ public class ClusterAccessor {
       configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
       configuration.setBucketSize(resource.getBucketSize());
       configuration.setBatchMessageMode(resource.getBatchMessageMode());
-      _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+      _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
@@ -662,7 +661,7 @@ public class ClusterAccessor {
         ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
             resource.getBatchMessageMode());
     if (idealState != null) {
-      _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+      _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
     }
     return true;
   }
@@ -790,8 +789,7 @@ public class ClusterAccessor {
     for (PartitionId partitionId : disabledPartitions) {
       instanceConfig.setInstanceEnabledForPartition(partitionId, false);
     }
-    _accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
-    _accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
+    _accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 90fd986..4e7d3c2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -747,7 +747,8 @@ public class ParticipantAccessor {
    * @param participantId the participant for which to generate paths
    * @return list of required paths as strings
    */
-  static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder, ParticipantId participantId) {
+  private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder,
+      ParticipantId participantId) {
     List<String> paths = Lists.newArrayList();
     paths.add(keyBuilder.instanceConfig(participantId.stringify()).getPath());
     paths.add(keyBuilder.messages(participantId.stringify()).getPath());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
index 79c15d0..a567a5c 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
@@ -34,4 +34,10 @@ public interface HelixLock {
    * @return true if the lock was released, false if it could not be released
    */
   public boolean unlock();
+
+  /**
+   * Check if this object is blocked waiting on the lock
+   * @return true if blocked, false otherwise
+   */
+  public boolean isBlocked();
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 7ddbce1..70614b3 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -45,6 +45,7 @@ public class ZKHelixLock implements HelixLock {
   private final ZkClient _zkClient;
   private volatile boolean _locked;
   private volatile boolean _canceled;
+  private volatile boolean _blocked;
 
   private final LockListener _listener = new LockListener() {
     @Override
@@ -79,6 +80,7 @@ public class ZKHelixLock implements HelixLock {
     _writeLock = new WriteLock(zookeeper, _rootPath, null, _listener);
     _locked = false;
     _canceled = false;
+    _blocked = false;
   }
 
   /**
@@ -86,6 +88,7 @@ public class ZKHelixLock implements HelixLock {
    * @return true if the lock succeeded, false if it failed, as is the case if the connection to ZK
    *         is lost
    */
+  @Override
   public synchronized boolean lock() {
     _canceled = false;
     if (_locked) {
@@ -99,6 +102,7 @@ public class ZKHelixLock implements HelixLock {
       if (acquired) {
         _locked = true;
       } else {
+        setBlocked(true);
         wait();
       }
     } catch (KeeperException e) {
@@ -108,6 +112,7 @@ public class ZKHelixLock implements HelixLock {
       LOG.error("Interrupted while acquiring a lock on " + _rootPath);
       _canceled = true;
     }
+    setBlocked(false);
     return _locked;
   }
 
@@ -115,6 +120,7 @@ public class ZKHelixLock implements HelixLock {
    * Unlock the scope
    * @return true if unlock executed, false otherwise
    */
+  @Override
   public synchronized boolean unlock() {
     try {
       _writeLock.unlock();
@@ -127,6 +133,19 @@ public class ZKHelixLock implements HelixLock {
     return true;
   }
 
+  @Override
+  public synchronized boolean isBlocked() {
+    return _blocked;
+  }
+
+  /**
+   * Set if this the lock method is currently blocked
+   * @param isBlocked true if blocked, false otherwise
+   */
+  protected synchronized void setBlocked(boolean isBlocked) {
+    _blocked = isBlocked;
+  }
+
   public static void main(String[] args) {
     ZkClient zkClient = new ZkClient("localhost:2199");
     ClusterId clusterId = ClusterId.from("exampleCluster");

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
new file mode 100644
index 0000000..a92f12a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
@@ -0,0 +1,162 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestAccessorRecreate extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestAccessorRecreate.class);
+
+  /**
+   * This test just makes sure that a cluster is only recreated if it is incomplete. This is not
+   * directly testing atomicity, but rather a use case where a machine died while creating the
+   * cluster.
+   */
+  @Test
+  public void testRecreateCluster() {
+    final String MODIFIER = "modifier";
+    final ClusterId clusterId = ClusterId.from("testCluster");
+
+    // connect
+    boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    if (!connected) {
+      LOG.warn("Connection not established");
+      return;
+    }
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+    // create a cluster
+    boolean created = createCluster(clusterId, accessor, MODIFIER, 1);
+    Assert.assertTrue(created);
+
+    // read the cluster
+    Cluster clusterSnapshot = accessor.readCluster();
+    Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+    // create a cluster with the same id
+    boolean created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+    Assert.assertFalse(created2); // should fail since cluster exists
+
+    // remove a required property
+    helixAccessor.removeProperty(helixAccessor.keyBuilder().liveInstances());
+
+    // try again, should work this time
+    created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+    Assert.assertTrue(created2);
+
+    // read the cluster again
+    clusterSnapshot = accessor.readCluster();
+    Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+    accessor.dropCluster();
+  }
+
+  /**
+   * This test just makes sure that a participant is only recreated if it is incomplete. This is not
+   * directly testing atomicity, but rather a use case where a machine died while creating the
+   * participant.
+   */
+  @Test
+  public void testRecreateParticipant() {
+    final String MODIFIER = "modifier";
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final ParticipantId participantId = ParticipantId.from("testParticipant");
+
+    // connect
+    boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    if (!connected) {
+      LOG.warn("Connection not established");
+      return;
+    }
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+    // create the cluster
+    boolean clusterCreated = createCluster(clusterId, accessor, MODIFIER, 0);
+    Assert.assertTrue(clusterCreated);
+
+    // create the participant
+    boolean created = createParticipant(participantId, accessor, MODIFIER, 1);
+    Assert.assertTrue(created);
+
+    // read the participant
+    ParticipantAccessor participantAccessor = new ParticipantAccessor(helixAccessor);
+    Participant participantSnapshot = participantAccessor.readParticipant(participantId);
+    Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+    // create a participant with the same id
+    boolean created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+    Assert.assertFalse(created2); // should fail since participant exists
+
+    // remove a required property
+    helixAccessor.removeProperty(helixAccessor.keyBuilder().messages(participantId.stringify()));
+
+    // try again, should work this time
+    created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+    Assert.assertTrue(created2);
+
+    // read the cluster again
+    participantSnapshot = participantAccessor.readParticipant(participantId);
+    Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+    accessor.dropCluster();
+  }
+
+  private boolean createCluster(ClusterId clusterId, ClusterAccessor accessor, String modifierName,
+      int modifierValue) {
+    // create a cluster
+    UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+    userConfig.setIntField(modifierName, modifierValue);
+    ClusterConfig cluster = new ClusterConfig.Builder(clusterId).userConfig(userConfig).build();
+    return accessor.createCluster(cluster);
+  }
+
+  private boolean createParticipant(ParticipantId participantId, ClusterAccessor accessor,
+      String modifierName, int modifierValue) {
+    // create a participant
+    UserConfig userConfig = new UserConfig(Scope.participant(participantId));
+    userConfig.setIntField(modifierName, modifierValue);
+    ParticipantConfig participant =
+        new ParticipantConfig.Builder(participantId).hostName("host").port(0)
+            .userConfig(userConfig).build();
+    return accessor.addParticipantToCluster(participant);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
new file mode 100644
index 0000000..8dbb43f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -0,0 +1,200 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.lock.zk.ZKHelixLock;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that the atomic accessors behave atomically in response to interwoven updates.
+ */
+public class TestAtomicAccessors extends ZkUnitTestBase {
+  private static final long TIMEOUT = 30000L;
+  private static final long EXTRA_WAIT = 10000L;
+
+  @Test
+  public void testClusterUpdates() {
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor helixAccessor =
+        new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    final LockProvider lockProvider = new LockProvider();
+    final StateModelDefId stateModelDefId = StateModelDefId.from("FakeModel");
+    final State state = State.from("fake");
+    final int constraint1 = 10;
+    final int constraint2 = 11;
+    final String key1 = "key1";
+    final String key2 = "key2";
+
+    // set up the cluster (non-atomically since this concurrency comes later)
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+    ClusterConfig config = new ClusterConfig.Builder(clusterId).build();
+    boolean created = accessor.createCluster(config);
+    Assert.assertTrue(created);
+
+    // thread that will update the cluster in one way
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+        userConfig.setBooleanField(key1, true);
+        ClusterConfig.Delta delta =
+            new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+                Scope.cluster(clusterId), stateModelDefId, state, constraint1).setUserConfig(
+                userConfig);
+        ClusterAccessor accessor =
+            new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+        accessor.updateCluster(delta);
+      }
+    };
+
+    // thread that will update the cluster in another way
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+        userConfig.setBooleanField(key2, true);
+        ClusterConfig.Delta delta =
+            new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+                Scope.cluster(clusterId), stateModelDefId, state, constraint2).setUserConfig(
+                userConfig);
+        ClusterAccessor accessor =
+            new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+        accessor.updateCluster(delta);
+      }
+    };
+
+    // start the threads
+    t1.start();
+    t2.start();
+
+    // make sure the threads are done
+    long startTime = System.currentTimeMillis();
+    try {
+      t1.join(TIMEOUT);
+      t2.join(TIMEOUT);
+    } catch (InterruptedException e) {
+      Assert.fail(e.getMessage());
+      t1.interrupt();
+      t2.interrupt();
+    }
+    long endTime = System.currentTimeMillis();
+    if (endTime - startTime > TIMEOUT - EXTRA_WAIT) {
+      Assert.fail("Test timed out");
+      t1.interrupt();
+      t2.interrupt();
+    }
+
+    Assert.assertTrue(lockProvider.hasLockBlocked());
+
+    accessor.dropCluster();
+  }
+
+  /**
+   * A HelixLockable that returns an instrumented ZKHelixLock
+   */
+  private class LockProvider implements HelixLockable {
+    private HelixLock _firstLock = null;
+    private AtomicBoolean _hasSecondBlocked = new AtomicBoolean(false);
+
+    @Override
+    public synchronized HelixLock getLock(ClusterId clusterId, Scope<?> scope) {
+      return new MyLock(clusterId, scope, _gZkClient);
+    }
+
+    /**
+     * Check if a lock object has blocked
+     * @return true if a block happened, false otherwise
+     */
+    public synchronized boolean hasLockBlocked() {
+      return _hasSecondBlocked.get();
+    }
+
+    /**
+     * An instrumented ZKHelixLock
+     */
+    private class MyLock extends ZKHelixLock {
+      /**
+       * Instantiate a lock that instruments a ZKHelixLock
+       * @param clusterId the cluster to lock
+       * @param scope the scope to lock on
+       * @param zkClient an active ZooKeeper client
+       */
+      public MyLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) {
+        super(clusterId, scope, zkClient);
+      }
+
+      @Override
+      public synchronized boolean lock() {
+        // synchronize here to ensure atomic set and so that the first lock is the first one who
+        // gets to lock
+        if (_firstLock == null) {
+          _firstLock = this;
+        }
+        return super.lock();
+      }
+
+      @Override
+      public boolean unlock() {
+        if (_firstLock == this) {
+          // wait to unlock until another thread has blocked
+          synchronized (_hasSecondBlocked) {
+            if (!_hasSecondBlocked.get()) {
+              try {
+                _hasSecondBlocked.wait(TIMEOUT);
+              } catch (InterruptedException e) {
+              }
+            }
+          }
+        }
+        return super.unlock();
+      }
+
+      @Override
+      protected synchronized void setBlocked(boolean isBlocked) {
+        if (isBlocked) {
+          synchronized (_hasSecondBlocked) {
+            _hasSecondBlocked.set(true);
+            _hasSecondBlocked.notify();
+          }
+        }
+        super.setBlocked(isBlocked);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
new file mode 100644
index 0000000..4e023ba
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
@@ -0,0 +1,117 @@
+package org.apache.helix.lock.zk;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.lock.HelixLock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Tests that the Zookeeper-based Helix lock can acquire, block, and release as appropriate
+ */
+public class TestZKHelixLock extends ZkUnitTestBase {
+  @Test
+  public void basicTest() {
+    _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    final AtomicBoolean t1Locked = new AtomicBoolean(false);
+    final AtomicBoolean t1Done = new AtomicBoolean(false);
+    final AtomicInteger field1 = new AtomicInteger(0);
+    final AtomicInteger field2 = new AtomicInteger(1);
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final HelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+    final HelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+
+    // thread 1: get a lock, set fields to 1
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        lock1.lock();
+        synchronized (t1Locked) {
+          t1Locked.set(true);
+          t1Locked.notify();
+        }
+        yield(); // if locking doesn't work, t2 will set the fields first
+        field1.set(1);
+        field2.set(1);
+        synchronized (t1Done) {
+          t1Done.set(true);
+          t1Done.notify();
+        }
+      }
+    };
+
+    // thread 2: wait for t1 to acquire the lock, get a lock, set fields to 2
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        synchronized (t1Locked) {
+          while (!t1Locked.get()) {
+            try {
+              t1Locked.wait();
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+        lock2.lock();
+        field1.set(2);
+        field2.set(2);
+      }
+    };
+
+    // start the threads
+    t1.setPriority(Thread.MIN_PRIORITY);
+    t2.setPriority(Thread.MAX_PRIORITY);
+    t1.start();
+    t2.start();
+
+    // wait for t1 to finish setting fields
+    synchronized (t1Done) {
+      while (!t1Done.get()) {
+        try {
+          t1Done.wait();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    // make sure both fields are 1
+    Assert.assertEquals(field1.get(), 1);
+    Assert.assertEquals(field2.get(), 1);
+
+    // unlock t1's lock after checking that t2 is blocked
+    Assert.assertTrue(lock2.isBlocked());
+    lock1.unlock();
+
+    try {
+      // wait for t2, make sure both fields are 2
+      t2.join(10000);
+      Assert.assertEquals(field1.get(), 2);
+      Assert.assertEquals(field2.get(), 2);
+    } catch (InterruptedException e) {
+    }
+  }
+}


Mime
View raw message