helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: ka...@apache.org
Subject: [4/4] git commit: [HELIX-389] Unify accessor classes into a single class
Date: Wed, 23 Jul 2014 21:07:46 GMT
[HELIX-389] Unify accessor classes into a single class


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ce1e926c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ce1e926c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ce1e926c

Branch: refs/heads/master
Commit: ce1e926c9c485ac6e87a4c41a3cb5c35bd681e39
Parents: 410815d
Author: Kanak Biscuitwala <kanak@apache.org>
Authored: Tue Jul 22 12:05:05 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Wed Jul 23 11:13:06 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixConnection.java  |   16 -
 .../java/org/apache/helix/HelixManager.java     |    8 -
 .../java/org/apache/helix/api/Resource.java     |   25 +-
 .../api/accessor/AtomicClusterAccessor.java     |  125 +-
 .../api/accessor/AtomicParticipantAccessor.java |  217 ----
 .../api/accessor/AtomicResourceAccessor.java    |  153 ---
 .../helix/api/accessor/ClusterAccessor.java     |  537 ++++++--
 .../helix/api/accessor/ControllerAccessor.java  |   49 -
 .../helix/api/accessor/ParticipantAccessor.java |  809 ------------
 .../helix/api/accessor/ResourceAccessor.java    |  541 --------
 .../apache/helix/api/config/ResourceConfig.java |   69 +-
 .../controller/rebalancer/CustomRebalancer.java |   25 +-
 .../rebalancer/FallbackRebalancer.java          |   28 +-
 .../rebalancer/FullAutoRebalancer.java          |   63 +-
 .../controller/rebalancer/HelixRebalancer.java  |    7 +-
 .../rebalancer/SemiAutoRebalancer.java          |   24 +-
 .../config/PartitionedRebalancerConfig.java     |   65 +
 .../stages/BestPossibleStateCalcStage.java      |   10 +-
 .../controller/stages/ClusterDataCache.java     |    5 -
 .../stages/CurrentStateComputationStage.java    |    9 +-
 .../stages/ExternalViewComputeStage.java        |    2 +-
 .../stages/MessageGenerationStage.java          |    2 +-
 .../stages/ResourceComputationStage.java        |    6 +-
 .../helix/manager/zk/ZkHelixConnection.java     |   12 -
 .../helix/manager/zk/ZkHelixController.java     |    2 +-
 .../helix/manager/zk/ZkHelixParticipant.java    |   12 +-
 .../java/org/apache/helix/model/IdealState.java |    3 +
 .../org/apache/helix/task/TaskRebalancer.java   |    6 +-
 .../org/apache/helix/tools/ClusterSetup.java    |    1 +
 .../org/apache/helix/tools/NewClusterSetup.java | 1168 ------------------
 .../org/apache/helix/api/TestNewStages.java     |    4 +-
 .../org/apache/helix/api/TestUpdateConfig.java  |   13 +-
 .../api/accessor/TestAccessorRecreate.java      |    5 +-
 .../stages/TestMessageThrottleStage.java        |   15 +-
 .../stages/TestRebalancePipeline.java           |   38 +-
 .../stages/TestResourceComputationStage.java    |    6 -
 .../TestCustomizedIdealStateRebalancer.java     |   18 +-
 .../helix/integration/TestHelixConnection.java  |    8 +-
 .../integration/TestLocalContainerProvider.java |    7 +-
 .../mbeans/TestClusterStatusMonitor.java        |    7 +-
 .../helix/examples/LogicalModelExample.java     |   12 +-
 .../StatelessParticipantService.java            |    8 +-
 .../provisioning/tools/ContainerAdmin.java      |   19 +-
 .../tools/UpdateProvisionerConfig.java          |   16 +-
 .../provisioning/yarn/AppMasterLauncher.java    |   14 +-
 .../yarn/AppStatusReportGenerator.java          |    6 +-
 .../yarn/example/JobRunnerMain.java             |    4 +-
 .../apache/helix/filestore/ChangeLogReader.java |    1 +
 .../LockManagerRebalancer.java                  |   16 +-
 49 files changed, 805 insertions(+), 3411 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/HelixConnection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
index c56b01a..ff5f458 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConnection.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java
@@ -20,8 +20,6 @@ package org.apache.helix;
  */
 
 import org.apache.helix.api.accessor.ClusterAccessor;
-import org.apache.helix.api.accessor.ParticipantAccessor;
-import org.apache.helix.api.accessor.ResourceAccessor;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ControllerId;
 import org.apache.helix.api.id.ParticipantId;
