helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [36/53] [abbrv] git commit: [HELIX-268] Atomic API - Add a ZK lock and a skeleton AtomicClusterAccessor
Date Thu, 07 Nov 2013 01:19:44 GMT
[HELIX-268] Atomic API - Add a ZK lock and a skeleton AtomicClusterAccessor


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/61643b1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/61643b1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/61643b1d

Branch: refs/heads/master
Commit: 61643b1df4e70c77f510e935d130aa7887b1139f
Parents: e23a308
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Mon Oct 7 18:04:51 2013 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Nov 6 13:17:36 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Scope.java   |   8 +
 .../api/accessor/AtomicClusterAccessor.java     | 256 ++++++++++++++++
 .../api/accessor/AtomicParticipantAccessor.java | 207 +++++++++++++
 .../api/accessor/AtomicResourceAccessor.java    | 146 +++++++++
 .../helix/api/accessor/ClusterAccessor.java     | 122 ++++----
 .../helix/api/accessor/ParticipantAccessor.java | 121 ++++++--
 .../helix/api/accessor/ResourceAccessor.java    |  38 ++-
 .../java/org/apache/helix/lock/HelixLock.java   |  43 +++
 .../org/apache/helix/lock/HelixLockable.java    |  36 +++
 .../org/apache/helix/lock/zk/LockListener.java  |  39 +++
 .../apache/helix/lock/zk/ProtocolSupport.java   | 191 ++++++++++++
 .../org/apache/helix/lock/zk/WriteLock.java     | 294 +++++++++++++++++++
 .../org/apache/helix/lock/zk/ZKHelixLock.java   | 152 ++++++++++
 .../org/apache/helix/lock/zk/ZNodeName.java     | 113 +++++++
 .../helix/lock/zk/ZooKeeperOperation.java       |  38 +++
 .../helix/model/ClusterConfiguration.java       |  20 ++
 .../api/accessor/TestAccessorRecreate.java      | 162 ++++++++++
 .../helix/api/accessor/TestAtomicAccessors.java | 202 +++++++++++++
 .../apache/helix/lock/zk/TestZKHelixLock.java   | 117 ++++++++
 19 files changed, 2215 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/Scope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Scope.java b/helix-core/src/main/java/org/apache/helix/api/Scope.java
