helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject git commit: [HELIX-360] Remove code duplication for list of required paths, minor fixes
Date Fri, 21 Feb 2014 17:53:11 GMT
Repository: helix
Updated Branches:
  refs/heads/master 0beb0d9f3 -> 507feeb4f


[HELIX-360] Remove code duplication for list of required paths, minor fixes

Changes based on Sandeep's and Kishore's comments on the code review.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/507feeb4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/507feeb4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/507feeb4

Branch: refs/heads/master
Commit: 507feeb4f4a16a803e4497f0ee3e4482b2303045
Parents: 0beb0d9
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Wed Feb 19 11:41:28 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Feb 21 09:52:57 2014 -0800

----------------------------------------------------------------------
 .../api/accessor/AtomicClusterAccessor.java     | 49 ++++++++-------
 .../api/accessor/AtomicParticipantAccessor.java | 42 +++++++------
 .../api/accessor/AtomicResourceAccessor.java    | 23 +++----
 .../helix/api/accessor/ClusterAccessor.java     | 24 ++++++--
 .../helix/api/accessor/ParticipantAccessor.java | 51 +++++++++++++---
 .../helix/api/accessor/ResourceAccessor.java    | 16 +++++
 .../org/apache/helix/manager/zk/ZKUtil.java     | 63 +++-----------------
 .../helix/manager/zk/ZkHelixConnection.java     | 19 +++---
 .../java/org/apache/helix/util/HelixUtil.java   | 50 ++++++++++++++++
 9 files changed, 215 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index 37cc47e..216b3ad 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -56,9 +56,6 @@ public class AtomicClusterAccessor extends ClusterAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);
 
   private final HelixLockable _lockProvider;