@@ -94,20 +92,6 @@ public interface HelixConnection {
   ClusterAccessor createClusterAccessor(ClusterId clusterId);
 
   /**
-   * create a resource accessor
-   * @param clusterId
-   * @return resource accessor
-   */
-  ResourceAccessor createResourceAccessor(ClusterId clusterId);
-
-  /**
-   * create a participant accessor
-   * @param clusterId
-   * @return participant-accessor
-   */
-  ParticipantAccessor createParticipantAccessor(ClusterId clusterId);
-
-  /**
    * Provides admin interface to setup and modify cluster
    * @return instantiated HelixAdmin
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 6901715..73313c0 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -33,11 +33,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
  * Class that represents the Helix Agent.
  * First class Object any process will interact with<br/>
  * General flow <blockquote>
-<<<<<<< HEAD
  * 
-=======
- *
->>>>>>> 77cc651... [HELIX-395] Remove old Helix alert/stat modules
  * <pre>
  * manager = HelixManagerFactory.getZKHelixManager(
  *    clusterName, instanceName, ROLE, zkAddr);
@@ -53,11 +49,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
  * FINALIZE -> will be invoked when listener is removed or session expires
  * manager.disconnect()
  * </pre>
-<<<<<<< HEAD
  * 
-=======
- *
->>>>>>> 77cc651... [HELIX-395] Remove old Helix alert/stat modules
  * </blockquote> Default implementations available
  * @see HelixStateMachineEngine HelixStateMachineEngine for participant
  * @see RoutingTableProvider RoutingTableProvider for spectator

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 1153032..239748c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -42,7 +42,6 @@ import org.apache.helix.model.ResourceAssignment;
  */
 public class Resource {
   private final ResourceConfig _config;
-  private final IdealState _idealState;
   private final ExternalView _externalView;
   private final ResourceAssignment _resourceAssignment;
 
@@ -65,9 +64,8 @@ public class Resource {
       UserConfig userConfig, int bucketSize, boolean batchMessageMode) {
     SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
     _config =
-        new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, provisionerConfig,
-            userConfig, bucketSize, batchMessageMode);
-    _idealState = idealState;
+        new ResourceConfig(id, type, idealState, schedulerTaskConfig, rebalancerConfig,
+            provisionerConfig, userConfig, bucketSize, batchMessageMode);
     _externalView = externalView;
     _resourceAssignment = resourceAssignment;
   }
@@ -115,23 +113,6 @@ public class Resource {
   }
 
   /**
-   * Get the subunits of the resource
-   * @return map of subunit id to partition or empty map if none
-   */
-  public Map<? extends PartitionId, ? extends Partition> getSubUnitMap() {
-    return _config.getSubUnitMap();
-  }
-
-  /**
-   * Get a subunit that the resource contains
-   * @param subUnitId the subunit id to look up
-   * @return Partition or null if none is present with the given id
-   */
-  public Partition getSubUnit(PartitionId subUnitId) {
-    return _config.getSubUnit(subUnitId);
-  }
-
-  /**
    * Get the set of subunit ids that the resource contains
    * @return subunit id set, or empty if none
    */
@@ -216,7 +197,7 @@ public class Resource {
    * @return IdealState instance
    */
   public IdealState getIdealState() {
-    return _idealState;
+    return _config.getIdealState();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index 216b3ad..83fde95 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -19,12 +19,7 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Resource;
@@ -39,9 +34,6 @@ import org.apache.helix.lock.HelixLock;
 import org.apache.helix.lock.HelixLockable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 /**
  * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
  * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
@@ -203,67 +195,102 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     return null;
   }
 
-  /**
-   * Read resources atomically. This is resource-atomic, not cluster-atomic
-   */
   @Override
-  public Map<ResourceId, Resource> readResources() {
-    // read resources individually instead of together to maintain the equality link between ideal
-    // state and resource config
+  public Participant readParticipant(ParticipantId participantId) {
     ClusterId clusterId = clusterId();
-    HelixDataAccessor dataAccessor = dataAccessor();
-    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
-    Map<ResourceId, Resource> resources = Maps.newHashMap();
-    Set<String> idealStateNames =
-        Sets.newHashSet(dataAccessor.getChildNames(keyBuilder.idealStates()));
-    Set<String> resourceConfigNames =
-        Sets.newHashSet(dataAccessor.getChildNames(keyBuilder.resourceConfigs()));
-    resourceConfigNames.addAll(idealStateNames);
-    ResourceAccessor accessor = new AtomicResourceAccessor(clusterId, dataAccessor, _lockProvider);
-    for (String resourceName : resourceConfigNames) {
-      ResourceId resourceId = ResourceId.from(resourceName);
-      Resource resource = accessor.readResource(resourceId);
-      if (resource != null) {
-        resources.put(resourceId, resource);
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.readParticipant(participantId);
+      } finally {
+        lock.unlock();
       }
     }
-    return resources;
+    return null;
   }
 
-  /**
-   * Read participants atomically. This is participant-atomic, not cluster-atomic
-   */
   @Override
-  public Map<ParticipantId, Participant> readParticipants() {
-    // read participants individually to keep configs consistent with current state and messages
+  public boolean setParticipant(ParticipantConfig participantConfig) {
+    if (participantConfig == null) {
+      LOG.error("participant config cannot be null");
+      return false;
+    }
     ClusterId clusterId = clusterId();
-    HelixDataAccessor dataAccessor = dataAccessor();
-    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
-    Map<ParticipantId, Participant> participants = Maps.newHashMap();
-    ParticipantAccessor accessor =
-        new AtomicParticipantAccessor(clusterId, dataAccessor, _lockProvider);
-    List<String> participantNames = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
-    for (String participantName : participantNames) {
-      ParticipantId participantId = ParticipantId.from(participantName);
-      Participant participant = accessor.readParticipant(participantId);
-      if (participant != null) {
-        participants.put(participantId, participant);
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantConfig.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.setParticipant(participantConfig);
+      } finally {
+        lock.unlock();
       }
     }
-    return participants;
+    return false;
   }
 
   @Override
-  public void initClusterStructure() {
+  public ParticipantConfig updateParticipant(ParticipantId participantId,
+      ParticipantConfig.Delta participantDelta) {
     ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.updateParticipant(participantId, participantDelta);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Resource readResource(ResourceId resourceId) {
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.readResource(resourceId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return _clusterAccessor.updateResource(resourceId, resourceDelta);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean setResource(ResourceConfig resourceConfig) {
+    if (resourceConfig == null) {
+      LOG.error("resource config cannot be null");
+      return false;
+    }
+    ClusterId clusterId = clusterId();
+    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceConfig.getId()));
     boolean locked = lock.lock();
     if (locked) {
       try {
-        _clusterAccessor.initClusterStructure();
+        return _clusterAccessor.setResource(resourceConfig);
       } finally {
         lock.unlock();
       }
     }
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
deleted file mode 100644
index 6b9b10e..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ /dev/null
@@ -1,217 +0,0 @@
-package org.apache.helix.api.accessor;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.MessageId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.lock.HelixLock;
-import org.apache.helix.lock.HelixLockable;
-import org.apache.helix.model.Message;
-import org.apache.log4j.Logger;
-
-/**
- * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of
- * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside
- * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary. <br/>
- * <br/>
- * Using this class is quite expensive; it should thus be used sparingly and only in systems where
- * contention on these operations is expected. For most systems running Helix, this is typically not
- * the case.
- */
-public class AtomicParticipantAccessor extends ParticipantAccessor {
-  private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);
-
-  private final HelixLockable _lockProvider;
-
-  /**
-   * Non-atomic instance to protect against reentrant locking via polymorphism
-   */
-  private final ParticipantAccessor _participantAccessor;
-
-  /**
-   * Instantiate the accessor
-   * @param clusterId the cluster to access
-   * @param accessor a HelixDataAccessor for the physical properties
-   * @param lockProvider a lock provider
-   */
-  public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
-      HelixLockable lockProvider) {
-    super(clusterId, accessor);
-    _lockProvider = lockProvider;
-    _participantAccessor = new ParticipantAccessor(clusterId, accessor);
-  }
-
-  @Override
-  boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _participantAccessor.enableParticipant(participantId);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public Participant readParticipant(ParticipantId participantId) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _participantAccessor.readParticipant(participantId);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public boolean setParticipant(ParticipantConfig participantConfig) {
-    if (participantConfig == null) {
-      LOG.error("participant config cannot be null");
-      return false;
-    }
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantConfig.getId()));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _participantAccessor.setParticipant(participantConfig);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public ParticipantConfig updateParticipant(ParticipantId participantId,
-      ParticipantConfig.Delta participantDelta) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _participantAccessor.updateParticipant(participantId, participantDelta);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return null;
-  }
-
-  @Override
-  boolean dropParticipant(ParticipantId participantId) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _participantAccessor.dropParticipant(participantId);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void insertMessagesToParticipant(ParticipantId participantId,
-      Map<MessageId, Message> msgMap) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        _participantAccessor.insertMessagesToParticipant(participantId, msgMap);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return;
-  }
-
-  @Override
-  public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        _participantAccessor.updateMessageStatus(participantId, msgMap);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return;
-  }
-
-  @Override
-  public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        _participantAccessor.deleteMessagesFromParticipant(participantId, msgIdSet);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return;
-  }
-
-  @Override
-  public boolean initParticipantStructure(ParticipantId participantId) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _participantAccessor.initParticipantStructure(participantId);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  protected ResourceAccessor resourceAccessor() {
-    ClusterId clusterId = clusterId();
-    HelixDataAccessor accessor = dataAccessor();
-    return new AtomicResourceAccessor(clusterId, accessor, _lockProvider);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
deleted file mode 100644
index 65fda39..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.helix.api.accessor;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.api.Resource;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
-import org.apache.helix.lock.HelixLock;
-import org.apache.helix.lock.HelixLockable;
-import org.apache.log4j.Logger;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of
- * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside
- * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary. <br/>
- * <br/>
- * Using this class is quite expensive; it should thus be used sparingly and only in systems where
- * contention on these operations is expected. For most systems running Helix, this is typically not
- * the case.
- */
-public class AtomicResourceAccessor extends ResourceAccessor {
-  private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
-
-  private final HelixLockable _lockProvider;
-
-  /**
-   * Non-atomic instance to protect against reentrant locking via polymorphism
-   */
-  private final ResourceAccessor _resourceAccessor;
-
-  /**
-   * Instantiate the accessor
-   * @param clusterId the cluster to access
-   * @param accessor a HelixDataAccessor for the physical properties
-   * @param lockProvider a lock provider
-   */
-  public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
-      HelixLockable lockProvider) {
-    super(clusterId, accessor);
-    _lockProvider = lockProvider;
-    _resourceAccessor = new ResourceAccessor(clusterId, accessor);
-  }
-
-  @Override
-  public Resource readResource(ResourceId resourceId) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _resourceAccessor.readResource(resourceId);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _resourceAccessor.updateResource(resourceId, resourceDelta);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _resourceAccessor.setRebalancerConfig(resourceId, config);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public boolean setResource(ResourceConfig resourceConfig) {
-    if (resourceConfig == null) {
-      LOG.error("resource config cannot be null");
-      return false;
-    }
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceConfig.getId()));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _resourceAccessor.setResource(resourceConfig);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
-      String participantGroupTag) {
-    ClusterId clusterId = clusterId();
-    HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId));
-    boolean locked = lock.lock();
-    if (locked) {
-      try {
-        return _resourceAccessor.generateDefaultAssignment(resourceId, replicaCount,
-            participantGroupTag);
-      } finally {
-        lock.unlock();
-      }
-    }
-    return false;
-  }
-
-  @Override
-  protected ParticipantAccessor participantAccessor() {
-    ClusterId clusterId = clusterId();
-    HelixDataAccessor accessor = dataAccessor();
-    return new AtomicParticipantAccessor(clusterId, accessor, _lockProvider);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 92fb636..21d40b1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -22,6 +22,7 @@ package org.apache.helix.api.accessor;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,15 +35,18 @@ import org.apache.helix.api.Cluster;
 import org.apache.helix.api.Controller;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Resource;
+import org.apache.helix.api.RunningInstance;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ContainerConfig;
 import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.ResourceConfig.ResourceType;
 import org.apache.helix.api.config.UserConfig;
 import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ConstraintId;
 import org.apache.helix.api.id.ContextId;
 import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
@@ -50,6 +54,9 @@ import org.apache.helix.api.id.SessionId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.context.ControllerContext;
 import org.apache.helix.controller.context.ControllerContextHolder;
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerSpec;
+import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.rebalancer.RebalancerRef;
 import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig;
@@ -275,14 +282,6 @@ public class ClusterAccessor {
 
   /**
    * Get all the state model definitions for this cluster
-   * @return map of state model def id to state model definition
-   */
-  public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
-    return readStateModelDefinitions(false);
-  }
-
-  /**
-   * Get all the state model definitions for this cluster
    * @param useCache Use the ClusterDataCache associated with this class rather than reading again
    * @return map of state model def id to state model definition
    */
@@ -302,14 +301,6 @@ public class ClusterAccessor {
 
   /**
    * Read all resources in the cluster
-   * @return map of resource id to resource
-   */
-  public Map<ResourceId, Resource> readResources() {
-    return readResources(false);
-  }
-
-  /**
-   * Read all resources in the cluster
    * @param useCache Use the ClusterDataCache associated with this class rather than reading again
    * @return map of resource id to resource
    */
@@ -375,9 +366,11 @@ public class ClusterAccessor {
     Map<ResourceId, Resource> resourceMap = Maps.newHashMap();
     for (String resourceName : allResources) {
       ResourceId resourceId = ResourceId.from(resourceName);
-      resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId,
-          resourceConfigMap.get(resourceName), idealStateMap.get(resourceName),
-          externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName)));
+      resourceMap.put(
+          resourceId,
+          createResource(resourceId, resourceConfigMap.get(resourceName),
+              idealStateMap.get(resourceName), externalViewMap.get(resourceName),
+              resourceAssignmentMap.get(resourceName)));
     }
 
     return resourceMap;