index 7dc217c..26e09a9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Scope.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Scope.java
@@ -60,6 +60,14 @@ public class Scope<T extends Id> {
     return getType() + "{" + getScopedId() + "}";
   }
 
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof Scope) {
+      return this.toString().equals(that.toString());
+    }
+    return false;
+  }
+
   /**
    * Get the Helix entity type that this scope covers
    * @return scope type

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
new file mode 100644
index 0000000..1196770
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -0,0 +1,256 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicClusterAccessor extends ClusterAccessor {
+  private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);
+
+  private final HelixLockable _lockProvider;
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+  private final ClusterId _clusterId;
+
+  /**
+   * Non-atomic instance to protect against reentrant locking via polymorphism
+   */
+  private final ClusterAccessor _clusterAccessor;
+
+  /**
+   * Instantiate the accessor
+   * @param clusterId the cluster to access
+   * @param accessor a HelixDataAccessor for the physical properties
+   * @param lockProvider a lock provider
+   */
+  public AtomicClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+      HelixLockable lockProvider) {
+    super(clusterId, accessor);
+    _lockProvider = lockProvider;
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+    _clusterId = clusterId;
+    _clusterAccessor = new ClusterAccessor(clusterId, accessor);
+  }
+
+  @Override
+  public boolean createCluster(ClusterConfig cluster) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.createCluster(cluster);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean dropCluster() {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.dropCluster();
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Cluster readCluster() {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.readCluster();
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean addParticipantToCluster(ParticipantConfig participant) {
+    if (participant == null) {
+      LOG.error("Participant config cannot be null");
+      return false;
+    }
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participant.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.addParticipantToCluster(participant);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean dropParticipantFromCluster(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.dropParticipantFromCluster(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean addResourceToCluster(ResourceConfig resource) {
+    if (resource == null) {
+      LOG.error("Resource config cannot be null");
+      return false;
+    }
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resource.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.addResourceToCluster(resource);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean dropResourceFromCluster(ResourceId resourceId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.dropResourceFromCluster(resourceId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.updateCluster(clusterDelta);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Read resources atomically. This is resource-atomic, not cluster-atomic
+   */
+  @Override
+  public Map<ResourceId, Resource> readResources() {
+    // read resources individually instead of together to maintain the equality link between ideal
+    // state and resource config
+    Map<ResourceId, Resource> resources = Maps.newHashMap();
+    Set<String> idealStateNames =
+        Sets.newHashSet(_accessor.getChildNames(_keyBuilder.idealStates()));
+    Set<String> resourceConfigNames =
+        Sets.newHashSet(_accessor.getChildNames(_keyBuilder.resourceConfigs()));
+    resourceConfigNames.addAll(idealStateNames);
+    ResourceAccessor accessor = new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+    for (String resourceName : resourceConfigNames) {
+      ResourceId resourceId = ResourceId.from(resourceName);
+      Resource resource = accessor.readResource(resourceId);
+      if (resource != null) {
+        resources.put(resourceId, resource);
+      }
+    }
+    return resources;
+  }
+
+  /**
+   * Read participants atomically. This is participant-atomic, not cluster-atomic
+   */
+  @Override
+  public Map<ParticipantId, Participant> readParticipants() {
+    // read participants individually to keep configs consistent with current state and messages
+    Map<ParticipantId, Participant> participants = Maps.newHashMap();
+    ParticipantAccessor accessor =
+        new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
+    List<String> participantNames = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+    for (String participantName : participantNames) {
+      ParticipantId participantId = ParticipantId.from(participantName);
+      Participant participant = accessor.readParticipant(participantId);
+      if (participant != null) {
+        participants.put(participantId, participant);
+      }
+    }
+    return participants;
+  }
+
+  @Override
+  public void initClusterStructure() {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        _clusterAccessor.initClusterStructure();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
new file mode 100644
index 0000000..05fb0ec
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -0,0 +1,207 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+/**
+ * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicParticipantAccessor extends ParticipantAccessor {
+  private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);
+
+  private final ClusterId _clusterId;
+  private final HelixDataAccessor _accessor;
+  private final HelixLockable _lockProvider;
+
+  /**
+   * Non-atomic instance to protect against reentrant locking via polymorphism
+   */
+  private final ParticipantAccessor _participantAccessor;
+
+  /**
+   * Instantiate the accessor
+   * @param clusterId the cluster to access
+   * @param accessor a HelixDataAccessor for the physical properties
+   * @param lockProvider a lock provider
+   */
+  public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+      HelixLockable lockProvider) {
+    super(accessor);
+    _clusterId = clusterId;
+    _accessor = accessor;
+    _lockProvider = lockProvider;
+    _participantAccessor = new ParticipantAccessor(accessor);
+  }
+
+  @Override
+  boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _participantAccessor.enableParticipant(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Participant readParticipant(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _participantAccessor.readParticipant(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean setParticipant(ParticipantConfig participantConfig) {
+    if (participantConfig == null) {
+      LOG.error("participant config cannot be null");
+      return false;
+    }
+    HelixLock lock =
+        _lockProvider.getLock(_clusterId, Scope.participant(participantConfig.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _participantAccessor.setParticipant(participantConfig);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public ParticipantConfig updateParticipant(ParticipantId participantId,
+      ParticipantConfig.Delta participantDelta) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _participantAccessor.updateParticipant(participantId, participantDelta);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  boolean dropParticipant(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _participantAccessor.dropParticipant(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void insertMessagesToParticipant(ParticipantId participantId,
+      Map<MessageId, Message> msgMap) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        _participantAccessor.insertMessagesToParticipant(participantId, msgMap);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        _participantAccessor.updateMessageStatus(participantId, msgMap);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        _participantAccessor.deleteMessagesFromParticipant(participantId, msgIdSet);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  public void initParticipantStructure(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        _participantAccessor.initParticipantStructure(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  protected ResourceAccessor resourceAccessor() {
+    return new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
new file mode 100644
index 0000000..4bb2ebe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -0,0 +1,146 @@
+package org.apache.helix.api.accessor;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicResourceAccessor extends ResourceAccessor {
+  private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
+
+  private final ClusterId _clusterId;
+  private final HelixDataAccessor _accessor;
+  private final HelixLockable _lockProvider;
+
+  /**
+   * Non-atomic instance to protect against reentrant locking via polymorphism
+   */
+  private final ResourceAccessor _resourceAccessor;
+
+  /**
+   * Instantiate the accessor
+   * @param clusterId the cluster to access
+   * @param accessor a HelixDataAccessor for the physical properties
+   * @param lockProvider a lock provider
+   */
+  public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+      HelixLockable lockProvider) {
+    super(accessor);
+    _clusterId = clusterId;
+    _accessor = accessor;
+    _lockProvider = lockProvider;
+    _resourceAccessor = new ResourceAccessor(accessor);
+  }
+
+  @Override
+  public Resource readResource(ResourceId resourceId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _resourceAccessor.readResource(resourceId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _resourceAccessor.updateResource(resourceId, resourceDelta);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _resourceAccessor.setRebalancerContext(resourceId, context);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean setResource(ResourceConfig resourceConfig) {
+    if (resourceConfig == null) {
+      LOG.error("resource config cannot be null");
+      return false;
+    }
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceConfig.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _resourceAccessor.setResource(resourceConfig);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+      String participantGroupTag) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _resourceAccessor.generateDefaultAssignment(resourceId, replicaCount,
+            participantGroupTag);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  protected ParticipantAccessor participantAccessor() {
+    return new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 8780115..abeb649 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,7 +19,7 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +71,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 import org.testng.internal.annotations.Sets;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class ClusterAccessor {
@@ -80,6 +81,11 @@ public class ClusterAccessor {
   private final PropertyKey.Builder _keyBuilder;
   private final ClusterId _clusterId;
 
+  /**
+   * Instantiate a cluster accessor
+   * @param clusterId the cluster to access
+   * @param accessor HelixDataAccessor for the physical store
+   */
   public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
     _accessor = accessor;
     _keyBuilder = accessor.keyBuilder();
@@ -91,11 +97,12 @@ public class ClusterAccessor {
    * @return true if created, false if creation failed
    */
   public boolean createCluster(ClusterConfig cluster) {
-    boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
-    if (!created) {
+    ClusterConfiguration configuration = _accessor.getProperty(_keyBuilder.clusterConfig());
+    if (configuration != null && isClusterStructureValid()) {
       LOG.error("Cluster already created. Aborting.");
       return false;
     }
+    clearClusterStructure();
     initClusterStructure();
     Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
     for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
@@ -111,20 +118,19 @@ public class ClusterAccessor {
     }
     _accessor.createProperty(_keyBuilder.constraints(), null);
     for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
-      _accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
-          constraints);
+      _accessor.setProperty(_keyBuilder.constraint(constraints.getType().toString()), constraints);
     }
     ClusterConfiguration clusterConfig = ClusterConfiguration.from(cluster.getUserConfig());
     if (cluster.autoJoinAllowed()) {
       clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
     }
     if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
-      _accessor.createProperty(_keyBuilder.persistantStat(), cluster.getStats());
+      _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
     }
-    _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig);
     if (cluster.isPaused()) {
       pauseCluster();
     }
+    _accessor.setProperty(_keyBuilder.clusterConfig(), clusterConfig);
 
     return true;
   }
@@ -155,6 +161,7 @@ public class ClusterAccessor {
       return false;
     }
     ClusterConfiguration configuration = ClusterConfiguration.from(config.getUserConfig());
+    configuration.setAutoJoinAllowed(config.autoJoinAllowed());
     _accessor.setProperty(_keyBuilder.clusterConfig(), configuration);
     Map<ConstraintType, ClusterConstraints> constraints = config.getConstraintMap();
     for (ConstraintType type : constraints.keySet()) {
@@ -199,9 +206,13 @@ public class ClusterAccessor {
 
   /**
    * read entire cluster data
-   * @return cluster snapshot
+   * @return cluster snapshot or null
    */
   public Cluster readCluster() {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster is not fully set up");
+      return null;
+    }
     LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
 
     /**
@@ -275,10 +286,15 @@ public class ClusterAccessor {
   }
 
   /**
-   * Read all resource in the cluster
+   * Read all resources in the cluster
    * @return map of resource id to resource
    */
   public Map<ResourceId, Resource> readResources() {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster is not fully set up yet!");
+      return Collections.emptyMap();
+    }
+
     /**
      * map of resource-id to ideal-state
      */
@@ -319,9 +335,14 @@ public class ClusterAccessor {
 
   /**
    * Read all participants in the cluster
-   * @return map of participant id to participant
+   * @return map of participant id to participant, or empty map
    */
   public Map<ParticipantId, Participant> readParticipants() {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster is not fully set up yet!");
+      return Collections.emptyMap();
+    }
+
     /**
      * map of instance-id to instance-config
      */
@@ -445,7 +466,8 @@ public class ClusterAccessor {
    */
   public boolean addStat(final String statName) {
     if (!isClusterStructureValid()) {
-      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+      LOG.error("cluster " + _clusterId + " is not setup yet");
+      return false;
     }
 
     String persistentStatsPath = _keyBuilder.persistantStat().getPath();
@@ -476,7 +498,8 @@ public class ClusterAccessor {
    */
   public boolean dropStat(final String statName) {
     if (!isClusterStructureValid()) {
-      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+      LOG.error("cluster " + _clusterId + " is not setup yet");
+      return false;
     }
 
     String persistentStatsPath = _keyBuilder.persistantStat().getPath();
@@ -508,7 +531,8 @@ public class ClusterAccessor {
    */
   public boolean addAlert(final String alertName) {
     if (!isClusterStructureValid()) {
-      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+      LOG.error("cluster " + _clusterId + " is not setup yet");
+      return false;
     }
 
     BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
@@ -544,7 +568,8 @@ public class ClusterAccessor {
    */
   public boolean dropAlert(final String alertName) {
     if (!isClusterStructureValid()) {
-      throw new HelixException("cluster " + _clusterId + " is not setup yet");
+      LOG.error("cluster " + _clusterId + " is not setup yet");
+      return false;
     }
 
     String alertsPath = _keyBuilder.alerts().getPath();
@@ -577,16 +602,18 @@ public class ClusterAccessor {
 
   /**
    * pause controller of cluster
+   * @return true if cluster was paused, false if pause failed or already paused
    */
-  public void pauseCluster() {
-    _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
+  public boolean pauseCluster() {
+    return _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
   }
 
   /**
    * resume controller of cluster
+   * @return true if resume succeeded, false otherwise
    */
-  public void resumeCluster() {
-    _accessor.removeProperty(_keyBuilder.pause());
+  public boolean resumeCluster() {
+    return _accessor.removeProperty(_keyBuilder.pause());
   }
 
   /**
@@ -632,7 +659,7 @@ public class ClusterAccessor {
       configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
       configuration.setBucketSize(resource.getBucketSize());
       configuration.setBatchMessageMode(resource.getBatchMessageMode());
-      _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+      _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     }
 
     // Create an IdealState from a RebalancerConfig (if the resource is partitioned)
@@ -641,7 +668,7 @@ public class ClusterAccessor {
         ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
             resource.getBatchMessageMode());
     if (idealState != null) {
-      _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+      _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
     }
     return true;
   }
@@ -667,18 +694,8 @@ public class ClusterAccessor {
    * @return true if valid or false otherwise
    */
   public boolean isClusterStructureValid() {
-    return isClusterStructureValid(_clusterId, _accessor.getBaseDataAccessor());
-  }
-
-  /**
-   * check if cluster structure is valid
-   * @param clusterId the cluster to check
-   * @param baseAccessor a base data accessor
-   * @return true if valid or false otherwise
-   */
-  private static boolean isClusterStructureValid(ClusterId clusterId,
-      BaseDataAccessor<?> baseAccessor) {
-    List<String> paths = getRequiredPaths(clusterId);
+    List<String> paths = getRequiredPaths(_keyBuilder);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
       for (boolean exists : existsResults) {
@@ -693,9 +710,9 @@ public class ClusterAccessor {
   /**
    * Create empty persistent properties to ensure that there is a valid cluster structure
    */
-  private void initClusterStructure() {
+  public void initClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    List<String> paths = getRequiredPaths(_clusterId);
+    List<String> paths = getRequiredPaths(_keyBuilder);
     for (String path : paths) {
       boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
       if (!status && LOG.isDebugEnabled()) {
@@ -705,14 +722,21 @@ public class ClusterAccessor {
   }
 
   /**
+   * Remove all but the top level cluster node; intended for reconstructing the cluster
+   */
+  private void clearClusterStructure() {
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    List<String> paths = getRequiredPaths(_keyBuilder);
+    baseAccessor.remove(paths, 0);
+  }
+
+  /**
    * Get all property paths that must be set for a cluster structure to be valid
-   * @param the cluster that the paths will be relative to
+   * @param keyBuilder a PropertyKey.Builder for the cluster
    * @return list of paths as strings
    */
-  private static List<String> getRequiredPaths(ClusterId clusterId) {
-    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
-    List<String> paths = new ArrayList<String>();
-    paths.add(keyBuilder.cluster().getPath());
+  private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
+    List<String> paths = Lists.newArrayList();
     paths.add(keyBuilder.clusterConfigs().getPath());
     paths.add(keyBuilder.instanceConfigs().getPath());
     paths.add(keyBuilder.propertyStore().getPath());
@@ -743,22 +767,19 @@ public class ClusterAccessor {
       return false;
     }
 
+    ParticipantAccessor participantAccessor = new ParticipantAccessor(_accessor);
     ParticipantId participantId = participant.getId();
-    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
+    InstanceConfig existConfig =
+        _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+    if (existConfig != null && participantAccessor.isParticipantStructureValid(participantId)) {
       LOG.error("Config for participant: " + participantId + " already exists in cluster: "
           + _clusterId);
       return false;
     }
 
-    // add empty root ZNodes
-    List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
-    createKeys.add(_keyBuilder.messages(participantId.stringify()));
-    createKeys.add(_keyBuilder.currentStates(participantId.stringify()));
-    createKeys.add(_keyBuilder.participantErrors(participantId.stringify()));
-    createKeys.add(_keyBuilder.statusUpdates(participantId.stringify()));
-    for (PropertyKey key : createKeys) {
-      _accessor.createProperty(key, null);
-    }
+    // clear and rebuild the participant structure
+    participantAccessor.clearParticipantStructure(participantId);
+    participantAccessor.initParticipantStructure(participantId);
 
     // add the config
     InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
@@ -775,8 +796,7 @@ public class ClusterAccessor {
     for (PartitionId partitionId : disabledPartitions) {
       instanceConfig.setInstanceEnabledForPartition(partitionId, false);
     }
-    _accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
-    _accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
+    _accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index c53bcd8..ac8f79d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -86,34 +86,36 @@ public class ParticipantAccessor {
    * enable/disable a participant
    * @param participantId
    * @param isEnabled
+   * @return true if enable state succeeded, false otherwise
    */
-  void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+  boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
     String participantName = participantId.stringify();
     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-      return;
+      return false;
     }
 
     InstanceConfig config = new InstanceConfig(participantName);
     config.setInstanceEnabled(isEnabled);
-    _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
-
+    return _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
   }
 
   /**
    * disable participant
    * @param participantId
+   * @return true if disabled successfully, false otherwise
    */
-  public void disableParticipant(ParticipantId participantId) {
-    enableParticipant(participantId, false);
+  public boolean disableParticipant(ParticipantId participantId) {
+    return enableParticipant(participantId, false);
   }
 
   /**
    * enable participant
    * @param participantId
+   * @return true if enabled successfully, false otherwise
    */
-  public void enableParticipant(ParticipantId participantId) {
-    enableParticipant(participantId, true);
+  public boolean enableParticipant(ParticipantId participantId) {
+    return enableParticipant(participantId, true);
   }
 
   /**
@@ -173,8 +175,9 @@ public class ParticipantAccessor {
    * @param participantId
    * @param resourceId
    * @param partitionIdSet
+   * @return true if enable state changed successfully, false otherwise
    */
-  void enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+  boolean enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
       final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
     String participantName = participantId.stringify();
     String resourceName = resourceId.stringify();
@@ -183,7 +186,7 @@ public class ParticipantAccessor {
     PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
     if (_accessor.getProperty(instanceConfigKey) == null) {
       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-      return;
+      return false;
     }
 
     // check resource exist. warn if not
@@ -205,16 +208,13 @@ public class ParticipantAccessor {
       }
     }
 
-    // TODO merge list logic should go to znrecord updater
-    // update participantConfig
-    // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
     BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
     final List<String> partitionNames = new ArrayList<String>();
     for (PartitionId partitionId : partitionIdSet) {
       partitionNames.add(partitionId.stringify());
     }
 
-    baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+    return baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
         if (currentData == null) {
@@ -248,10 +248,11 @@ public class ParticipantAccessor {
    * @param participantId
    * @param resourceId
    * @param disablePartitionIdSet
+   * @return true if disabled successfully, false otherwise
    */
-  public void disablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
-      Set<PartitionId> disablePartitionIdSet) {
-    enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
+  public boolean disablePartitionsForParticipant(ParticipantId participantId,
+      ResourceId resourceId, Set<PartitionId> disablePartitionIdSet) {
+    return enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
   }
 
   /**
@@ -259,10 +260,11 @@ public class ParticipantAccessor {
    * @param participantId
    * @param resourceId
    * @param enablePartitionIdSet
+   * @return true if enabled successfully, false otherwise
    */
-  public void enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+  public boolean enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
       Set<PartitionId> enablePartitionIdSet) {
-    enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
+    return enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
   }
 
   /**
@@ -306,7 +308,7 @@ public class ParticipantAccessor {
     RunningInstance runningInstance = participant.getRunningInstance();
 
     // check that the resource exists
-    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    ResourceAccessor resourceAccessor = resourceAccessor();
     Resource resource = resourceAccessor.readResource(resourceId);
     if (resource == null || resource.getRebalancerConfig() == null) {
       LOG.error("Cannot reset partitions because the resource is not present");
@@ -603,10 +605,11 @@ public class ParticipantAccessor {
    * @param resourceId resource id
    * @param participantId participant id
    * @param sessionId session id
+   * @return true if dropped, false otherwise
    */
-  public void dropCurrentState(ResourceId resourceId, ParticipantId participantId,
+  public boolean dropCurrentState(ResourceId resourceId, ParticipantId participantId,
       SessionId sessionId) {
-    _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
+    return _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
         sessionId.stringify(), resourceId.stringify()));
   }
 
@@ -618,12 +621,10 @@ public class ParticipantAccessor {
   boolean dropParticipant(ParticipantId participantId) {
     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-      return false;
     }
 
     if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
       LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
-      return false;
     }
 
     // delete participant config path
@@ -660,7 +661,7 @@ public class ParticipantAccessor {
       return false;
     }
     dropParticipant(oldParticipantId);
-    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    ResourceAccessor resourceAccessor = resourceAccessor();
     Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
@@ -678,8 +679,8 @@ public class ParticipantAccessor {
    * @param oldParticipantId the participant to drop
    * @param newParticipantId the participant that replaces it
    */
-  private void swapParticipantsInIdealState(IdealState idealState, ParticipantId oldParticipantId,
-      ParticipantId newParticipantId) {
+  protected void swapParticipantsInIdealState(IdealState idealState,
+      ParticipantId oldParticipantId, ParticipantId newParticipantId) {
     for (PartitionId partitionId : idealState.getPartitionSet()) {
       List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
       if (oldPreferenceList != null) {
@@ -704,4 +705,70 @@ public class ParticipantAccessor {
       }
     }
   }
+
+  /**
+   * Create empty persistent properties to ensure that there is a valid participant structure
+   */
+  public void initParticipantStructure(ParticipantId participantId) {
+    List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    for (String path : paths) {
+      boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+      if (!status && LOG.isDebugEnabled()) {
+        LOG.debug(path + " already exists");
+      }
+    }
+  }
+
+  /**
+   * Clear properties for the participant
+   */
+  void clearParticipantStructure(ParticipantId participantId) {
+    List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    baseAccessor.remove(paths, 0);
+  }
+
+  /**
+   * check if participant structure is valid
+   * @return true if valid or false otherwise
+   */
+  public boolean isParticipantStructureValid(ParticipantId participantId) {
+    List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    if (baseAccessor != null) {
+      boolean[] existsResults = baseAccessor.exists(paths, 0);
+      for (boolean exists : existsResults) {
+        if (!exists) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the paths that should be created if the participant exists
+   * @param keyBuilder PropertyKey.Builder for the cluster
+   * @param participantId the participant for which to generate paths
+   * @return list of required paths as strings
+   */
+  private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder,
+      ParticipantId participantId) {
+    List<String> paths = Lists.newArrayList();
+    paths.add(keyBuilder.instanceConfig(participantId.stringify()).getPath());
+    paths.add(keyBuilder.messages(participantId.stringify()).getPath());
+    paths.add(keyBuilder.currentStates(participantId.stringify()).getPath());
+    paths.add(keyBuilder.participantErrors(participantId.stringify()).getPath());
+    paths.add(keyBuilder.statusUpdates(participantId.stringify()).getPath());
+    return paths;
+  }
+
+  /**
+   * Get a ResourceAccessor instance
+   * @return ResourceAccessor
+   */
+  protected ResourceAccessor resourceAccessor() {
+    return new ResourceAccessor(_accessor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index c65cb44..58b226d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -108,9 +108,10 @@ public class ResourceAccessor {
    * save resource assignment
    * @param resourceId
    * @param resourceAssignment
+   * @return true if set, false otherwise
    */
-  public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
-    _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+  public boolean setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+    return _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
         resourceAssignment);
   }
 
@@ -128,9 +129,11 @@ public class ResourceAccessor {
    * rebalancer configuration
    * @param resourceId
    * @param configuration
+   * @return true if set, false otherwise
    */
-  void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
-    _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+  private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+    boolean status =
+        _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     // also set an ideal state if the resource supports it
     RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
     IdealState idealState =
@@ -139,6 +142,7 @@ public class ResourceAccessor {
     if (idealState != null) {
       _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
     }
+    return status;
   }
 
   /**
@@ -249,27 +253,29 @@ public class ResourceAccessor {
    * Get a resource configuration, which may include user-defined configuration, as well as
    * rebalancer configuration
    * @param resourceId
-   * @return configuration
+   * @return configuration or null
    */
-  public void getConfiguration(ResourceId resourceId) {
-    _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+  public ResourceConfiguration getConfiguration(ResourceId resourceId) {
+    return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
   }
 
   /**
    * set external view of a resource
    * @param resourceId
    * @param extView
+   * @return true if set, false otherwise
    */
-  public void setExternalView(ResourceId resourceId, ExternalView extView) {
-    _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
+  public boolean setExternalView(ResourceId resourceId, ExternalView extView) {
+    return _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
   }
 
   /**
    * drop external view of a resource
    * @param resourceId
+   * @return true if dropped, false otherwise
    */
-  public void dropExternalView(ResourceId resourceId) {
-    _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
+  public boolean dropExternalView(ResourceId resourceId) {
+    return _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
   }
 
   /**
@@ -278,7 +284,7 @@ public class ResourceAccessor {
    * @return true if they were reset, false otherwise
    */
   public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
-    ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+    ParticipantAccessor accessor = participantAccessor();
     List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
     for (ExternalView extView : extViews) {
       if (!resetResourceIdSet.contains(extView.getResourceId())) {
@@ -436,4 +442,12 @@ public class ResourceAccessor {
     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
         rebalancerContext, userConfig, bucketSize, batchMessageMode);
   }
+
+  /**
+   * Get a ParticipantAccessor instance
+   * @return ParticipantAccessor
+   */
+  protected ParticipantAccessor participantAccessor() {
+    return new ParticipantAccessor(_accessor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
new file mode 100644
index 0000000..a567a5c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
@@ -0,0 +1,43 @@
+package org.apache.helix.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Generic (distributed) lock for Helix-related persisted updates
+ */
+public interface HelixLock {
+  /**
+   * Synchronously acquire a lock
+   * @return true if the lock was acquired, false if could not be acquired
+   */
+  public boolean lock();
+
+  /**
+   * Release a lock
+   * @return true if the lock was released, false if it could not be released
+   */
+  public boolean unlock();
+
+  /**
+   * Check if this object is blocked waiting on the lock
+   * @return true if blocked, false otherwise
+   */
+  public boolean isBlocked();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java
new file mode 100644
index 0000000..fdb2ca5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java
@@ -0,0 +1,36 @@
+package org.apache.helix.lock;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implemented by any Helix construct that is lockable and is able to return a HelixLock instance
+ */
+public interface HelixLockable {
+  /**
+   * Get a lock object on a scope
+   * @param clusterId cluster to lock
+   * @param scope scope relative to the cluster that the lock protects
+   * @return HelixLock instance
+   */
+  HelixLock getLock(ClusterId clusterId, Scope<?> scope);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java b/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java
new file mode 100644
index 0000000..bb2118c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java
@@ -0,0 +1,39 @@
+package org.apache.helix.lock.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This class has two methods which are call
+ * back methods when a lock is acquired and
+ * when the lock is released.
+ */
+interface LockListener {
+  /**
+   * call back called when the lock
+   * is acquired
+   */
+  public void lockAcquired();
+
+  /**
+   * call back called when the lock is
+   * released.
+   */
+  public void lockReleased();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java
new file mode 100644
index 0000000..23bef6a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java
@@ -0,0 +1,191 @@
+package org.apache.helix.lock.zk;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A base class for protocol implementations which provides a number of higher
+ * level helper methods for working with ZooKeeper along with retrying synchronous
+ * operations if the connection to ZooKeeper closes such as
+ * {@link #retryOperation(ZooKeeperOperation)}
+ */
+class ProtocolSupport {
+  private static final Logger LOG = Logger.getLogger(ProtocolSupport.class);
+
+  protected final ZooKeeper zookeeper;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private long retryDelay = 500L;
+  private int retryCount = 10;
+  private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+  public ProtocolSupport(ZooKeeper zookeeper) {
+    this.zookeeper = zookeeper;
+  }
+
+  /**
+   * Closes this strategy and releases any ZooKeeper resources; but keeps the
+   * ZooKeeper instance open
+   */
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      doClose();
+    }
+  }
+
+  /**
+   * return zookeeper client instance
+   * @return zookeeper client instance
+   */
+  public ZooKeeper getZookeeper() {
+    return zookeeper;
+  }
+
+  /**
+   * return the acl its using
+   * @return the acl.
+   */
+  public List<ACL> getAcl() {
+    return acl;
+  }
+
+  /**
+   * set the acl
+   * @param acl the acl to set to
+   */
+  public void setAcl(List<ACL> acl) {
+    this.acl = acl;
+  }
+
+  /**
+   * get the retry delay in milliseconds
+   * @return the retry delay
+   */
+  public long getRetryDelay() {
+    return retryDelay;
+  }
+
+  /**
+   * Sets the time waited between retry delays
+   * @param retryDelay the retry delay
+   */
+  public void setRetryDelay(long retryDelay) {
+    this.retryDelay = retryDelay;
+  }
+
+  /**
+   * Allow derived classes to perform
+   * some custom closing operations to release resources
+   */
+  protected void doClose() {
+  }
+
+  /**
+   * Perform the given operation, retrying if the connection fails
+   * @return object. it needs to be cast to the callee's expected
+   *         return type.
+   */
+  protected Object retryOperation(ZooKeeperOperation operation) throws KeeperException,
+      InterruptedException {
+    KeeperException exception = null;
+    for (int i = 0; i < retryCount; i++) {
+      try {
+        return operation.execute();
+      } catch (KeeperException.SessionExpiredException e) {
+        LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
+        throw e;
+      } catch (KeeperException.ConnectionLossException e) {
+        if (exception == null) {
+          exception = e;
+        }
+        LOG.debug("Attempt " + i + " failed with connection loss so " + "attempting to reconnect: "
+            + e, e);
+        retryDelay(i);
+      }
+    }
+    throw exception;
+  }
+
+  /**
+   * Ensures that the given path exists with no data, the current
+   * ACL and no flags
+   * @param path
+   */
+  protected void ensurePathExists(String path) {
+    ensureExists(path, null, acl, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Ensures that the given path exists with the given data, ACL and flags
+   * @param path
+   * @param acl
+   * @param flags
+   */
+  protected void ensureExists(final String path, final byte[] data, final List<ACL> acl,
+      final CreateMode flags) {
+    try {
+      retryOperation(new ZooKeeperOperation() {
+        public boolean execute() throws KeeperException, InterruptedException {
+          Stat stat = zookeeper.exists(path, false);
+          if (stat != null) {
+            return true;
+          }
+          zookeeper.create(path, data, acl, flags);
+          return true;
+        }
+      });
+    } catch (KeeperException e) {
+      LOG.warn("Caught: " + e, e);
+    } catch (InterruptedException e) {
+      LOG.warn("Caught: " + e, e);
+    }
+  }
+
+  /**
+   * Returns true if this protocol has been closed
+   * @return true if this protocol is closed
+   */
+  protected boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Performs a retry delay if this is not the first attempt
+   * @param attemptCount the number of the attempts performed so far
+   */
+  protected void retryDelay(int attemptCount) {
+    if (attemptCount > 0) {
+      try {
+        Thread.sleep(attemptCount * retryDelay);
+      } catch (InterruptedException e) {
+        LOG.debug("Failed to sleep: " + e, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java
new file mode 100644
index 0000000..aef7618
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java
@@ -0,0 +1,294 @@
+package org.apache.helix.lock.zk;
+
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A protocol to implement an exclusive
+ * write lock or to elect a leader.
+ * <p/>
+ * You invoke {@link #lock()} to start the process of grabbing the lock; you may get the lock then
+ * or it may be some time later.
+ * <p/>
+ * You can register a listener so that you are invoked when you get the lock; otherwise you can ask
+ * if you have the lock by calling {@link #isOwner()}
+ */
+class WriteLock extends ProtocolSupport {
+  private static final Logger LOG = Logger.getLogger(WriteLock.class);
+
+  private final String dir;
+  private String id;
+  private ZNodeName idName;
+  private String ownerId;
+  private String lastChildId;
+  private byte[] data = {
+      0x12, 0x34
+  };
+  private LockListener callback;
+  private LockZooKeeperOperation zop;
+
+  /**
+   * zookeeper contructor for writelock
+   * @param zookeeper zookeeper client instance
+   * @param dir the parent path you want to use for locking
+   * @param acls the acls that you want to use for all the paths,
+   *          if null world read/write is used.
+   */
+  public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+    super(zookeeper);
+    this.dir = dir;
+    if (acl != null) {
+      setAcl(acl);
+    }
+    this.zop = new LockZooKeeperOperation();
+  }
+
+  /**
+   * zookeeper contructor for writelock with callback
+   * @param zookeeper the zookeeper client instance
+   * @param dir the parent path you want to use for locking
+   * @param acl the acls that you want to use for all the paths
+   * @param callback the call back instance
+   */
+  public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, LockListener callback) {
+    this(zookeeper, dir, acl);
+    this.callback = callback;
+  }
+
+  /**
+   * return the current locklistener
+   * @return the locklistener
+   */
+  public LockListener getLockListener() {
+    return this.callback;
+  }
+
+  /**
+   * register a different call back listener
+   * @param callback the call back instance
+   */
+  public void setLockListener(LockListener callback) {
+    this.callback = callback;
+  }
+
+  /**
+   * Removes the lock or associated znode if
+   * you no longer require the lock. this also
+   * removes your request in the queue for locking
+   * in case you do not already hold the lock.
+   * @throws RuntimeException throws a runtime exception
+   *           if it cannot connect to zookeeper.
+   */
+  public synchronized void unlock() throws RuntimeException {
+
+    if (!isClosed() && id != null) {
+      // we don't need to retry this operation in the case of failure
+      // as ZK will remove ephemeral files and we don't wanna hang
+      // this process when closing if we cannot reconnect to ZK
+      try {
+
+        ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+          public boolean execute() throws KeeperException, InterruptedException {
+            zookeeper.delete(id, -1);
+            return Boolean.TRUE;
+          }
+        };
+        zopdel.execute();
+      } catch (InterruptedException e) {
+        LOG.warn("Caught: " + e, e);
+        // set that we have been interrupted.
+        Thread.currentThread().interrupt();
+      } catch (KeeperException.NoNodeException e) {
+        // do nothing
+      } catch (KeeperException e) {
+        LOG.warn("Caught: " + e, e);
+        throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
+      } finally {
+        if (callback != null) {
+          callback.lockReleased();
+        }
+        id = null;
+      }
+    }
+  }
+
+  /**
+   * the watcher called on
+   * getting watch while watching
+   * my predecessor
+   */
+  private class LockWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+      // lets either become the leader or watch the new/updated node
+      LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState()
+          + " type " + event.getType());
+      try {
+        lock();
+      } catch (Exception e) {
+        LOG.warn("Failed to acquire lock: " + e, e);
+      }
+    }
+  }
+
+  /**
+   * a zoookeeper operation that is mainly responsible
+   * for all the magic required for locking.
+   */
+  private class LockZooKeeperOperation implements ZooKeeperOperation {
+
+    /**
+     * find if we have been created earler if not create our node
+     * @param prefix the prefix node
+     * @param zookeeper teh zookeeper client
+     * @param dir the dir paretn
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
+        throws KeeperException, InterruptedException {
+      List<String> names = zookeeper.getChildren(dir, false);
+      for (String name : names) {
+        if (name.startsWith(prefix)) {
+          id = name;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found id created last time: " + id);
+          }
+          break;
+        }
+      }
+      if (id == null) {
+        id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created id: " + id);
+        }
+      }
+
+    }
+
+    /**
+     * the command that is run and retried for actually
+     * obtaining the lock
+     * @return if the command was successful or not
+     */
+    public boolean execute() throws KeeperException, InterruptedException {
+      do {
+        if (id == null) {
+          long sessionId = zookeeper.getSessionId();
+          String prefix = "x-" + sessionId + "-";
+          // lets try look up the current ID if we failed
+          // in the middle of creating the znode
+          findPrefixInChildren(prefix, zookeeper, dir);
+          idName = new ZNodeName(id);
+        }
+        if (id != null) {
+          List<String> names = zookeeper.getChildren(dir, false);
+          if (names.isEmpty()) {
+            LOG.warn("No children in: " + dir + " when we've just "
+                + "created one! Lets recreate it...");
+            // lets force the recreation of the id
+            id = null;
+          } else {
+            // lets sort them explicitly (though they do seem to come back in order ususally :)
+            SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+            for (String name : names) {
+              sortedNames.add(new ZNodeName(dir + "/" + name));
+            }
+            ownerId = sortedNames.first().getName();
+            SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+            if (!lessThanMe.isEmpty()) {
+              ZNodeName lastChildName = lessThanMe.last();
+              lastChildId = lastChildName.getName();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("watching less than me node: " + lastChildId);
+              }
+              Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
+              if (stat != null) {
+                return Boolean.FALSE;
+              } else {
+                LOG.warn("Could not find the" + " stats for less than me: "
+                    + lastChildName.getName());
+              }
+            } else {
+              if (isOwner()) {
+                if (callback != null) {
+                  callback.lockAcquired();
+                }
+                return Boolean.TRUE;
+              }
+            }
+          }
+        }
+      } while (id == null);
+      return Boolean.FALSE;
+    }
+  };
+
+  /**
+   * Attempts to acquire the exclusive write lock returning whether or not it was
+   * acquired. Note that the exclusive lock may be acquired some time later after
+   * this method has been invoked due to the current lock owner going away.
+   */
+  public synchronized boolean lock() throws KeeperException, InterruptedException {
+    if (isClosed()) {
+      return false;
+    }
+    ensurePathExists(dir);
+
+    return (Boolean) retryOperation(zop);
+  }
+
+  /**
+   * return the parent dir for lock
+   * @return the parent dir used for locks.
+   */
+  public String getDir() {
+    return dir;
+  }
+
+  /**
+   * Returns true if this node is the owner of the
+   * lock (or the leader)
+   */
+  public boolean isOwner() {
+    return id != null && ownerId != null && id.equals(ownerId);
+  }
+
+  /**
+   * return the id for this lock
+   * @return the id for this lock
+   */
+  public String getId() {
+    return this.id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
new file mode 100644
index 0000000..4d3b459
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -0,0 +1,152 @@
+package org.apache.helix.lock.zk;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation
+ * Please use the following lock order convention: Cluster, Participant, Resource, Partition
+ * WARNING: this is not a reentrant lock
+ */
+public class ZKHelixLock implements HelixLock {
+  private static final Logger LOG = Logger.getLogger(ZKHelixLock.class);
+
+  private static final String LOCK_ROOT = "LOCKS";
+  private final String _rootPath;
+  private final WriteLock _writeLock;
+  private final ZkClient _zkClient;
+  private volatile boolean _locked;
+  private volatile boolean _canceled;
+  private volatile boolean _blocked;
+
+  private final LockListener _listener = new LockListener() {
+    @Override
+    public void lockReleased() {
+    }
+
+    @Override
+    public void lockAcquired() {
+      synchronized (ZKHelixLock.this) {
+        if (!_canceled) {
+          _locked = true;
+        } else {
+          unlock();
+        }
+        ZKHelixLock.this.notify();
+      }
+    }
+  };
+
+  /**
+   * Initialize for a cluster and scope
+   * @param clusterId the cluster under which the lock will live
+   * @param scope the scope to lock
+   * @param zkClient an active ZK client
+   */
+  public ZKHelixLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) {
+    _zkClient = zkClient;
+    _rootPath =
+        '/' + clusterId.stringify() + '/' + LOCK_ROOT + '/' + scope.getType() + '_'
+            + scope.getScopedId();
+    ZooKeeper zookeeper = ((ZkConnection) zkClient.getConnection()).getZookeeper();
+    _writeLock = new WriteLock(zookeeper, _rootPath, null, _listener);
+    _locked = false;
+    _canceled = false;
+    _blocked = false;
+  }
+
+  /**
+   * Try to synchronously lock the scope
+   * @return true if the lock succeeded, false if it failed, as is the case if the connection to ZK
+   *         is lost
+   */
+  @Override
+  public synchronized boolean lock() {
+    _canceled = false;
+    if (_locked) {
+      // no need to proceed if the lock is already acquired
+      return true;
+    }
+    try {
+      // create the root path if it doesn't exist
+      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      baseAccessor.create(_rootPath, null, AccessOption.PERSISTENT);
+
+      // try to acquire the lock
+      boolean acquired = _writeLock.lock();
+      if (acquired) {
+        _locked = true;
+      } else {
+        setBlocked(true);
+        wait();
+      }
+    } catch (KeeperException e) {
+      LOG.error("Error acquiring a lock on " + _rootPath, e);
+      _canceled = true;
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while acquiring a lock on " + _rootPath);
+      _canceled = true;
+    }
+    setBlocked(false);
+    return _locked;
+  }
+
+  /**
+   * Unlock the scope
+   * @return true if unlock executed, false otherwise
+   */
+  @Override
+  public synchronized boolean unlock() {
+    try {
+      _writeLock.unlock();
+    } catch (IllegalArgumentException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Unlock skipped because lock node was not present");
+      }
+    }
+    _locked = false;
+    return true;
+  }
+
+  @Override
+  public boolean isBlocked() {
+    return _blocked;
+  }
+
+  /**
+   * Set if this the lock method is currently blocked
+   * @param isBlocked true if blocked, false otherwise
+   */
+  protected void setBlocked(boolean isBlocked) {
+    _blocked = isBlocked;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java
new file mode 100644
index 0000000..47253e6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java
@@ -0,0 +1,113 @@
+package org.apache.helix.lock.zk;
+
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number
+ * and can be sorted in order
+ */
+class ZNodeName implements Comparable<ZNodeName> {
+  private final String name;
+  private String prefix;
+  private int sequence = -1;
+  private static final Logger LOG = Logger.getLogger(ZNodeName.class);
+
+  public ZNodeName(String name) {
+    if (name == null) {
+      throw new NullPointerException("id cannot be null");
+    }
+    this.name = name;
+    this.prefix = name;
+    int idx = name.lastIndexOf('-');
+    if (idx >= 0) {
+      this.prefix = name.substring(0, idx);
+      try {
+        this.sequence = Integer.parseInt(name.substring(idx + 1));
+        // If an exception occurred we misdetected a sequence suffix,
+        // so return -1.
+      } catch (NumberFormatException e) {
+        LOG.info("Number format exception for " + idx, e);
+      } catch (ArrayIndexOutOfBoundsException e) {
+        LOG.info("Array out of bounds for " + idx, e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+
+    ZNodeName sequence = (ZNodeName) o;
+
+    if (!name.equals(sequence.name))
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return name.hashCode() + 37;
+  }
+
+  public int compareTo(ZNodeName that) {
+    int answer = this.prefix.compareTo(that.prefix);
+    if (answer == 0) {
+      int s1 = this.sequence;
+      int s2 = that.sequence;
+      if (s1 == -1 && s2 == -1) {
+        return this.name.compareTo(that.name);
+      }
+      answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2;
+    }
+    return answer;
+  }
+
+  /**
+   * Returns the name of the znode
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Returns the sequence number
+   */
+  public int getZNodeName() {
+    return sequence;
+  }
+
+  /**
+   * Returns the text prefix before the sequence number
+   */
+  public String getPrefix() {
+    return prefix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java
new file mode 100644
index 0000000..58b9fe3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java
@@ -0,0 +1,38 @@
+package org.apache.helix.lock.zk;
+
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A callback object which can be used for implementing retry-able operations in the
+ * {@link org.apache.helix.lock.zk.recipes.lock.ProtocolSupport} class
+ */
+interface ZooKeeperOperation {
+
+  /**
+   * Performs the operation - which may be involved multiple times if the connection
+   * to ZooKeeper closes during this operation
+   * @return the result of the operation or null
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean execute() throws KeeperException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index d733a5c..1278ceb 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -29,6 +29,10 @@ import org.apache.helix.api.id.ClusterId;
  * Persisted configuration properties for a cluster
  */
 public class ClusterConfiguration extends HelixProperty {
+  private enum Fields {
+    WRITE_ID
+  }
+
   /**
    * Instantiate for an id
    * @param id cluster id
@@ -62,6 +66,22 @@ public class ClusterConfiguration extends HelixProperty {
   }
 
   /**
+   * Set the identifier of this configuration for the last write
+   * @param writeId positive random long identifier
+   */
+  public void setWriteId(long writeId) {
+    _record.setLongField(Fields.WRITE_ID.toString(), writeId);
+  }
+
+  /**
+   * Get the identifier for the last write to this configuration
+   * @return positive write identifier, or -1 of not set
+   */
+  public long getWriteId() {
+    return _record.getLongField(Fields.WRITE_ID.toString(), -1);
+  }
+
+  /**
    * Create a new ClusterConfiguration from a UserConfig
    * @param userConfig user-defined configuration properties
    * @return ClusterConfiguration


Mime
View raw message