Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A4B5711802 for ; Wed, 23 Jul 2014 21:07:43 +0000 (UTC) Received: (qmail 85406 invoked by uid 500); 23 Jul 2014 21:07:43 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 85324 invoked by uid 500); 23 Jul 2014 21:07:43 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 85216 invoked by uid 99); 23 Jul 2014 21:07:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Jul 2014 21:07:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 386369B1FF6; Wed, 23 Jul 2014 21:07:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Date: Wed, 23 Jul 2014 21:07:46 -0000 Message-Id: <0db82cfcc9b5425bb284a2464b0080d7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/4] git commit: [HELIX-389] Unify accessor classes into a single class [HELIX-389] Unify accessor classes into a single class Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ce1e926c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ce1e926c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ce1e926c Branch: refs/heads/master Commit: ce1e926c9c485ac6e87a4c41a3cb5c35bd681e39 Parents: 410815d Author: Kanak Biscuitwala Authored: Tue Jul 22 12:05:05 2014 -0700 Committer: Kanak Biscuitwala Committed: Wed Jul 23 11:13:06 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/HelixConnection.java | 16 - .../java/org/apache/helix/HelixManager.java | 8 - .../java/org/apache/helix/api/Resource.java | 25 +- .../api/accessor/AtomicClusterAccessor.java | 125 +- .../api/accessor/AtomicParticipantAccessor.java | 217 ---- .../api/accessor/AtomicResourceAccessor.java | 153 --- .../helix/api/accessor/ClusterAccessor.java | 537 ++++++-- .../helix/api/accessor/ControllerAccessor.java | 49 - .../helix/api/accessor/ParticipantAccessor.java | 809 ------------ .../helix/api/accessor/ResourceAccessor.java | 541 -------- .../apache/helix/api/config/ResourceConfig.java | 69 +- .../controller/rebalancer/CustomRebalancer.java | 25 +- .../rebalancer/FallbackRebalancer.java | 28 +- .../rebalancer/FullAutoRebalancer.java | 63 +- .../controller/rebalancer/HelixRebalancer.java | 7 +- .../rebalancer/SemiAutoRebalancer.java | 24 +- .../config/PartitionedRebalancerConfig.java | 65 + .../stages/BestPossibleStateCalcStage.java | 10 +- .../controller/stages/ClusterDataCache.java | 5 - .../stages/CurrentStateComputationStage.java | 9 +- .../stages/ExternalViewComputeStage.java | 2 +- .../stages/MessageGenerationStage.java | 2 +- .../stages/ResourceComputationStage.java | 6 +- .../helix/manager/zk/ZkHelixConnection.java | 12 - .../helix/manager/zk/ZkHelixController.java | 2 +- .../helix/manager/zk/ZkHelixParticipant.java | 12 +- .../java/org/apache/helix/model/IdealState.java | 3 + .../org/apache/helix/task/TaskRebalancer.java | 6 +- .../org/apache/helix/tools/ClusterSetup.java | 1 + .../org/apache/helix/tools/NewClusterSetup.java | 1168 ------------------ .../org/apache/helix/api/TestNewStages.java | 4 +- .../org/apache/helix/api/TestUpdateConfig.java | 13 +- .../api/accessor/TestAccessorRecreate.java | 5 +- .../stages/TestMessageThrottleStage.java | 15 +- .../stages/TestRebalancePipeline.java | 38 +- .../stages/TestResourceComputationStage.java | 6 - .../TestCustomizedIdealStateRebalancer.java | 18 +- .../helix/integration/TestHelixConnection.java | 8 +- .../integration/TestLocalContainerProvider.java | 7 +- .../mbeans/TestClusterStatusMonitor.java | 7 +- .../helix/examples/LogicalModelExample.java | 12 +- .../StatelessParticipantService.java | 8 +- .../provisioning/tools/ContainerAdmin.java | 19 +- .../tools/UpdateProvisionerConfig.java | 16 +- .../provisioning/yarn/AppMasterLauncher.java | 14 +- .../yarn/AppStatusReportGenerator.java | 6 +- .../yarn/example/JobRunnerMain.java | 4 +- .../apache/helix/filestore/ChangeLogReader.java | 1 + .../LockManagerRebalancer.java | 16 +- 49 files changed, 805 insertions(+), 3411 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/HelixConnection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixConnection.java b/helix-core/src/main/java/org/apache/helix/HelixConnection.java index c56b01a..ff5f458 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixConnection.java +++ b/helix-core/src/main/java/org/apache/helix/HelixConnection.java @@ -20,8 +20,6 @@ package org.apache.helix; */ import org.apache.helix.api.accessor.ClusterAccessor; -import org.apache.helix.api.accessor.ParticipantAccessor; -import org.apache.helix.api.accessor.ResourceAccessor; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ControllerId; import org.apache.helix.api.id.ParticipantId; @@ -94,20 +92,6 @@ public interface HelixConnection { ClusterAccessor createClusterAccessor(ClusterId clusterId); /** - * create a resource accessor - * @param clusterId - * @return resource accessor - */ - ResourceAccessor createResourceAccessor(ClusterId clusterId); - - /** - * create a participant accessor - * @param clusterId - * @return participant-accessor - */ - ParticipantAccessor createParticipantAccessor(ClusterId clusterId); - - /** * Provides admin interface to setup and modify cluster * @return instantiated HelixAdmin */ http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/HelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index 6901715..73313c0 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -33,11 +33,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; * Class that represents the Helix Agent. * First class Object any process will interact with
* General flow
-<<<<<<< HEAD * -======= - * ->>>>>>> 77cc651... [HELIX-395] Remove old Helix alert/stat modules *
  * manager = HelixManagerFactory.getZKHelixManager(
  *    clusterName, instanceName, ROLE, zkAddr);
@@ -53,11 +49,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
  * FINALIZE -> will be invoked when listener is removed or session expires
  * manager.disconnect()
  * 
-<<<<<<< HEAD * -======= - * ->>>>>>> 77cc651... [HELIX-395] Remove old Helix alert/stat modules *
Default implementations available * @see HelixStateMachineEngine HelixStateMachineEngine for participant * @see RoutingTableProvider RoutingTableProvider for spectator http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/Resource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java index 1153032..239748c 100644 --- a/helix-core/src/main/java/org/apache/helix/api/Resource.java +++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java @@ -42,7 +42,6 @@ import org.apache.helix.model.ResourceAssignment; */ public class Resource { private final ResourceConfig _config; - private final IdealState _idealState; private final ExternalView _externalView; private final ResourceAssignment _resourceAssignment; @@ -65,9 +64,8 @@ public class Resource { UserConfig userConfig, int bucketSize, boolean batchMessageMode) { SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState); _config = - new ResourceConfig(id, type, schedulerTaskConfig, rebalancerConfig, provisionerConfig, - userConfig, bucketSize, batchMessageMode); - _idealState = idealState; + new ResourceConfig(id, type, idealState, schedulerTaskConfig, rebalancerConfig, + provisionerConfig, userConfig, bucketSize, batchMessageMode); _externalView = externalView; _resourceAssignment = resourceAssignment; } @@ -115,23 +113,6 @@ public class Resource { } /** - * Get the subunits of the resource - * @return map of subunit id to partition or empty map if none - */ - public Map getSubUnitMap() { - return _config.getSubUnitMap(); - } - - /** - * Get a subunit that the resource contains - * @param subUnitId the subunit id to look up - * @return Partition or null if none is present with the given id - */ - public Partition getSubUnit(PartitionId subUnitId) { - return _config.getSubUnit(subUnitId); - } - - /** * Get the set of subunit ids that the resource contains * @return subunit id set, or empty if none */ @@ -216,7 +197,7 @@ public class Resource { * @return IdealState instance */ public IdealState getIdealState() { - return _idealState; + return _config.getIdealState(); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java index 216b3ad..83fde95 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java @@ -19,12 +19,7 @@ package org.apache.helix.api.accessor; * under the License. */ -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; import org.apache.helix.api.Cluster; import org.apache.helix.api.Participant; import org.apache.helix.api.Resource; @@ -39,9 +34,6 @@ import org.apache.helix.lock.HelixLock; import org.apache.helix.lock.HelixLockable; import org.apache.log4j.Logger; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - /** * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside @@ -203,67 +195,102 @@ public class AtomicClusterAccessor extends ClusterAccessor { return null; } - /** - * Read resources atomically. This is resource-atomic, not cluster-atomic - */ @Override - public Map readResources() { - // read resources individually instead of together to maintain the equality link between ideal - // state and resource config + public Participant readParticipant(ParticipantId participantId) { ClusterId clusterId = clusterId(); - HelixDataAccessor dataAccessor = dataAccessor(); - PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); - Map resources = Maps.newHashMap(); - Set idealStateNames = - Sets.newHashSet(dataAccessor.getChildNames(keyBuilder.idealStates())); - Set resourceConfigNames = - Sets.newHashSet(dataAccessor.getChildNames(keyBuilder.resourceConfigs())); - resourceConfigNames.addAll(idealStateNames); - ResourceAccessor accessor = new AtomicResourceAccessor(clusterId, dataAccessor, _lockProvider); - for (String resourceName : resourceConfigNames) { - ResourceId resourceId = ResourceId.from(resourceName); - Resource resource = accessor.readResource(resourceId); - if (resource != null) { - resources.put(resourceId, resource); + HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + return _clusterAccessor.readParticipant(participantId); + } finally { + lock.unlock(); } } - return resources; + return null; } - /** - * Read participants atomically. This is participant-atomic, not cluster-atomic - */ @Override - public Map readParticipants() { - // read participants individually to keep configs consistent with current state and messages + public boolean setParticipant(ParticipantConfig participantConfig) { + if (participantConfig == null) { + LOG.error("participant config cannot be null"); + return false; + } ClusterId clusterId = clusterId(); - HelixDataAccessor dataAccessor = dataAccessor(); - PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); - Map participants = Maps.newHashMap(); - ParticipantAccessor accessor = - new AtomicParticipantAccessor(clusterId, dataAccessor, _lockProvider); - List participantNames = dataAccessor.getChildNames(keyBuilder.instanceConfigs()); - for (String participantName : participantNames) { - ParticipantId participantId = ParticipantId.from(participantName); - Participant participant = accessor.readParticipant(participantId); - if (participant != null) { - participants.put(participantId, participant); + HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantConfig.getId())); + boolean locked = lock.lock(); + if (locked) { + try { + return _clusterAccessor.setParticipant(participantConfig); + } finally { + lock.unlock(); } } - return participants; + return false; } @Override - public void initClusterStructure() { + public ParticipantConfig updateParticipant(ParticipantId participantId, + ParticipantConfig.Delta participantDelta) { ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId)); + HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); + boolean locked = lock.lock(); + if (locked) { + try { + return _clusterAccessor.updateParticipant(participantId, participantDelta); + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public Resource readResource(ResourceId resourceId) { + ClusterId clusterId = clusterId(); + HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId)); + boolean locked = lock.lock(); + if (locked) { + try { + return _clusterAccessor.readResource(resourceId); + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) { + ClusterId clusterId = clusterId(); + HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId)); + boolean locked = lock.lock(); + if (locked) { + try { + return _clusterAccessor.updateResource(resourceId, resourceDelta); + } finally { + lock.unlock(); + } + } + return null; + } + + @Override + public boolean setResource(ResourceConfig resourceConfig) { + if (resourceConfig == null) { + LOG.error("resource config cannot be null"); + return false; + } + ClusterId clusterId = clusterId(); + HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceConfig.getId())); boolean locked = lock.lock(); if (locked) { try { - _clusterAccessor.initClusterStructure(); + return _clusterAccessor.setResource(resourceConfig); } finally { lock.unlock(); } } + return false; } } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java deleted file mode 100644 index 6b9b10e..0000000 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java +++ /dev/null @@ -1,217 +0,0 @@ -package org.apache.helix.api.accessor; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; -import java.util.Set; - -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.api.Participant; -import org.apache.helix.api.Scope; -import org.apache.helix.api.config.ParticipantConfig; -import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.MessageId; -import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.lock.HelixLock; -import org.apache.helix.lock.HelixLockable; -import org.apache.helix.model.Message; -import org.apache.log4j.Logger; - -/** - * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of - * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside - * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition - * may fail, in which case users should handle the return value of each function if necessary.
- *
- * Using this class is quite expensive; it should thus be used sparingly and only in systems where - * contention on these operations is expected. For most systems running Helix, this is typically not - * the case. - */ -public class AtomicParticipantAccessor extends ParticipantAccessor { - private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class); - - private final HelixLockable _lockProvider; - - /** - * Non-atomic instance to protect against reentrant locking via polymorphism - */ - private final ParticipantAccessor _participantAccessor; - - /** - * Instantiate the accessor - * @param clusterId the cluster to access - * @param accessor a HelixDataAccessor for the physical properties - * @param lockProvider a lock provider - */ - public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor, - HelixLockable lockProvider) { - super(clusterId, accessor); - _lockProvider = lockProvider; - _participantAccessor = new ParticipantAccessor(clusterId, accessor); - } - - @Override - boolean enableParticipant(ParticipantId participantId, boolean isEnabled) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _participantAccessor.enableParticipant(participantId); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - public Participant readParticipant(ParticipantId participantId) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _participantAccessor.readParticipant(participantId); - } finally { - lock.unlock(); - } - } - return null; - } - - @Override - public boolean setParticipant(ParticipantConfig participantConfig) { - if (participantConfig == null) { - LOG.error("participant config cannot be null"); - return false; - } - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantConfig.getId())); - boolean locked = lock.lock(); - if (locked) { - try { - return _participantAccessor.setParticipant(participantConfig); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - public ParticipantConfig updateParticipant(ParticipantId participantId, - ParticipantConfig.Delta participantDelta) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _participantAccessor.updateParticipant(participantId, participantDelta); - } finally { - lock.unlock(); - } - } - return null; - } - - @Override - boolean dropParticipant(ParticipantId participantId) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _participantAccessor.dropParticipant(participantId); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - public void insertMessagesToParticipant(ParticipantId participantId, - Map msgMap) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - _participantAccessor.insertMessagesToParticipant(participantId, msgMap); - } finally { - lock.unlock(); - } - } - return; - } - - @Override - public void updateMessageStatus(ParticipantId participantId, Map msgMap) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - _participantAccessor.updateMessageStatus(participantId, msgMap); - } finally { - lock.unlock(); - } - } - return; - } - - @Override - public void deleteMessagesFromParticipant(ParticipantId participantId, Set msgIdSet) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - _participantAccessor.deleteMessagesFromParticipant(participantId, msgIdSet); - } finally { - lock.unlock(); - } - } - return; - } - - @Override - public boolean initParticipantStructure(ParticipantId participantId) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.participant(participantId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _participantAccessor.initParticipantStructure(participantId); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - protected ResourceAccessor resourceAccessor() { - ClusterId clusterId = clusterId(); - HelixDataAccessor accessor = dataAccessor(); - return new AtomicResourceAccessor(clusterId, accessor, _lockProvider); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java deleted file mode 100644 index 65fda39..0000000 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java +++ /dev/null @@ -1,153 +0,0 @@ -package org.apache.helix.api.accessor; - -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.api.Resource; -import org.apache.helix.api.Scope; -import org.apache.helix.api.config.ResourceConfig; -import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.controller.rebalancer.config.RebalancerConfig; -import org.apache.helix.lock.HelixLock; -import org.apache.helix.lock.HelixLockable; -import org.apache.log4j.Logger; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of - * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside - * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition - * may fail, in which case users should handle the return value of each function if necessary.
- *
- * Using this class is quite expensive; it should thus be used sparingly and only in systems where - * contention on these operations is expected. For most systems running Helix, this is typically not - * the case. - */ -public class AtomicResourceAccessor extends ResourceAccessor { - private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class); - - private final HelixLockable _lockProvider; - - /** - * Non-atomic instance to protect against reentrant locking via polymorphism - */ - private final ResourceAccessor _resourceAccessor; - - /** - * Instantiate the accessor - * @param clusterId the cluster to access - * @param accessor a HelixDataAccessor for the physical properties - * @param lockProvider a lock provider - */ - public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor, - HelixLockable lockProvider) { - super(clusterId, accessor); - _lockProvider = lockProvider; - _resourceAccessor = new ResourceAccessor(clusterId, accessor); - } - - @Override - public Resource readResource(ResourceId resourceId) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _resourceAccessor.readResource(resourceId); - } finally { - lock.unlock(); - } - } - return null; - } - - @Override - public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _resourceAccessor.updateResource(resourceId, resourceDelta); - } finally { - lock.unlock(); - } - } - return null; - } - - @Override - public boolean setRebalancerConfig(ResourceId resourceId, RebalancerConfig config) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _resourceAccessor.setRebalancerConfig(resourceId, config); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - public boolean setResource(ResourceConfig resourceConfig) { - if (resourceConfig == null) { - LOG.error("resource config cannot be null"); - return false; - } - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.resource(resourceConfig.getId())); - boolean locked = lock.lock(); - if (locked) { - try { - return _resourceAccessor.setResource(resourceConfig); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount, - String participantGroupTag) { - ClusterId clusterId = clusterId(); - HelixLock lock = _lockProvider.getLock(clusterId, Scope.cluster(clusterId)); - boolean locked = lock.lock(); - if (locked) { - try { - return _resourceAccessor.generateDefaultAssignment(resourceId, replicaCount, - participantGroupTag); - } finally { - lock.unlock(); - } - } - return false; - } - - @Override - protected ParticipantAccessor participantAccessor() { - ClusterId clusterId = clusterId(); - HelixDataAccessor accessor = dataAccessor(); - return new AtomicParticipantAccessor(clusterId, accessor, _lockProvider); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java index 92fb636..21d40b1 100644 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java @@ -22,6 +22,7 @@ package org.apache.helix.api.accessor; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,15 +35,18 @@ import org.apache.helix.api.Cluster; import org.apache.helix.api.Controller; import org.apache.helix.api.Participant; import org.apache.helix.api.Resource; +import org.apache.helix.api.RunningInstance; import org.apache.helix.api.Scope; import org.apache.helix.api.config.ClusterConfig; +import org.apache.helix.api.config.ContainerConfig; import org.apache.helix.api.config.ParticipantConfig; import org.apache.helix.api.config.ResourceConfig; +import org.apache.helix.api.config.ResourceConfig.ResourceType; import org.apache.helix.api.config.UserConfig; import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.ConstraintId; import org.apache.helix.api.id.ContextId; import org.apache.helix.api.id.ControllerId; +import org.apache.helix.api.id.MessageId; import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.ResourceId; @@ -50,6 +54,9 @@ import org.apache.helix.api.id.SessionId; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.context.ControllerContext; import org.apache.helix.controller.context.ControllerContextHolder; +import org.apache.helix.controller.provisioner.ContainerId; +import org.apache.helix.controller.provisioner.ContainerSpec; +import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.rebalancer.RebalancerRef; import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig; @@ -275,14 +282,6 @@ public class ClusterAccessor { /** * Get all the state model definitions for this cluster - * @return map of state model def id to state model definition - */ - public Map readStateModelDefinitions() { - return readStateModelDefinitions(false); - } - - /** - * Get all the state model definitions for this cluster * @param useCache Use the ClusterDataCache associated with this class rather than reading again * @return map of state model def id to state model definition */ @@ -302,14 +301,6 @@ public class ClusterAccessor { /** * Read all resources in the cluster - * @return map of resource id to resource - */ - public Map readResources() { - return readResources(false); - } - - /** - * Read all resources in the cluster * @param useCache Use the ClusterDataCache associated with this class rather than reading again * @return map of resource id to resource */ @@ -375,9 +366,11 @@ public class ClusterAccessor { Map resourceMap = Maps.newHashMap(); for (String resourceName : allResources) { ResourceId resourceId = ResourceId.from(resourceName); - resourceMap.put(resourceId, ResourceAccessor.createResource(resourceId, - resourceConfigMap.get(resourceName), idealStateMap.get(resourceName), - externalViewMap.get(resourceName), resourceAssignmentMap.get(resourceName))); + resourceMap.put( + resourceId, + createResource(resourceId, resourceConfigMap.get(resourceName), + idealStateMap.get(resourceName), externalViewMap.get(resourceName), + resourceAssignmentMap.get(resourceName))); } return resourceMap; @@ -385,14 +378,6 @@ public class ClusterAccessor { /** * Read all participants in the cluster - * @return map of participant id to participant, or empty map - */ - public Map readParticipants() { - return readParticipants(false); - } - - /** - * Read all participants in the cluster * @param useCache Use the ClusterDataCache associated with this class rather than reading again * @return map of participant id to participant, or empty map */ @@ -453,75 +438,16 @@ public class ClusterAccessor { ParticipantId participantId = ParticipantId.from(participantName); - participantMap.put(participantId, ParticipantAccessor.createParticipant(participantId, - instanceConfig, userConfig, liveInstance, instanceMsgMap, - currentStateMap.get(participantName))); + participantMap.put( + participantId, + createParticipant(participantId, instanceConfig, userConfig, liveInstance, + instanceMsgMap, currentStateMap.get(participantName))); } return participantMap; } /** - * Get cluster constraints of a given type - * @param type ConstraintType value - * @return ClusterConstraints, or null if none present - */ - public ClusterConstraints readConstraints(ConstraintType type) { - return _accessor.getProperty(_keyBuilder.constraint(type.toString())); - } - - /** - * Remove a constraint from the cluster - * @param type the constraint type - * @param constraintId the constraint id - * @return true if removed, false otherwise - */ - public boolean removeConstraint(ConstraintType type, ConstraintId constraintId) { - ClusterConstraints constraints = _accessor.getProperty(_keyBuilder.constraint(type.toString())); - if (constraints == null || constraints.getConstraintItem(constraintId) == null) { - LOG.error("Constraint with id " + constraintId + " not present"); - return false; - } - constraints.removeConstraintItem(constraintId); - return _accessor.setProperty(_keyBuilder.constraint(type.toString()), constraints); - } - - /** - * Read the user config of the cluster - * @return UserConfig, or null - */ - public UserConfig readUserConfig() { - ClusterConfiguration clusterConfig = _accessor.getProperty(_keyBuilder.clusterConfig()); - return clusterConfig != null ? clusterConfig.getUserConfig() : null; - } - - /** - * Set the user config of the cluster, overwriting existing user configs - * @param userConfig the new user config - * @return true if the user config was set, false otherwise - */ - public boolean setUserConfig(UserConfig userConfig) { - ClusterConfig.Delta delta = new ClusterConfig.Delta(_clusterId).setUserConfig(userConfig); - return updateCluster(delta) != null; - } - - /** - * Clear any user-specified configuration from the cluster - * @return true if the config was cleared, false otherwise - */ - public boolean dropUserConfig() { - return setUserConfig(new UserConfig(Scope.cluster(_clusterId))); - } - - /** - * Read the persisted controller contexts - * @return map of context id to controller context - */ - public Map readControllerContext() { - return readControllerContext(false); - } - - /** * Read the persisted controller contexts * @param useCache Use the ClusterDataCache associated with this class rather than reading again * @return map of context id to controller context @@ -541,18 +467,6 @@ public class ClusterAccessor { } /** - * Add user configuration to the existing cluster user configuration. Overwrites properties with - * the same key - * @param userConfig the user config key-value pairs to add - * @return true if the user config was updated, false otherwise - */ - public boolean updateUserConfig(UserConfig userConfig) { - ClusterConfiguration clusterConfig = new ClusterConfiguration(_clusterId); - clusterConfig.addNamespacedConfig(userConfig); - return _accessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig); - } - - /** * pause controller of cluster * @return true if cluster was paused, false if pause failed or already paused */ @@ -604,7 +518,7 @@ public class ClusterAccessor { // Create an IdealState from a RebalancerConfig (if the resource supports it) IdealState idealState = - ResourceAccessor.rebalancerConfigToIdealState(resource.getRebalancerConfig(), + PartitionedRebalancerConfig.rebalancerConfigToIdealState(resource.getRebalancerConfig(), resource.getBucketSize(), resource.getBatchMessageMode()); if (idealState != null) { _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState); @@ -653,14 +567,13 @@ public class ClusterAccessor { * check if cluster structure is valid * @return true if valid or false otherwise */ - public boolean isClusterStructureValid() { + protected boolean isClusterStructureValid() { List paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString()); BaseDataAccessor baseAccessor = _accessor.getBaseDataAccessor(); if (baseAccessor != null) { boolean[] existsResults = baseAccessor.exists(paths, 0); int ind = 0; for (boolean exists : existsResults) { - if (!exists) { LOG.warn("Path does not exist:" + paths.get(ind)); return false; @@ -674,7 +587,7 @@ public class ClusterAccessor { /** * Create empty persistent properties to ensure that there is a valid cluster structure */ - public void initClusterStructure() { + private void initClusterStructure() { BaseDataAccessor baseAccessor = _accessor.getBaseDataAccessor(); List paths = HelixUtil.getRequiredPathsForCluster(_clusterId.toString()); for (String path : paths) { @@ -709,19 +622,18 @@ public class ClusterAccessor { return false; } - ParticipantAccessor participantAccessor = new ParticipantAccessor(_clusterId, _accessor); ParticipantId participantId = participant.getId(); InstanceConfig existConfig = _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())); - if (existConfig != null && participantAccessor.isParticipantStructureValid(participantId)) { + if (existConfig != null && isParticipantStructureValid(participantId)) { LOG.error("Config for participant: " + participantId + " already exists in cluster: " + _clusterId); return false; } // clear and rebuild the participant structure - participantAccessor.clearParticipantStructure(participantId); - participantAccessor.initParticipantStructure(participantId); + clearParticipantStructure(participantId); + initParticipantStructure(participantId); // add the config InstanceConfig instanceConfig = new InstanceConfig(participant.getId()); @@ -748,8 +660,20 @@ public class ClusterAccessor { * @return true if participant dropped, false if there was an error */ public boolean dropParticipantFromCluster(ParticipantId participantId) { - ParticipantAccessor accessor = new ParticipantAccessor(_clusterId, _accessor); - return accessor.dropParticipant(participantId); + if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) { + LOG.error("Config for participant: " + participantId + " does NOT exist in cluster"); + } + + if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) { + LOG.error("Participant: " + participantId + " structure does NOT exist in cluster"); + } + + // delete participant config path + _accessor.removeProperty(_keyBuilder.instanceConfig(participantId.stringify())); + + // delete participant path + _accessor.removeProperty(_keyBuilder.instance(participantId.stringify())); + return true; } /** @@ -777,6 +701,332 @@ public class ClusterAccessor { } /** + * Read the leader controller if it is live + * @return Controller snapshot, or null + */ + public Controller readLeader() { + LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader()); + if (leader != null) { + ControllerId leaderId = ControllerId.from(leader.getId()); + return new Controller(leaderId, leader, true); + } + return null; + } + + /** + * Update a participant configuration + * @param participantId the participant to update + * @param participantDelta changes to the participant + * @return ParticipantConfig, or null if participant is not persisted + */ + public ParticipantConfig updateParticipant(ParticipantId participantId, + ParticipantConfig.Delta participantDelta) { + Participant participant = readParticipant(participantId); + if (participant == null) { + LOG.error("Participant " + participantId + " does not exist, cannot be updated"); + return null; + } + ParticipantConfig config = participantDelta.mergeInto(participant.getConfig()); + setParticipant(config); + return config; + } + + /** + * Set the configuration of an existing participant + * @param participantConfig participant configuration + * @return true if config was set, false if there was an error + */ + public boolean setParticipant(ParticipantConfig participantConfig) { + if (participantConfig == null) { + LOG.error("Participant config not initialized"); + return false; + } + InstanceConfig instanceConfig = new InstanceConfig(participantConfig.getId()); + instanceConfig.setHostName(participantConfig.getHostName()); + instanceConfig.setPort(Integer.toString(participantConfig.getPort())); + for (String tag : participantConfig.getTags()) { + instanceConfig.addTag(tag); + } + for (PartitionId partitionId : participantConfig.getDisabledPartitions()) { + instanceConfig.setParticipantEnabledForPartition(partitionId, false); + } + instanceConfig.setInstanceEnabled(participantConfig.isEnabled()); + instanceConfig.addNamespacedConfig(participantConfig.getUserConfig()); + _accessor.setProperty(_keyBuilder.instanceConfig(participantConfig.getId().stringify()), + instanceConfig); + return true; + } + + /** + * create a participant based on physical model + * @param participantId + * @param instanceConfig + * @param userConfig + * @param liveInstance + * @param instanceMsgMap map of message-id to message + * @param instanceCurStateMap map of resource-id to current-state + * @return participant + */ + private static Participant createParticipant(ParticipantId participantId, + InstanceConfig instanceConfig, UserConfig userConfig, LiveInstance liveInstance, + Map instanceMsgMap, Map instanceCurStateMap) { + + String hostName = instanceConfig.getHostName(); + + int port = -1; + try { + port = Integer.parseInt(instanceConfig.getPort()); + } catch (IllegalArgumentException e) { + // keep as -1 + } + if (port < 0 || port > 65535) { + port = -1; + } + boolean isEnabled = instanceConfig.getInstanceEnabled(); + + List disabledPartitions = instanceConfig.getDisabledPartitions(); + Set disabledPartitionIdSet = Collections.emptySet(); + if (disabledPartitions != null) { + disabledPartitionIdSet = new HashSet(); + for (String partitionId : disabledPartitions) { + disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId), + PartitionId.stripResourceId(partitionId))); + } + } + + Set tags = new HashSet(instanceConfig.getTags()); + + RunningInstance runningInstance = null; + if (liveInstance != null) { + runningInstance = + new RunningInstance(liveInstance.getTypedSessionId(), + liveInstance.getTypedHelixVersion(), liveInstance.getProcessId()); + } + + Map msgMap = new HashMap(); + if (instanceMsgMap != null) { + for (String msgId : instanceMsgMap.keySet()) { + Message message = instanceMsgMap.get(msgId); + msgMap.put(MessageId.from(msgId), message); + } + } + + Map curStateMap = new HashMap(); + if (instanceCurStateMap != null) { + + for (String resourceName : instanceCurStateMap.keySet()) { + curStateMap.put(ResourceId.from(resourceName), instanceCurStateMap.get(resourceName)); + } + } + + // set up the container config if it exists + ContainerConfig containerConfig = null; + ContainerSpec containerSpec = instanceConfig.getContainerSpec(); + ContainerState containerState = instanceConfig.getContainerState(); + ContainerId containerId = instanceConfig.getContainerId(); + if (containerSpec != null || containerState != null || containerId != null) { + containerConfig = new ContainerConfig(containerId, containerSpec, containerState); + } + + return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags, + runningInstance, curStateMap, msgMap, userConfig, containerConfig); + } + + /** + * read participant related data + * @param participantId + * @return participant, or null if participant not available + */ + public Participant readParticipant(ParticipantId participantId) { + // read physical model + String participantName = participantId.stringify(); + InstanceConfig instanceConfig = + _accessor.getProperty(_keyBuilder.instanceConfig(participantName)); + + if (instanceConfig == null) { + LOG.error("Participant " + participantId + " is not present on the cluster"); + return null; + } + + UserConfig userConfig = instanceConfig.getUserConfig(); + LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName)); + + Map instanceMsgMap = Collections.emptyMap(); + Map instanceCurStateMap = Collections.emptyMap(); + if (liveInstance != null) { + SessionId sessionId = liveInstance.getTypedSessionId(); + + instanceMsgMap = _accessor.getChildValuesMap(_keyBuilder.messages(participantName)); + instanceCurStateMap = + _accessor.getChildValuesMap(_keyBuilder.currentStates(participantName, + sessionId.stringify())); + } + + return createParticipant(participantId, instanceConfig, userConfig, liveInstance, + instanceMsgMap, instanceCurStateMap); + } + + /** + * Read a single snapshot of a resource + * @param resourceId the resource id to read + * @return Resource or null if not present + */ + public Resource readResource(ResourceId resourceId) { + ResourceConfiguration config = + _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify())); + IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify())); + + if (config == null && idealState == null) { + LOG.error("Resource " + resourceId + " not present on the cluster"); + return null; + } + + ExternalView externalView = + _accessor.getProperty(_keyBuilder.externalView(resourceId.stringify())); + ResourceAssignment resourceAssignment = + _accessor.getProperty(_keyBuilder.resourceAssignment(resourceId.stringify())); + return createResource(resourceId, config, idealState, externalView, resourceAssignment); + } + + /** + * Update a resource configuration + * @param resourceId the resource id to update + * @param resourceDelta changes to the resource + * @return ResourceConfig, or null if the resource is not persisted + */ + public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) { + Resource resource = readResource(resourceId); + if (resource == null) { + LOG.error("Resource " + resourceId + " does not exist, cannot be updated"); + return null; + } + ResourceConfig config = resourceDelta.mergeInto(resource.getConfig()); + setResource(config); + return config; + } + + /** + * Set a physical resource configuration, which may include user-defined configuration, as well as + * rebalancer configuration + * @param resourceId + * @param configuration + * @return true if set, false otherwise + */ + private boolean setResourceConfiguration(ResourceId resourceId, + ResourceConfiguration configuration, RebalancerConfig rebalancerConfig) { + boolean status = true; + if (configuration != null) { + status = + _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration); + } + // set an ideal state if the resource supports it + IdealState idealState = + PartitionedRebalancerConfig.rebalancerConfigToIdealState(rebalancerConfig, + configuration.getBucketSize(), configuration.getBatchMessageMode()); + if (idealState != null) { + status = + status + && _accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState); + } + return status; + } + + /** + * Persist an existing resource's logical configuration + * @param resourceConfig logical resource configuration + * @return true if resource is set, false otherwise + */ + public boolean setResource(ResourceConfig resourceConfig) { + if (resourceConfig == null || resourceConfig.getRebalancerConfig() == null) { + LOG.error("Resource not fully defined with a rebalancer context"); + return false; + } + ResourceId resourceId = resourceConfig.getId(); + ResourceConfiguration config = new ResourceConfiguration(resourceId); + UserConfig userConfig = resourceConfig.getUserConfig(); + if (userConfig != null + && (!userConfig.getSimpleFields().isEmpty() || !userConfig.getListFields().isEmpty() || !userConfig + .getMapFields().isEmpty())) { + config.addNamespacedConfig(userConfig); + } else { + userConfig = null; + } + PartitionedRebalancerConfig partitionedConfig = + PartitionedRebalancerConfig.from(resourceConfig.getRebalancerConfig()); + if (partitionedConfig == null + || partitionedConfig.getRebalanceMode() == RebalanceMode.USER_DEFINED) { + // only persist if this is not easily convertible to an ideal state + config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig()) + .toNamespacedConfig()); + config.setBucketSize(resourceConfig.getBucketSize()); + config.setBatchMessageMode(resourceConfig.getBatchMessageMode()); + } else if (userConfig == null) { + config = null; + } + if (resourceConfig.getProvisionerConfig() != null) { + config.addNamespacedConfig(new ProvisionerConfigHolder(resourceConfig.getProvisionerConfig()) + .toNamespacedConfig()); + } + config.setBucketSize(resourceConfig.getBucketSize()); + config.setBatchMessageMode(resourceConfig.getBatchMessageMode()); + setResourceConfiguration(resourceId, config, resourceConfig.getRebalancerConfig()); + return true; + } + + /** + * Create a resource snapshot instance from the physical model + * @param resourceId the resource id + * @param resourceConfiguration physical resource configuration + * @param idealState ideal state of the resource + * @param externalView external view of the resource + * @param resourceAssignment current resource assignment + * @return Resource + */ + private static Resource createResource(ResourceId resourceId, + ResourceConfiguration resourceConfiguration, IdealState idealState, + ExternalView externalView, ResourceAssignment resourceAssignment) { + UserConfig userConfig; + ProvisionerConfig provisionerConfig = null; + RebalancerConfig rebalancerConfig = null; + ResourceType type = ResourceType.DATA; + if (resourceConfiguration != null) { + userConfig = resourceConfiguration.getUserConfig(); + type = resourceConfiguration.getType(); + } else { + userConfig = new UserConfig(Scope.resource(resourceId)); + } + int bucketSize = 0; + boolean batchMessageMode = false; + if (idealState != null) { + if (resourceConfiguration != null + && idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) { + // prefer rebalancer config for user_defined data rebalancing + rebalancerConfig = + resourceConfiguration.getRebalancerConfig(PartitionedRebalancerConfig.class); + } + if (rebalancerConfig == null) { + // prefer ideal state for non-user_defined data rebalancing + rebalancerConfig = PartitionedRebalancerConfig.from(idealState); + } + bucketSize = idealState.getBucketSize(); + batchMessageMode = idealState.getBatchMessageMode(); + idealState.updateUserConfig(userConfig); + } else if (resourceConfiguration != null) { + bucketSize = resourceConfiguration.getBucketSize(); + batchMessageMode = resourceConfiguration.getBatchMessageMode(); + rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class); + } + if (rebalancerConfig == null) { + rebalancerConfig = new PartitionedRebalancerConfig(); + } + if (resourceConfiguration != null) { + provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class); + } + return new Resource(resourceId, type, idealState, resourceAssignment, externalView, + rebalancerConfig, provisionerConfig, userConfig, bucketSize, batchMessageMode); + } + + /** * Get the cluster ID this accessor is connected to * @return ClusterId */ @@ -791,4 +1041,65 @@ public class ClusterAccessor { protected HelixDataAccessor dataAccessor() { return _accessor; } + + /** + * Create empty persistent properties to ensure that there is a valid participant structure + * @param participantId the identifier under which to initialize the structure + * @return true if the participant structure exists at the end of this call, false otherwise + */ + private boolean initParticipantStructure(ParticipantId participantId) { + if (participantId == null) { + LOG.error("Participant ID cannot be null when clearing the participant in cluster " + + _clusterId + "!"); + return false; + } + List paths = + HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString()); + BaseDataAccessor baseAccessor = _accessor.getBaseDataAccessor(); + for (String path : paths) { + boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT); + if (!status) { + if (LOG.isDebugEnabled()) { + LOG.debug(path + " already exists"); + } + } + } + return true; + } + + /** + * Clear properties for the participant + * @param participantId the participant for which to clear + * @return true if all paths removed, false otherwise + */ + private boolean clearParticipantStructure(ParticipantId participantId) { + List paths = + HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString()); + BaseDataAccessor baseAccessor = _accessor.getBaseDataAccessor(); + boolean[] removeResults = baseAccessor.remove(paths, 0); + boolean result = true; + for (boolean removeResult : removeResults) { + result = result && removeResult; + } + return result; + } + + /** + * check if participant structure is valid + * @return true if valid or false otherwise + */ + private boolean isParticipantStructureValid(ParticipantId participantId) { + List paths = + HelixUtil.getRequiredPathsForInstance(_clusterId.toString(), participantId.toString()); + BaseDataAccessor baseAccessor = _accessor.getBaseDataAccessor(); + if (baseAccessor != null) { + boolean[] existsResults = baseAccessor.exists(paths, 0); + for (boolean exists : existsResults) { + if (!exists) { + return false; + } + } + } + return true; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java deleted file mode 100644 index 609e458..0000000 --- a/helix-core/src/main/java/org/apache/helix/api/accessor/ControllerAccessor.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.apache.helix.api.accessor; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; -import org.apache.helix.api.Controller; -import org.apache.helix.api.id.ControllerId; -import org.apache.helix.model.LiveInstance; - -public class ControllerAccessor { - private final HelixDataAccessor _accessor; - private final PropertyKey.Builder _keyBuilder; - - public ControllerAccessor(HelixDataAccessor accessor) { - _accessor = accessor; - _keyBuilder = accessor.keyBuilder(); - } - - /** - * Read the leader controller if it is live - * @return Controller snapshot, or null - */ - public Controller readLeader() { - LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader()); - if (leader != null) { - ControllerId leaderId = ControllerId.from(leader.getId()); - return new Controller(leaderId, leader, true); - } - return null; - } -}