@@ -385,14 +378,6 @@ public class ClusterAccessor {
 
   /**
    * Read all participants in the cluster
-   * @return map of participant id to participant, or empty map
-   */
-  public Map<ParticipantId, Participant> readParticipants() {
-    return readParticipants(false);
-  }
-
-  /**
-   * Read all participants in the cluster
    * @param useCache Use the ClusterDataCache associated with this class rather than reading again
    * @return map of participant id to participant, or empty map
    */
@@ -453,75 +438,16 @@ public class ClusterAccessor {
 
       ParticipantId participantId = ParticipantId.from(participantName);
 
-      participantMap.put(participantId, ParticipantAccessor.createParticipant(participantId,
-          instanceConfig, userConfig, liveInstance, instanceMsgMap,
-          currentStateMap.get(participantName)));
+      participantMap.put(
+          participantId,
+          createParticipant(participantId, instanceConfig, userConfig, liveInstance,
+              instanceMsgMap, currentStateMap.get(participantName)));
     }
 
     return participantMap;
   }
 
   /**
-   * Get cluster constraints of a given type
-   * @param type ConstraintType value
-   * @return ClusterConstraints, or null if none present
-   */
-  public ClusterConstraints readConstraints(ConstraintType type) {
-    return _accessor.getProperty(_keyBuilder.constraint(type.toString()));
-  }
-
-  /**
-   * Remove a constraint from the cluster
-   * @param type the constraint type
-   * @param constraintId the constraint id
-   * @return true if removed, false otherwise
-   */
-  public boolean removeConstraint(ConstraintType type, ConstraintId constraintId) {
-    ClusterConstraints constraints = _accessor.getProperty(_keyBuilder.constraint(type.toString()));
-    if (constraints == null || constraints.getConstraintItem(constraintId) == null) {
-      LOG.error("Constraint with id " + constraintId + " not present");
-      return false;
-    }
-    constraints.removeConstraintItem(constraintId);
-    return _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraints);
-  }
-
-  /**
-   * Read the user config of the cluster
-   * @return UserConfig, or null
-   */
-  public UserConfig readUserConfig() {
-    ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
-    return clusterConfig != null ? clusterConfig.getUserConfig() : null;
-  }
-
-  /**
-   * Set the user config of the cluster, overwriting existing user configs
-   * @param userConfig the new user config
-   * @return true if the user config was set, false otherwise
-   */
-  public boolean setUserConfig(UserConfig userConfig) {
-    ClusterConfig.Delta delta = new ClusterConfig.Delta(_clusterId).setUserConfig(userConfig);
-    return updateCluster(delta) != null;
-  }
-
-  /**
-   * Clear any user-specified configuration from the cluster
-   * @return true if the config was cleared, false otherwise
-   */
-  public boolean dropUserConfig() {
-    return setUserConfig(new UserConfig(Scope.cluster(_clusterId)));
-  }
-
-  /**
-   * Read the persisted controller contexts
-   * @return map of context id to controller context
-   */
-  public Map<ContextId, ControllerContext> readControllerContext() {
-    return readControllerContext(false);
-  }
-
-  /**
    * Read the persisted controller contexts
    * @param useCache Use the ClusterDataCache associated with this class rather than reading again
    * @return map of context id to controller context
@@ -541,18 +467,6 @@ public class ClusterAccessor {
   }
 
   /**
-   * Add user configuration to the existing cluster user configuration. Overwrites properties with
-   * the same key
-   * @param userConfig the user config key-value pairs to add
-   * @return true if the user config was updated, false otherwise
-   */
-  public boolean updateUserConfig(UserConfig userConfig) {
-    ClusterConfiguration clusterConfig = new ClusterConfiguration(_clusterId);
-    clusterConfig.addNamespacedConfig(userConfig);
-    return _accessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);
-  }
-
-  /**
    * pause controller of cluster
    * @return true if cluster was paused, false if pause failed or already paused
    */