-  private final HelixDataAccessor _accessor;
-  private final PropertyKey.Builder _keyBuilder;
-  private final ClusterId _clusterId;
 
   /**
    * Non-atomic instance to protect against reentrant locking via polymorphism
@@ -75,15 +72,13 @@ public class AtomicClusterAccessor extends ClusterAccessor {
       HelixLockable lockProvider) {
     super(clusterId, accessor);
     _lockProvider = lockProvider;
-    _accessor = accessor;
-    _keyBuilder = accessor.keyBuilder();
-    _clusterId = clusterId;
     _clusterAccessor = new ClusterAccessor(clusterId, accessor);
   }
 
   @Override
   public boolean createCluster(ClusterConfig cluster) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -97,7 +92,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
 
   @Override
   public boolean dropCluster() {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -111,7 +107,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
 
   @Override
   public Cluster readCluster() {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -129,7 +126,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
       LOG.error("Participant config cannot be null");
       return false;
     }
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participant.getId()));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participant.getId()));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -143,7 +141,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
 
   @Override
   public boolean dropParticipantFromCluster(ParticipantId participantId) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -161,7 +160,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
       LOG.error("Resource config cannot be null");
       return false;
     }
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resource.getId()));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resource.getId()));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -175,7 +175,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
 
   @Override
   public boolean dropResourceFromCluster(ResourceId resourceId) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -189,7 +190,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
 
   @Override
   public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -208,13 +210,16 @@ public class AtomicClusterAccessor extends ClusterAccessor {
   public Map<ResourceId, Resource> readResources() {
     // read resources individually instead of together to maintain the equality link between ideal
     // state and resource config
+    ClusterId clusterId = clusterId();
+    HelixDataAccessor dataAccessor = dataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
     Map<ResourceId, Resource> resources = Maps.newHashMap();
     Set<String> idealStateNames =
-        Sets.newHashSet(_accessor.getChildNames(_keyBuilder.idealStates()));
+        Sets.newHashSet(dataAccessor.getChildNames(keyBuilder.idealStates()));
     Set<String> resourceConfigNames =
-        Sets.newHashSet(_accessor.getChildNames(_keyBuilder.resourceConfigs()));
+        Sets.newHashSet(dataAccessor.getChildNames(keyBuilder.resourceConfigs()));
     resourceConfigNames.addAll(idealStateNames);
-    ResourceAccessor accessor = new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+    ResourceAccessor accessor = new AtomicResourceAccessor(clusterId, dataAccessor, _lockProvider);
     for (String resourceName : resourceConfigNames) {
       ResourceId resourceId = ResourceId.from(resourceName);
       Resource resource = accessor.readResource(resourceId);
@@ -231,10 +236,13 @@ public class AtomicClusterAccessor extends ClusterAccessor {
   @Override
   public Map<ParticipantId, Participant> readParticipants() {
     // read participants individually to keep configs consistent with current state and messages
+    ClusterId clusterId = clusterId();
+    HelixDataAccessor dataAccessor = dataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
     Map<ParticipantId, Participant> participants = Maps.newHashMap();
     ParticipantAccessor accessor =
-        new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
-    List<String> participantNames = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+        new AtomicParticipantAccessor(clusterId, dataAccessor, _lockProvider);
+    List<String> participantNames = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
     for (String participantName : participantNames) {
       ParticipantId participantId = ParticipantId.from(participantName);
       Participant participant = accessor.readParticipant(participantId);
@@ -247,7 +255,8 @@ public class AtomicClusterAccessor extends ClusterAccessor {
 
   @Override
   public void initClusterStructure() {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
     boolean locked = lock.lock();
     if (locked) {
       try {

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
index 90f58ea..6b9b10e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -47,8 +47,6 @@ import org.apache.log4j.Logger;
 public class AtomicParticipantAccessor extends ParticipantAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);
 
-  private final ClusterId _clusterId;
-  private final HelixDataAccessor _accessor;
   private final HelixLockable _lockProvider;
 
   /**
@@ -65,15 +63,14 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
   public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
       HelixLockable lockProvider) {
     super(clusterId, accessor);
-    _clusterId = clusterId;
-    _accessor = accessor;
     _lockProvider = lockProvider;
     _participantAccessor = new ParticipantAccessor(clusterId, accessor);
   }
 
   @Override
   boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -87,7 +84,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
 
   @Override
   public Participant readParticipant(ParticipantId participantId) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -105,8 +103,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
       LOG.error("participant config cannot be null");
       return false;
     }
-    HelixLock lock =
-        _lockProvider.getLock(_clusterId, Scope.participant(participantConfig.getId()));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantConfig.getId()));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -121,7 +119,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
   @Override
   public ParticipantConfig updateParticipant(ParticipantId participantId,
       ParticipantConfig.Delta participantDelta) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -135,7 +134,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
 
   @Override
   boolean dropParticipant(ParticipantId participantId) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -150,7 +150,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
   @Override
   public void insertMessagesToParticipant(ParticipantId participantId,
       Map<MessageId, Message> msgMap) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -164,7 +165,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
 
   @Override
   public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -178,7 +180,8 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
 
   @Override
   public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -191,21 +194,24 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
   }
 
   @Override
-  public void initParticipantStructure(ParticipantId participantId) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+  public boolean initParticipantStructure(ParticipantId participantId) {
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
-        _participantAccessor.initParticipantStructure(participantId);
+        return _participantAccessor.initParticipantStructure(participantId);
       } finally {
         lock.unlock();
       }
     }
-    return;
+    return false;
   }
 
   @Override
   protected ResourceAccessor resourceAccessor() {
-    return new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+    ClusterId clusterId = clusterId();
+    HelixDataAccessor accessor = dataAccessor();
+    return new AtomicResourceAccessor(clusterId, accessor, _lockProvider);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 48457b2..65fda39 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -43,8 +43,6 @@ import org.apache.log4j.Logger;
 public class AtomicResourceAccessor extends ResourceAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
 
-  private final ClusterId _clusterId;
-  private final HelixDataAccessor _accessor;
   private final HelixLockable _lockProvider;
 
   /**
@@ -61,15 +59,14 @@ public class AtomicResourceAccessor extends ResourceAccessor {
   public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
       HelixLockable lockProvider) {
     super(clusterId, accessor);
-    _clusterId = clusterId;
-    _accessor = accessor;
     _lockProvider = lockProvider;
     _resourceAccessor = new ResourceAccessor(clusterId, accessor);
   }
 
   @Override
   public Resource readResource(ResourceId resourceId) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -83,7 +80,8 @@ public class AtomicResourceAccessor extends ResourceAccessor {
 
   @Override
   public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -97,7 +95,8 @@ public class AtomicResourceAccessor extends ResourceAccessor {
 
   @Override
   public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -115,7 +114,8 @@ public class AtomicResourceAccessor extends ResourceAccessor {
       LOG.error("resource config cannot be null");
       return false;
     }
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceConfig.getId()));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceConfig.getId()));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -130,7 +130,8 @@ public class AtomicResourceAccessor extends ResourceAccessor {
   @Override
   public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
       String participantGroupTag) {
-    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
     boolean locked = lock.lock();
     if (locked) {
       try {
@@ -145,6 +146,8 @@ public class AtomicResourceAccessor extends ResourceAccessor {
 
   @Override
   protected ParticipantAccessor participantAccessor() {
-    return new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
+    ClusterId clusterId = clusterId();
+    HelixDataAccessor accessor = dataAccessor();
+    return new AtomicParticipantAccessor(clusterId, accessor, _lockProvider);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 5ecc210..a41da5a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -60,7 +60,6 @@ import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder;
 import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.Alerts;
 import org.apache.helix.model.ClusterConfiguration;
 import org.apache.helix.model.ClusterConstraints;
@@ -77,6 +76,7 @@ import org.apache.helix.model.PersistentStats;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfiguration;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -808,7 +808,7 @@ public class ClusterAccessor {
    * @return true if valid or false otherwise
    */
   public boolean isClusterStructureValid() {
-    List<String> paths = ZKUtil.getRequiredPathsForCluster(_clusterId.toString());
+    List<String> paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString());
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
@@ -826,7 +826,7 @@ public class ClusterAccessor {
    */
   public void initClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    List<String> paths = ZKUtil.getRequiredPathsForCluster(_clusterId.toString());
+    List<String> paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString());
     for (String path : paths) {
       boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
       if (!status && LOG.isDebugEnabled()) {
@@ -840,7 +840,7 @@ public class ClusterAccessor {
    */
   private void clearClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    List<String> paths = ZKUtil.getRequiredPathsForCluster(_clusterId.toString());
+    List<String> paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString());
     baseAccessor.remove(paths, 0);
   }
 
@@ -925,4 +925,20 @@ public class ClusterAccessor {
   public boolean dropStateModelDefinitionFromCluster(StateModelDefId stateModelDefId) {
     return _accessor.removeProperty(_keyBuilder.stateModelDef(stateModelDefId.stringify()));
   }
+
+  /**
+   * Get the cluster ID this accessor is connected to
+   * @return ClusterId
+   */
+  protected ClusterId clusterId() {
+    return _clusterId;
+  }
+
+  /**
+   * Get the accessor for the properties stored for this cluster
+   * @return HelixDataAccessor
+   */
+  protected HelixDataAccessor dataAccessor() {
+    return _accessor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 3a34ca2..d0cc3ba 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -54,7 +54,6 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.manager.zk.ZKUtil;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -66,6 +65,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.ImmutableSet;
@@ -711,27 +711,44 @@ public class ParticipantAccessor {
 
   /**
    * Create empty persistent properties to ensure that there is a valid participant structure
+   * @param participantId the identifier under which to initialize the structure
+   * @return true if the participant structure exists at the end of this call, false otherwise
    */
-  public void initParticipantStructure(ParticipantId participantId) {
+  public boolean initParticipantStructure(ParticipantId participantId) {
+    if (participantId == null) {
+      LOG.error("Participant ID cannot be null when clearing the participant in cluster "
+          + _clusterId + "!");
+      return false;
+    }
     List<String> paths =
-        ZKUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
+        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     for (String path : paths) {
       boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
-      if (!status && LOG.isDebugEnabled()) {
-        LOG.debug(path + " already exists");
+      if (!status) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(path + " already exists");
+        }
       }
     }
+    return true;
   }
 
   /**
    * Clear properties for the participant
+   * @param participantId the participant for which to clear
+   * @return true if all paths removed, false otherwise
    */
-  void clearParticipantStructure(ParticipantId participantId) {
+  protected boolean clearParticipantStructure(ParticipantId participantId) {
     List<String> paths =
-        ZKUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
+        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    baseAccessor.remove(paths, 0);
+    boolean[] removeResults = baseAccessor.remove(paths, 0);
+    boolean result = true;
+    for (boolean removeResult : removeResults) {
+      result = result && removeResult;
+    }
+    return result;
   }
 
   /**
@@ -740,7 +757,7 @@ public class ParticipantAccessor {
    */
   public boolean isParticipantStructureValid(ParticipantId participantId) {
     List<String> paths =
-        ZKUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
+        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
@@ -760,4 +777,20 @@ public class ParticipantAccessor {
   protected ResourceAccessor resourceAccessor() {
     return new ResourceAccessor(_clusterId, _accessor);
   }
+
+  /**
+   * Get the cluster ID this accessor is connected to
+   * @return ClusterId
+   */
+  protected ClusterId clusterId() {
+    return _clusterId;
+  }
+
+  /**
+   * Get the accessor for the properties stored for this cluster
+   * @return HelixDataAccessor
+   */
+  protected HelixDataAccessor dataAccessor() {
+    return _accessor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index a1d6580..310b457 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -510,4 +510,20 @@ public class ResourceAccessor {
   protected ParticipantAccessor participantAccessor() {
     return new ParticipantAccessor(_clusterId, _accessor);
   }
+
+  /**
+   * Get the cluster ID this accessor is connected to
+   * @return ClusterId
+   */
+  protected ClusterId clusterId() {
+    return _clusterId;
+  }
+
+  /**
+   * Get the accessor for the properties stored for this cluster
+   * @return HelixDataAccessor
+   */
+  protected HelixDataAccessor dataAccessor() {
+    return _accessor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 30ee16c..9037151 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -25,10 +25,8 @@ import java.util.List;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
@@ -45,13 +43,15 @@ public final class ZKUtil {
       return false;
     }
 
-    List<String> requiredPaths = getRequiredPathsForCluster(clusterName);
+    List<String> requiredPaths = HelixUtil.getRequiredPathsForCluster(clusterName);
     boolean isValid = true;
 
     for (String path : requiredPaths) {
       if (!zkClient.exists(path)) {
         isValid = false;
-        logger.info("Invalid cluster setup, missing znode path: " + path);
+        if (logger.isInfoEnabled()) {
+          logger.info("Invalid cluster setup, missing znode path: " + path);
+        }
       }
     }
     return isValid;
@@ -60,13 +60,15 @@ public final class ZKUtil {
   public static boolean isInstanceSetup(ZkClient zkclient, String clusterName, String instanceName,
       InstanceType type) {
     if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) {
-      List<String> requiredPaths = getRequiredPathsForInstance(clusterName, instanceName);
+      List<String> requiredPaths = HelixUtil.getRequiredPathsForInstance(clusterName, instanceName);
       boolean isValid = true;
 
       for (String path : requiredPaths) {
         if (!zkclient.exists(path)) {
           isValid = false;
-          logger.info("Invalid instance setup, missing znode path: " + path);
+          if (logger.isInfoEnabled()) {
+            logger.info("Invalid instance setup, missing znode path: " + path);
+          }
         }
       }
       return isValid;
@@ -75,53 +77,6 @@ public final class ZKUtil {
     return true;
   }
 
-  /**
-   * Get the required ZK paths for a valid cluster
-   * @param clusterName the cluster to check
-   * @return List of paths as strings
-   */
-  public static List<String> getRequiredPathsForCluster(String clusterName) {
-    List<String> requiredPaths = new ArrayList<String>();
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.CLUSTER.toString(), clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.PARTICIPANT.toString()));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.RESOURCE.toString()));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.INSTANCES, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName));
-    requiredPaths.add(PropertyPathConfig
-        .getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName));
-    return requiredPaths;
-  }
-
-  /**
-   * Get the required ZK paths for a valid instance
-   * @param clusterName the cluster that owns the instance
-   * @param instanceName the instance to check
-   * @return List of paths as strings
-   */
-  public static List<String> getRequiredPathsForInstance(String clusterName, String instanceName) {
-    List<String> requiredPaths = new ArrayList<String>();
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
-        ConfigScopeProperty.PARTICIPANT.toString(), instanceName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName,
-        instanceName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES, clusterName,
-        instanceName));
-    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS, clusterName, instanceName));
-    return requiredPaths;
-  }
-
   public static void createChildren(ZkClient client, String parentPath, List<ZNRecord> list) {
     client.createPersistent(parentPath, true);
     if (list != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
index d59fad7..7bc5fa5 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java
@@ -172,14 +172,16 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
 
   @Override
   public void disconnect() {
-    if (_zkclient == null) {
-      return;
-    }
-
-    LOG.info("Disconnecting connection: " + this);
-
     try {
       _lock.lock();
+      if (_zkclient == null) {
+        return;
+      }
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Disconnecting connection: " + this);
+      }
+
       for (final HelixConnectionStateListener listener : _connectionListener) {
         try {
 
@@ -190,7 +192,10 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener {
       }
       _zkclient.close();
       _zkclient = null;
-      LOG.info("Disconnected connection: " + this);
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Disconnected connection: " + this);
+      }
     } catch (Exception e) {
       LOG.error("Exception disconnect", e);
     } finally {

http://git-wip-us.apache.org/repos/asf/helix/blob/507feeb4/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 82d7b6c..f289211 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -19,11 +19,14 @@ package org.apache.helix.util;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.log4j.Logger;
 
 public final class HelixUtil {
@@ -150,6 +153,53 @@ public final class HelixUtil {
   }
 
   /**
+   * Get the required paths for a valid cluster
+   * @param clusterName the cluster to check
+   * @return List of paths as strings
+   */
+  public static List<String> getRequiredPathsForCluster(String clusterName) {
+    List<String> requiredPaths = new ArrayList<String>();
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+        ConfigScopeProperty.CLUSTER.toString(), clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+        ConfigScopeProperty.PARTICIPANT.toString()));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+        ConfigScopeProperty.RESOURCE.toString()));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.INSTANCES, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName));
+    requiredPaths.add(PropertyPathConfig
+        .getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName));
+    return requiredPaths;
+  }
+
+  /**
+   * Get the required paths for a valid instance
+   * @param clusterName the cluster that owns the instance
+   * @param instanceName the instance to check
+   * @return List of paths as strings
+   */
+  public static List<String> getRequiredPathsForInstance(String clusterName, String instanceName) {
+    List<String> requiredPaths = new ArrayList<String>();
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS, clusterName,
+        ConfigScopeProperty.PARTICIPANT.toString(), instanceName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, clusterName,
+        instanceName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES, clusterName,
+        instanceName));
+    requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS, clusterName, instanceName));
+    return requiredPaths;
+  }
+
+  /**
    * get the parent-path of given path
    * return "/" string if path = "/xxx", null if path = "/"
    * @param path


Mime
View raw message