@@ -604,7 +518,7 @@ public class ClusterAccessor {
 
     // Create an IdealState from a RebalancerConfig (if the resource supports it)
     IdealState idealState =
-        ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
+        PartitionedRebalancerConfig.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
             resource.getBucketSize(), resource.getBatchMessageMode());
     if (idealState != null) {
       _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
@@ -653,14 +567,13 @@ public class ClusterAccessor {
    * check if cluster structure is valid
    * @return true if valid or false otherwise
    */
-  public boolean isClusterStructureValid() {
+  protected boolean isClusterStructureValid() {
     List<String> paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString());
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
       int ind = 0;
       for (boolean exists : existsResults) {
-
         if (!exists) {
           LOG.warn("Path does not exist:" + paths.get(ind));
           return false;
@@ -674,7 +587,7 @@ public class ClusterAccessor {
   /**
    * Create empty persistent properties to ensure that there is a valid cluster structure
    */
-  public void initClusterStructure() {
+  private void initClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     List<String> paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString());
     for (String path : paths) {
@@ -709,19 +622,18 @@ public class ClusterAccessor {
       return false;
     }
 
-    ParticipantAccessor participantAccessor = new ParticipantAccessor(_clusterId, _accessor);
     ParticipantId participantId = participant.getId();
     InstanceConfig existConfig =
         _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
-    if (existConfig != null && participantAccessor.isParticipantStructureValid(participantId)) {
+    if (existConfig != null && isParticipantStructureValid(participantId)) {
       LOG.error("Config for participant: " + participantId + " already exists in cluster: "
           + _clusterId);
       return false;
     }
 
     // clear and rebuild the participant structure
-    participantAccessor.clearParticipantStructure(participantId);
-    participantAccessor.initParticipantStructure(participantId);
+    clearParticipantStructure(participantId);
+    initParticipantStructure(participantId);
 
     // add the config
     InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
@@ -748,8 +660,20 @@ public class ClusterAccessor {
    * @return true if participant dropped, false if there was an error
    */
   public boolean dropParticipantFromCluster(ParticipantId participantId) {
-    ParticipantAccessor accessor = new ParticipantAccessor(_clusterId, _accessor);
-    return accessor.dropParticipant(participantId);
+    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
+      LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
+    }
+
+    if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
+      LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
+    }
+
+    // delete participant config path
+    _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+
+    // delete participant path
+    _accessor.removeProperty(_keyBuilder.instance(participantId.stringify()));
+    return true;
   }
 
   /**
@@ -777,6 +701,332 @@ public class ClusterAccessor {
   }
 
   /**
+   * Read the leader controller if it is live
+   * @return Controller snapshot, or null
+   */
+  public Controller readLeader() {
+    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
+    if (leader != null) {
+      ControllerId leaderId = ControllerId.from(leader.getId());
+      return new Controller(leaderId, leader, true);
+    }
+    return null;
+  }
+
+  /**
+   * Update a participant configuration
+   * @param participantId the participant to update
+   * @param participantDelta changes to the participant
+   * @return ParticipantConfig, or null if participant is not persisted
+   */
+  public ParticipantConfig updateParticipant(ParticipantId participantId,
+      ParticipantConfig.Delta participantDelta) {
+    Participant participant = readParticipant(participantId);
+    if (participant == null) {
+      LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+      return null;
+    }
+    ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+    setParticipant(config);
+    return config;
+  }
+
+  /**
+   * Set the configuration of an existing participant
+   * @param participantConfig participant configuration
+   * @return true if config was set, false if there was an error
+   */
+  public boolean setParticipant(ParticipantConfig participantConfig) {
+    if (participantConfig == null) {
+      LOG.error("Participant config not initialized");
+      return false;
+    }
+    InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId());
+    instanceConfig.setHostName(participantConfig.getHostName());
+    instanceConfig.setPort(Integer.toString(participantConfig.getPort()));
+    for (String tag : participantConfig.getTags()) {
+      instanceConfig.addTag(tag);
+    }
+    for (PartitionId partitionId : participantConfig.getDisabledPartitions()) {
+      instanceConfig.setParticipantEnabledForPartition(partitionId, false);
+    }
+    instanceConfig.setInstanceEnabled(participantConfig.isEnabled());
+    instanceConfig.addNamespacedConfig(participantConfig.getUserConfig());
+    _accessor.setProperty(_keyBuilder.instanceConfig(participantConfig.getId().stringify()),
+        instanceConfig);
+    return true;
+  }
+
+  /**
+   * create a participant based on physical model
+   * @param participantId
+   * @param instanceConfig
+   * @param userConfig
+   * @param liveInstance
+   * @param instanceMsgMap map of message-id to message
+   * @param instanceCurStateMap map of resource-id to current-state
+   * @return participant
+   */
+  private static Participant createParticipant(ParticipantId participantId,
+      InstanceConfig instanceConfig, UserConfig userConfig, LiveInstance liveInstance,
+      Map<String, Message> instanceMsgMap, Map<String, CurrentState> instanceCurStateMap) {
+
+    String hostName = instanceConfig.getHostName();
+
+    int port = -1;
+    try {
+      port = Integer.parseInt(instanceConfig.getPort());
+    } catch (IllegalArgumentException e) {
+      // keep as -1
+    }
+    if (port < 0 || port > 65535) {
+      port = -1;
+    }
+    boolean isEnabled = instanceConfig.getInstanceEnabled();
+
+    List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
+    Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
+    if (disabledPartitions != null) {
+      disabledPartitionIdSet = new HashSet<PartitionId>();
+      for (String partitionId : disabledPartitions) {
+        disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
+            PartitionId.stripResourceId(partitionId)));
+      }
+    }
+
+    Set<String> tags = new HashSet<String>(instanceConfig.getTags());
+
+    RunningInstance runningInstance = null;
+    if (liveInstance != null) {
+      runningInstance =
+          new RunningInstance(liveInstance.getTypedSessionId(),
+              liveInstance.getTypedHelixVersion(), liveInstance.getProcessId());
+    }
+
+    Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+    if (instanceMsgMap != null) {
+      for (String msgId : instanceMsgMap.keySet()) {
+        Message message = instanceMsgMap.get(msgId);
+        msgMap.put(MessageId.from(msgId), message);
+      }
+    }
+
+    Map<ResourceId, CurrentState> curStateMap = new HashMap<ResourceId, CurrentState>();
+    if (instanceCurStateMap != null) {
+
+      for (String resourceName : instanceCurStateMap.keySet()) {
+        curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName));
+      }
+    }
+
+    // set up the container config if it exists
+    ContainerConfig containerConfig = null;
+    ContainerSpec containerSpec = instanceConfig.getContainerSpec();
+    ContainerState containerState = instanceConfig.getContainerState();
+    ContainerId containerId = instanceConfig.getContainerId();
+    if (containerSpec != null || containerState != null || containerId != null) {
+      containerConfig = new ContainerConfig(containerId, containerSpec, containerState);
+    }
+
+    return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+        runningInstance, curStateMap, msgMap, userConfig, containerConfig);
+  }
+
+  /**
+   * read participant related data
+   * @param participantId
+   * @return participant, or null if participant not available
+   */
+  public Participant readParticipant(ParticipantId participantId) {
+    // read physical model
+    String participantName = participantId.stringify();
+    InstanceConfig instanceConfig =
+        _accessor.getProperty(_keyBuilder.instanceConfig(participantName));
+
+    if (instanceConfig == null) {
+      LOG.error("Participant " + participantId + " is not present on the cluster");
+      return null;
+    }
+
+    UserConfig userConfig = instanceConfig.getUserConfig();
+    LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName));
+
+    Map<String, Message> instanceMsgMap = Collections.emptyMap();
+    Map<String, CurrentState> instanceCurStateMap = Collections.emptyMap();
+    if (liveInstance != null) {
+      SessionId sessionId = liveInstance.getTypedSessionId();
+
+      instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName));
+      instanceCurStateMap =
+          _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName,
+              sessionId.stringify()));
+    }
+
+    return createParticipant(participantId, instanceConfig, userConfig, liveInstance,
+        instanceMsgMap, instanceCurStateMap);
+  }
+
+  /**
+   * Read a single snapshot of a resource
+   * @param resourceId the resource id to read
+   * @return Resource or null if not present
+   */
+  public Resource readResource(ResourceId resourceId) {
+    ResourceConfiguration config =
+        _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+    IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
+
+    if (config == null && idealState == null) {
+      LOG.error("Resource " + resourceId + " not present on the cluster");
+      return null;
+    }
+
+    ExternalView externalView =
+        _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify()));
+    ResourceAssignment resourceAssignment =
+        _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify()));
+    return createResource(resourceId, config, idealState, externalView, resourceAssignment);
+  }
+
+  /**
+   * Update a resource configuration
+   * @param resourceId the resource id to update
+   * @param resourceDelta changes to the resource
+   * @return ResourceConfig, or null if the resource is not persisted
+   */
+  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+    Resource resource = readResource(resourceId);
+    if (resource == null) {
+      LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+      return null;
+    }
+    ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+    setResource(config);
+    return config;
+  }
+
+  /**
+   * Set a physical resource configuration, which may include user-defined configuration, as well as
+   * rebalancer configuration
+   * @param resourceId
+   * @param configuration
+   * @return true if set, false otherwise
+   */
+  private boolean setResourceConfiguration(ResourceId resourceId,
+      ResourceConfiguration configuration, RebalancerConfig rebalancerConfig) {
+    boolean status = true;
+    if (configuration != null) {
+      status =
+          _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+    }
+    // set an ideal state if the resource supports it
+    IdealState idealState =
+        PartitionedRebalancerConfig.rebalancerConfigToIdealState(rebalancerConfig,
+            configuration.getBucketSize(), configuration.getBatchMessageMode());
+    if (idealState != null) {
+      status =
+          status
+              && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
+    }
+    return status;
+  }
+
+  /**
+   * Persist an existing resource's logical configuration
+   * @param resourceConfig logical resource configuration
+   * @return true if resource is set, false otherwise
+   */
+  public boolean setResource(ResourceConfig resourceConfig) {
+    if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) {
+      LOG.error("Resource not fully defined with a rebalancer context");
+      return false;
+    }
+    ResourceId resourceId = resourceConfig.getId();
+    ResourceConfiguration config = new ResourceConfiguration(resourceId);
+    UserConfig userConfig = resourceConfig.getUserConfig();
+    if (userConfig != null
+        && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig
+            .getMapFields().isEmpty())) {
+      config.addNamespacedConfig(userConfig);
+    } else {
+      userConfig = null;
+    }
+    PartitionedRebalancerConfig partitionedConfig =
+        PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig());
+    if (partitionedConfig == null
+        || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+      // only persist if this is not easily convertible to an ideal state
+      config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
+          .toNamespacedConfig());
+      config.setBucketSize(resourceConfig.getBucketSize());
+      config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+    } else if (userConfig == null) {
+      config = null;
+    }
+    if (resourceConfig.getProvisionerConfig() != null) {
+      config.addNamespacedConfig(new ProvisionerConfigHolder(resourceConfig.getProvisionerConfig())
+          .toNamespacedConfig());
+    }
+    config.setBucketSize(resourceConfig.getBucketSize());
+    config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
+    setResourceConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
+    return true;
+  }
+
+  /**
+   * Create a resource snapshot instance from the physical model
+   * @param resourceId the resource id
+   * @param resourceConfiguration physical resource configuration
+   * @param idealState ideal state of the resource
+   * @param externalView external view of the resource
+   * @param resourceAssignment current resource assignment
+   * @return Resource
+   */
+  private static Resource createResource(ResourceId resourceId,
+      ResourceConfiguration resourceConfiguration, IdealState idealState,
+      ExternalView externalView, ResourceAssignment resourceAssignment) {
+    UserConfig userConfig;
+    ProvisionerConfig provisionerConfig = null;
+    RebalancerConfig rebalancerConfig = null;
+    ResourceType type = ResourceType.DATA;
+    if (resourceConfiguration != null) {
+      userConfig = resourceConfiguration.getUserConfig();
+      type = resourceConfiguration.getType();
+    } else {
+      userConfig = new UserConfig(Scope.resource(resourceId));
+    }
+    int bucketSize = 0;
+    boolean batchMessageMode = false;
+    if (idealState != null) {
+      if (resourceConfiguration != null
+          && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
+        // prefer rebalancer config for user_defined data rebalancing
+        rebalancerConfig =
+            resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class);
+      }
+      if (rebalancerConfig == null) {
+        // prefer ideal state for non-user_defined data rebalancing
+        rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
+      }
+      bucketSize = idealState.getBucketSize();
+      batchMessageMode = idealState.getBatchMessageMode();
+      idealState.updateUserConfig(userConfig);
+    } else if (resourceConfiguration != null) {
+      bucketSize = resourceConfiguration.getBucketSize();
+      batchMessageMode = resourceConfiguration.getBatchMessageMode();
+      rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
+    }
+    if (rebalancerConfig == null) {
+      rebalancerConfig = new PartitionedRebalancerConfig();
+    }
+    if (resourceConfiguration != null) {
+      provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class);
+    }
+    return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
+        rebalancerConfig, provisionerConfig, userConfig, bucketSize, batchMessageMode);
+  }
+
+  /**
    * Get the cluster ID this accessor is connected to
    * @return ClusterId
    */
@@ -791,4 +1041,65 @@ public class ClusterAccessor {
   protected HelixDataAccessor dataAccessor() {
     return _accessor;
   }
+
+  /**
+   * Create empty persistent properties to ensure that there is a valid participant structure
+   * @param participantId the identifier under which to initialize the structure
+   * @return true if the participant structure exists at the end of this call, false otherwise
+   */
+  private boolean initParticipantStructure(ParticipantId participantId) {
+    if (participantId == null) {
+      LOG.error("Participant ID cannot be null when clearing the participant in cluster "
+          + _clusterId + "!");
+      return false;
+    }
+    List<String> paths =
+        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    for (String path : paths) {
+      boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+      if (!status) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(path + " already exists");
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Clear properties for the participant
+   * @param participantId the participant for which to clear
+   * @return true if all paths removed, false otherwise
+   */
+  private boolean clearParticipantStructure(ParticipantId participantId) {
+    List<String> paths =
+        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    boolean[] removeResults = baseAccessor.remove(paths, 0);
+    boolean result = true;
+    for (boolean removeResult : removeResults) {
+      result = result && removeResult;
+    }
+    return result;
+  }
+
+  /**
+   * check if participant structure is valid
+   * @return true if valid or false otherwise
+   */
+  private boolean isParticipantStructureValid(ParticipantId participantId) {
+    List<String> paths =
+        HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString());
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    if (baseAccessor != null) {
+      boolean[] existsResults = baseAccessor.exists(paths, 0);
+      for (boolean exists : existsResults) {
+        if (!exists) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
deleted file mode 100644
index 609e458..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package org.apache.helix.api.accessor;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.api.Controller;
-import org.apache.helix.api.id.ControllerId;
-import org.apache.helix.model.LiveInstance;
-
-public class ControllerAccessor {
-  private final HelixDataAccessor _accessor;
-  private final PropertyKey.Builder _keyBuilder;
-
-  public ControllerAccessor(HelixDataAccessor accessor) {
-    _accessor = accessor;
-    _keyBuilder = accessor.keyBuilder();
-  }
-
-  /**
-   * Read the leader controller if it is live
-   * @return Controller snapshot, or null
-   */
-  public Controller readLeader() {
-    LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
-    if (leader != null) {
-      ControllerId leaderId = ControllerId.from(leader.getId());
-      return new Controller(leaderId, leader, true);
-    }
-    return null;
-  }
-}


Mime
View raw message