Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ABDE211803 for ; Wed, 23 Jul 2014 21:07:43 +0000 (UTC) Received: (qmail 85312 invoked by uid 500); 23 Jul 2014 21:07:43 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 85229 invoked by uid 500); 23 Jul 2014 21:07:43 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 85202 invoked by uid 99); 23 Jul 2014 21:07:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Jul 2014 21:07:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2A4D99B1FF2; Wed, 23 Jul 2014 21:07:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Date: Wed, 23 Jul 2014 21:07:44 -0000 Message-Id: <96d9315c6e404b3f86fbb3f6292586ff@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] [HELIX-389] Unify accessor classes into a single class http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java index dd661d9..934a9c2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/config/PartitionedRebalancerConfig.java @@ -2,11 +2,13 @@ package org.apache.helix.controller.rebalancer.config; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.helix.HelixConstants.StateModelToken; import org.apache.helix.api.Partition; +import org.apache.helix.api.State; import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.ResourceId; @@ -313,6 +315,69 @@ public class PartitionedRebalancerConfig extends BasicRebalancerConfig implement } /** + * Get an ideal state from a rebalancer config if the resource is partitioned + * @param config RebalancerConfig instance + * @param bucketSize bucket size to use + * @param batchMessageMode true if batch messaging allowed, false otherwise + * @return IdealState, or null + */ + public static IdealState rebalancerConfigToIdealState(RebalancerConfig config, int bucketSize, + boolean batchMessageMode) { + PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config); + if (partitionedConfig != null) { + if (!PartitionedRebalancerConfig.isBuiltinConfig(partitionedConfig.getClass())) { + // don't proceed if this resource cannot be described by an ideal state + return null; + } + IdealState idealState = new IdealState(partitionedConfig.getResourceId()); + idealState.setRebalanceMode(partitionedConfig.getRebalanceMode()); + + RebalancerRef ref = partitionedConfig.getRebalancerRef(); + if (ref != null) { + idealState.setRebalancerRef(partitionedConfig.getRebalancerRef()); + } + String replicas = null; + if (partitionedConfig.anyLiveParticipant()) { + replicas = StateModelToken.ANY_LIVEINSTANCE.toString(); + } else { + replicas = Integer.toString(partitionedConfig.getReplicaCount()); + } + idealState.setReplicas(replicas); + idealState.setNumPartitions(partitionedConfig.getPartitionSet().size()); + idealState.setInstanceGroupTag(partitionedConfig.getParticipantGroupTag()); + idealState.setMaxPartitionsPerInstance(partitionedConfig.getMaxPartitionsPerParticipant()); + idealState.setStateModelDefId(partitionedConfig.getStateModelDefId()); + idealState.setStateModelFactoryId(partitionedConfig.getStateModelFactoryId()); + idealState.setBucketSize(bucketSize); + idealState.setBatchMessageMode(batchMessageMode); + idealState.setRebalancerConfigClass(config.getClass()); + if (SemiAutoRebalancerConfig.class.equals(config.getClass())) { + SemiAutoRebalancerConfig semiAutoConfig = + BasicRebalancerConfig.convert(config, SemiAutoRebalancerConfig.class); + for (PartitionId partitionId : semiAutoConfig.getPartitionSet()) { + idealState.setPreferenceList(partitionId, semiAutoConfig.getPreferenceList(partitionId)); + } + } else if (CustomRebalancerConfig.class.equals(config.getClass())) { + CustomRebalancerConfig customConfig = + BasicRebalancerConfig.convert(config, CustomRebalancerConfig.class); + for (PartitionId partitionId : customConfig.getPartitionSet()) { + idealState + .setParticipantStateMap(partitionId, customConfig.getPreferenceMap(partitionId)); + } + } else { + for (PartitionId partitionId : partitionedConfig.getPartitionSet()) { + List preferenceList = Collections.emptyList(); + idealState.setPreferenceList(partitionId, preferenceList); + Map participantStateMap = Collections.emptyMap(); + idealState.setParticipantStateMap(partitionId, participantStateMap); + } + } + return idealState; + } + return null; + } + + /** * Builder for a basic data rebalancer config */ public static final class Builder extends AbstractBuilder { http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 364c370..6f34953 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -40,6 +40,7 @@ import org.apache.helix.controller.rebalancer.HelixRebalancer; import org.apache.helix.controller.rebalancer.RebalancerRef; import org.apache.helix.controller.rebalancer.config.RebalancerConfig; import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; @@ -221,14 +222,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { _rebalancerMap.put(resourceId, rebalancer); } ResourceAssignment currentAssignment = null; + IdealState idealState; Resource resourceSnapshot = cluster.getResource(resourceId); if (resourceSnapshot != null) { currentAssignment = resourceSnapshot.getResourceAssignment(); + idealState = resourceSnapshot.getIdealState(); + } else { + idealState = new IdealState(resourceId); } try { + resourceAssignment = - rebalancer.computeResourceMapping(rebalancerConfig, currentAssignment, cluster, - currentStateOutput); + rebalancer.computeResourceMapping(idealState, rebalancerConfig, currentAssignment, + cluster, currentStateOutput); } catch (Exception e) { LOG.error("Rebalancer for resource " + resourceId + " failed.", e); } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 8bcfaae..877baf2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -338,13 +338,8 @@ public class ClusterDataCache { } /** -<<<<<<< HEAD - * Get all state model definitions - * @return map of name to state model definition -======= * Provides all state model definitions * @return state model definition map ->>>>>>> 8d5c27c... [HELIX-444] add per-participant partition count gauges to helix, rb=21419 */ public Map getStateModelDefMap() { return _stateModelDefMap; http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 64bf792..4cc1b9f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -24,7 +24,6 @@ import java.util.Map; import org.apache.helix.api.Cluster; import org.apache.helix.api.Participant; -import org.apache.helix.api.Partition; import org.apache.helix.api.State; import org.apache.helix.api.config.ResourceConfig; import org.apache.helix.api.id.MessageId; @@ -36,6 +35,7 @@ import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; @@ -78,11 +78,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage { if (resource == null) { continue; } + IdealState idealState = resource.getIdealState(); if (!message.getBatchMessageMode()) { PartitionId partitionId = message.getPartitionId(); - Partition partition = resource.getSubUnit(partitionId); - if (partition != null) { + if (idealState.getPartitionIdSet().contains(partitionId)) { currentStateOutput.setPendingState(resourceId, partitionId, participantId, message.getTypedToState()); } else { @@ -92,8 +92,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage { List partitionNames = message.getPartitionIds(); if (!partitionNames.isEmpty()) { for (PartitionId partitionId : partitionNames) { - Partition partition = resource.getSubUnit(partitionId); - if (partition != null) { + if (idealState.getPartitionIdSet().contains(partitionId)) { currentStateOutput.setPendingState(resourceId, partitionId, participantId, message.getTypedToState()); } else { http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index fc247a6..9f2721f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -101,7 +101,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage { } else { view.setBucketSize(currentStateOutput.getBucketSize(resourceId)); } - for (PartitionId partitionId : resource.getSubUnitMap().keySet()) { + for (PartitionId partitionId : resource.getSubUnitSet()) { Map currentStateMap = currentStateOutput.getCurrentStateMap(resourceId, partitionId); if (currentStateMap != null && currentStateMap.size() > 0) { http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java index a49feae..893e116 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java @@ -141,7 +141,7 @@ public class MessageGenerationStage extends AbstractBaseStage { if (rebalancerConfig != null && rebalancerConfig.getStateModelDefId().equalsIgnoreCase( StateModelDefId.SchedulerTaskQueue)) { - if (resourceConfig.getSubUnitMap().size() > 0) { + if (resourceConfig.getSubUnitSet().size() > 0) { // TODO refactor it -- we need a way to read in scheduler tasks a priori Message innerMsg = resourceConfig.getSchedulerTaskConfig().getInnerMessage(subUnitId); http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 9f894e7..1fc7142 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -73,6 +73,7 @@ public class ResourceComputationStage extends AbstractBaseStage { resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig()); resCfgBuilder.rebalancerConfig(rebalancerCfg); resCfgBuilder.provisionerConfig(resource.getProvisionerConfig()); + resCfgBuilder.idealState(resource.getIdealState()); resCfgMap.put(resourceId, resCfgBuilder.build()); } @@ -134,7 +135,10 @@ public class ResourceComputationStage extends AbstractBaseStage { for (ResourceId resourceId : resCfgBuilderMap.keySet()) { ResourceConfig.Builder resCfgBuilder = resCfgBuilderMap.get(resourceId); PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId); - resCfgBuilder.rebalancerConfig(rebCtxBuilder.build()); + RebalancerConfig rebalancerConfig = rebCtxBuilder.build(); + resCfgBuilder.rebalancerConfig(rebalancerConfig); + resCfgBuilder.idealState(PartitionedRebalancerConfig.rebalancerConfigToIdealState( + rebalancerConfig, 0, false)); resCfgMap.put(resourceId, resCfgBuilder.build()); } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java index ea2536d..bec6f5c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixConnection.java @@ -58,8 +58,6 @@ import org.apache.helix.PropertyType; import org.apache.helix.ScopedConfigChangeListener; import org.apache.helix.ZNRecord; import org.apache.helix.api.accessor.ClusterAccessor; -import org.apache.helix.api.accessor.ParticipantAccessor; -import org.apache.helix.api.accessor.ResourceAccessor; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ControllerId; import org.apache.helix.api.id.ParticipantId; @@ -233,16 +231,6 @@ public class ZkHelixConnection implements HelixConnection, IZkStateListener { } @Override - public ResourceAccessor createResourceAccessor(ClusterId clusterId) { - return new ResourceAccessor(clusterId, createDataAccessor(clusterId)); - } - - @Override - public ParticipantAccessor createParticipantAccessor(ClusterId clusterId) { - return new ParticipantAccessor(clusterId, createDataAccessor(clusterId)); - } - - @Override public HelixAdmin createClusterManagementTool() { return new ZKHelixAdmin(_zkclient); } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java index 475b3cf..f9529b7 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java @@ -118,7 +118,7 @@ public class ZkHelixController implements HelixController { * from here on, we are dealing with new session * init handlers */ - if (!_clusterAccessor.isClusterStructureValid()) { + if (!ZKUtil.isClusterSetup(_clusterId.toString(), _connection._zkclient)) { throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId); } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java index d42b7b7..674140e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixParticipant.java @@ -40,7 +40,6 @@ import org.apache.helix.PreConnectCallback; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.api.accessor.ClusterAccessor; -import org.apache.helix.api.accessor.ParticipantAccessor; import org.apache.helix.api.config.ParticipantConfig; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.Id; @@ -70,7 +69,6 @@ public class ZkHelixParticipant implements HelixParticipant { final PropertyKey.Builder _keyBuilder; final ConfigAccessor _configAccessor; final ClusterAccessor _clusterAccessor; - final ParticipantAccessor _participantAccessor; final DefaultMessagingService _messagingService; final List _preConnectCallbacks; final List _timerTasks; @@ -89,7 +87,6 @@ public class ZkHelixParticipant implements HelixParticipant { _baseAccessor = _accessor.getBaseDataAccessor(); _keyBuilder = _accessor.keyBuilder(); _clusterAccessor = connection.createClusterAccessor(clusterId); - _participantAccessor = connection.createParticipantAccessor(clusterId); _configAccessor = connection.getConfigAccessor(); _clusterId = clusterId; @@ -310,7 +307,8 @@ public class ZkHelixParticipant implements HelixParticipant { // autoJoin is false } - if (!_participantAccessor.isParticipantStructureValid(_participantId)) { + if (!ZKUtil.isInstanceSetup(_connection._zkclient, _clusterId.toString(), + _participantId.toString(), getType())) { if (!autoJoin) { throw new HelixException("Initial cluster structure is not set up for instance: " + _participantId + ", instanceType: " + getType()); @@ -358,7 +356,7 @@ public class ZkHelixParticipant implements HelixParticipant { /** * from here on, we are dealing with new session */ - if (!_clusterAccessor.isClusterStructureValid()) { + if (!ZKUtil.isClusterSetup(_clusterId.toString(), _connection._zkclient)) { throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId); } @@ -470,8 +468,4 @@ public class ZkHelixParticipant implements HelixParticipant { return _clusterAccessor; } - public ParticipantAccessor getParticipantAccessor() { - return _participantAccessor; - } - } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index dc42a52..88ec610 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -478,6 +478,9 @@ public class IdealState extends HelixProperty { * @param preferenceList a list of participants that can serve replicas of the partition */ public void setPreferenceList(PartitionId partitionId, List preferenceList) { + if (preferenceList == null) { + preferenceList = Collections.emptyList(); + } List rawPreferenceList = new ArrayList(); for (ParticipantId participantId : preferenceList) { rawPreferenceList.add(participantId.stringify()); http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 0e11d21..29990ed 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -112,9 +112,9 @@ public abstract class TaskRebalancer implements HelixRebalancer { } @Override - public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig, - ResourceAssignment prevAssignment, Cluster cluster, ResourceCurrentState currentState) { - IdealState taskIs = cluster.getResource(rebalancerConfig.getResourceId()).getIdealState(); + public ResourceAssignment computeResourceMapping(IdealState taskIs, + RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster, + ResourceCurrentState currentState) { return computeBestPossiblePartitionState(cluster, taskIs, cluster.getResource(rebalancerConfig.getResourceId()), currentState); } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 0a8a41d..6599b33 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -984,6 +984,7 @@ public class ClusterSetup { while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) { read = read + numRead; } + dis.close(); return bytes; } http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java deleted file mode 100644 index c8281ba..0000000 --- a/helix-core/src/main/java/org/apache/helix/tools/NewClusterSetup.java +++ /dev/null @@ -1,1168 +0,0 @@ -package org.apache.helix.tools; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.OptionGroup; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.helix.BaseDataAccessor; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.ZNRecord; -import org.apache.helix.api.Cluster; -import org.apache.helix.api.Participant; -import org.apache.helix.api.Resource; -import org.apache.helix.api.RunningInstance; -import org.apache.helix.api.Scope; -import org.apache.helix.api.Scope.ScopeType; -import org.apache.helix.api.State; -import org.apache.helix.api.accessor.ClusterAccessor; -import org.apache.helix.api.accessor.ParticipantAccessor; -import org.apache.helix.api.accessor.ResourceAccessor; -import org.apache.helix.api.config.ClusterConfig; -import org.apache.helix.api.config.ParticipantConfig; -import org.apache.helix.api.config.ResourceConfig; -import org.apache.helix.api.config.UserConfig; -import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.ConstraintId; -import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.api.id.PartitionId; -import org.apache.helix.api.id.ResourceId; -import org.apache.helix.api.id.StateModelDefId; -import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig; -import org.apache.helix.controller.rebalancer.config.CustomRebalancerConfig; -import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig; -import org.apache.helix.controller.rebalancer.config.RebalancerConfig; -import org.apache.helix.controller.rebalancer.config.RebalancerConfigHolder; -import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.model.ClusterConstraints; -import org.apache.helix.model.ClusterConstraints.ConstraintType; -import org.apache.helix.model.ConstraintItem; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.util.HelixUtil; -import org.apache.log4j.Logger; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Parse command line and call helix-admin - */ -public class NewClusterSetup { - - private static Logger LOG = Logger.getLogger(NewClusterSetup.class); - - /** - * List all helix cluster setup options - */ - public enum HelixOption { - // help - help(0, "", "Print command-line options"), - - // zookeeper address - zkSvr(1, true, "zookeeperServerAddress", "Zookeeper address (host:port, required)"), - - // list cluster/resource/instances - listClusters(0, "", "List clusters"), - listResources(1, "clusterId", "List resources in a cluster"), - listInstances(1, "clusterId", "List instances in a cluster"), - - // add, drop, and rebalance cluster - addCluster(1, "clusterId", "Add a new cluster"), - activateCluster(3, "clusterId grandClusterId true/false", - "Enable/disable a cluster in distributed controller mode"), - dropCluster(1, "clusterId", "Delete a cluster"), - dropResource(2, "clusterId resourceId", "Drop a resource from a cluster"), - addInstance(2, "clusterId instanceId", "Add an instance to a cluster"), - addResource(4, "clusterId resourceId partitionNumber stateModelDefId", - "Add a resource to a cluster"), - addStateModelDef(2, "clusterId jsonFileName", "Add a state model definition to a cluster"), - addIdealState(2, "clusterId resourceId jsonfileName", - "Add an ideal state of a resource in cluster"), - swapInstance(3, "clusterId oldInstanceId newInstanceId", - "Swap an old instance in cluster with a new instance"), - dropInstance(2, "clusterId instanceId", "Drop an instance from a cluster"), - rebalance(3, "clusterId resourceId replicas", "Rebalance a resource in cluster"), - expandCluster(1, "clusterId", "Expand a cluster"), - expandResource(2, "clusterId resourceId", "Expand resource to additional nodes"), - @Deprecated - mode(1, "rebalancerMode", "Specify rebalancer mode, used with " + addResource + " command"), - rebalancerMode(1, "rebalancerMode", "Specify rebalancer mode, used with " + addResource - + " command"), - instanceGroupTag(1, "instanceGroupTag", "Specify instance group tag, used with " + rebalance - + " command"), - bucketSize(1, "bucketSize", "Specify bucket size, used with " + addResource + " command"), - resourceKeyPrefix(1, "resourceKeyPrefix", "Specify resource key prefix, used with " + rebalance - + " command"), - maxPartitionsPerNode(1, "maxPartitionsPerNode", "Specify max partitions per node, used with " - + addResource + " command"), - addResourceProperty(4, "clusterId resourceId propertyName propertyValue", - "Add a resource property"), - removeResourceProperty(3, "clusterId resourceId propertyName", "Remove a resource property"), - addInstanceTag(3, "clusterId instanceId tag", "Add a tag to instance"), - removeInstanceTag(3, "clusterId instanceId tag", "Remove a tag from instance"), - - // query info - listClusterInfo(1, "clusterId", "Query informaton of a cluster"), - listInstanceInfo(2, "clusterId instanceId", "Query information of an instance in cluster"), - listResourceInfo(2, "clusterId resourceId", "Query information of a resource"), - listPartitionInfo(3, "clusterId resourceId partitionId", "Query information of a partition"), - listStateModels(1, "clusterId", "Query information of state models in a cluster"), - listStateModel(2, "clusterId stateModelDefId", "Query information of a state model in cluster"), - - // enable/disable/reset instances/cluster/resource/partition - enableInstance(3, "clusterId instanceId true/false", "Enable/disable an instance"), - enablePartition(-1, "true/false clusterId instanceId resourceId partitionId...", - "Enable/disable partitions"), - enableCluster(2, "clusterId true/false", "Pause/resume the controller of a cluster"), - resetPartition(4, "clusterId instanceId resourceId partitionName", - "Reset a partition in error state"), - resetInstance(2, "clusterId instanceId", "Reset all partitions in error state for an instance"), - resetResource(2, "clusterId resourceId", "Reset all partitions in error state for a resource"), - - // stats/alerts - addStat(2, "clusterId statName", "Add a persistent stat"), - addAlert(2, "clusterId alertName", "Add an alert"), - dropStat(2, "clusterId statName", "Drop a persistent stat"), - dropAlert(2, "clusterId alertName", "Drop an alert"), - - // set/set/remove configs - getConfig(3, "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keys(e.g. k1,k2)", - "Get configs"), - setConfig(3, - "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keyValues(e.g. k1=v1,k2=v2)", - "Set configs"), - removeConfig(3, "scope(e.g. RESOURCE) configScopeArgs(e.g. myCluster,testDB) keys(e.g. k1,k2)", - "Remove configs"), - - // get/set/remove constraints - getConstraints(2, "clusterId constraintType(e.g. MESSAGE_CONSTRAINT)", "Get constraints"), - setConstraint( - 4, - "clusterId constraintType(e.g. MESSAGE_CONSTRAINT) constraintId keyValues(e.g. k1=v1,k2=v2)", - "Set a constraint, create if not exist"), - removeConstraint(3, "clusterId constraintType(e.g. MESSAGE_CONSTRAINT) constraintId", - "Remove a constraint"); - - final int _argNum; - final boolean _isRequired; - final String _argName; - final String _description; - - private HelixOption(int argNum, boolean isRequired, String argName, String description) { - _argNum = argNum; - _isRequired = isRequired; - _argName = argName; - _description = description; - } - - private HelixOption(int argNum, String argName, String description) { - this(argNum, false, argName, description); - } - } - - private final ZkClient _zkclient; - private final BaseDataAccessor _baseAccessor; - - private NewClusterSetup(ZkClient zkclient) { - _zkclient = zkclient; - _baseAccessor = new ZkBaseDataAccessor(_zkclient); - } - - @SuppressWarnings("static-access") - static Options constructCommandLineOptions() { - Options options = new Options(); - - OptionGroup optionGroup = new OptionGroup(); - for (HelixOption option : HelixOption.values()) { - Option opt = - OptionBuilder.withLongOpt(option.name()).hasArgs(option._argNum) - .isRequired(option._isRequired).withArgName(option._argName) - .withDescription(option._description).create(); - if (option == HelixOption.help || option == HelixOption.zkSvr) { - options.addOption(opt); - } else { - optionGroup.addOption(opt); - } - } - options.addOptionGroup(optionGroup); - return options; - } - - /** - * Check if we have the right number of arguments - * @param opt - * @param optValues - */ - static void checkArgNum(HelixOption opt, String[] optValues) { - - if (opt._argNum != -1 && opt._argNum < optValues.length) { - throw new IllegalArgumentException(opt + " should have no less than " + opt._argNum - + " arguments, but was: " + optValues.length + ", " + Arrays.asList(optValues)); - } - } - - static void printUsage(Options cliOptions) { - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.setWidth(1000); - helpFormatter.printHelp("java " + NewClusterSetup.class.getName(), cliOptions); - } - - ClusterAccessor clusterAccessor(String clusterName) { - HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); - return new ClusterAccessor(ClusterId.from(clusterName), accessor); - } - - ParticipantAccessor participantAccessor(String clusterName) { - return new ParticipantAccessor(ClusterId.from(clusterName), new ZKHelixDataAccessor( - clusterName, _baseAccessor)); - } - - ResourceAccessor resourceAccessor(String clusterName) { - return new ResourceAccessor(ClusterId.from(clusterName), new ZKHelixDataAccessor(clusterName, - _baseAccessor)); - } - - void addCluster(String[] optValues) { - String clusterName = optValues[0]; - - List defaultStateModelDefs = new ArrayList(); - defaultStateModelDefs.add(new StateModelDefinition(StateModelConfigGenerator - .generateConfigForMasterSlave())); - - ClusterConfig.Builder builder = - new ClusterConfig.Builder(ClusterId.from(clusterName)) - .addStateModelDefinitions(defaultStateModelDefs); - - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.createCluster(builder.build()); - } - - void addResource(String[] optValues, String[] rebalancerModeValues, String[] bucketSizeValues, - String[] maxPartitionsPerNodeValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - int partitionNumber = Integer.parseInt(optValues[2]); - String stateModelDefName = optValues[3]; - RebalanceMode rebalancerMode = - rebalancerModeValues == null ? RebalanceMode.SEMI_AUTO : RebalanceMode - .valueOf(rebalancerModeValues[0]); - - int bucketSize = bucketSizeValues == null ? 0 : Integer.parseInt(bucketSizeValues[0]); - - int maxPartitionsPerNode = - maxPartitionsPerNodeValues == null ? -1 : Integer.parseInt(maxPartitionsPerNodeValues[0]); - - ResourceId resourceId = ResourceId.from(resourceName); - StateModelDefId stateModelDefId = StateModelDefId.from(stateModelDefName); - - IdealState idealState = new IdealState(resourceName); - idealState.setRebalanceMode(rebalancerMode); - idealState.setNumPartitions(partitionNumber); - idealState.setMaxPartitionsPerInstance(maxPartitionsPerNode); - idealState.setStateModelDefId(stateModelDefId); - - RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState); - ResourceConfig.Builder builder = - new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerCtx).bucketSize( - bucketSize); - - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.addResourceToCluster(builder.build()); - - } - - void rebalance(String[] optValues, String[] groupTagValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - int replicaCount = Integer.parseInt(optValues[2]); - String groupTag = null; - if (groupTagValues != null && groupTagValues.length > 0) { - groupTag = groupTagValues[0]; - } - ResourceAccessor accessor = resourceAccessor(clusterName); - accessor.generateDefaultAssignment(ResourceId.from(resourceName), replicaCount, groupTag); - } - - void addInstance(String[] optValues) { - String clusterName = optValues[0]; - String[] instanceIds = optValues[1].split(";"); - - ClusterAccessor accessor = clusterAccessor(clusterName); - for (String instanceId : instanceIds) { - ParticipantConfig.Builder builder = - new ParticipantConfig.Builder(ParticipantId.from(instanceId)); - - accessor.addParticipantToCluster(builder.build()); - } - } - - void dropCluster(String[] optValues) { - String clusterName = optValues[0]; - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.dropCluster(); - } - - void dropResource(String[] optValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.dropResourceFromCluster(ResourceId.from(resourceName)); - } - - void dropInstance(String[] optValues) { - String clusterName = optValues[0]; - String[] instanceIds = optValues[1].split(";"); - ClusterAccessor accessor = clusterAccessor(clusterName); - for (String instanceId : instanceIds) { - accessor.dropParticipantFromCluster(ParticipantId.from(instanceId)); - } - - } - - private static byte[] readFile(String filePath) throws IOException { - File file = new File(filePath); - - int size = (int) file.length(); - byte[] bytes = new byte[size]; - DataInputStream dis = null; - try { - dis = new DataInputStream(new FileInputStream(file)); - int read = 0; - int numRead = 0; - while (read < bytes.length && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0) { - read = read + numRead; - } - return bytes; - } finally { - if (dis != null) { - dis.close(); - } - } - } - - void addStateModelDef(String[] optValues) { - String clusterName = optValues[0]; - String stateModelDefJsonFile = optValues[1]; - - try { - StateModelDefinition stateModelDef = - new StateModelDefinition( - (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefJsonFile)))); - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.addStateModelDefinitionToCluster(stateModelDef); - - } catch (IOException e) { - LOG.error("Could not parse the state model", e); - } - - } - - void addIdealState(String[] optValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - String idealStateJsonFile = optValues[2]; - - try { - IdealState idealState = - new IdealState( - (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateJsonFile)))); - - RebalancerConfig rebalancerCtx = PartitionedRebalancerConfig.from(idealState); - ResourceConfig.Builder builder = - new ResourceConfig.Builder(ResourceId.from(resourceName)).rebalancerConfig(rebalancerCtx) - .bucketSize(idealState.getBucketSize()); - - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.addResourceToCluster(builder.build()); - } catch (IOException e) { - e.printStackTrace(); - } - - } - - void addInstanceTag(String[] optValues) { - String clusterName = optValues[0]; - String participantName = optValues[1]; - String tag = optValues[2]; - - ParticipantAccessor accessor = participantAccessor(clusterName); - ParticipantId participantId = ParticipantId.from(participantName); - - ParticipantConfig.Delta delta = new ParticipantConfig.Delta(participantId); - delta.addTag(tag); - accessor.updateParticipant(participantId, delta); - } - - void removeInstanceTag(String[] optValues) { - String clusterName = optValues[0]; - String participantName = optValues[1]; - String tag = optValues[2]; - - ParticipantAccessor accessor = participantAccessor(clusterName); - ParticipantId participantId = ParticipantId.from(participantName); - - ParticipantConfig.Delta delta = new ParticipantConfig.Delta(participantId); - delta.removeTag(tag); - accessor.updateParticipant(participantId, delta); - } - - void listPartitionInfo(String[] optValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - String partitionName = optValues[2]; - - ResourceId resourceId = ResourceId.from(resourceName); - PartitionId partitionId = PartitionId.from(partitionName); - ResourceAccessor accessor = resourceAccessor(clusterName); - Resource resource = accessor.readResource(resourceId); - - StringBuilder sb = new StringBuilder(); - Map stateMap = resource.getExternalView().getStateMap(partitionId); - sb.append(resourceName + "/" + partitionName + ", externalView: " + stateMap); - PartitionedRebalancerConfig partitionedConfig = - PartitionedRebalancerConfig.from(resource.getRebalancerConfig()); - if (partitionedConfig != null) { - // for partitioned contexts, check the mode and apply mode-specific information if possible - if (partitionedConfig.getRebalanceMode() == RebalanceMode.SEMI_AUTO) { - SemiAutoRebalancerConfig semiAutoConfig = - BasicRebalancerConfig.convert(resource.getRebalancerConfig(), - SemiAutoRebalancerConfig.class); - sb.append(", preferenceList: " + semiAutoConfig.getPreferenceList(partitionId)); - } else if (partitionedConfig.getRebalanceMode() == RebalanceMode.CUSTOMIZED) { - CustomRebalancerConfig customConfig = - BasicRebalancerConfig.convert(resource.getRebalancerConfig(), - CustomRebalancerConfig.class); - sb.append(", preferenceMap: " + customConfig.getPreferenceMap(partitionId)); - } - if (partitionedConfig.anyLiveParticipant()) { - sb.append(", anyLiveParticipant: " + partitionedConfig.anyLiveParticipant()); - } else { - sb.append(", replicaCount: " + partitionedConfig.getReplicaCount()); - } - } - - System.out.println(sb.toString()); - } - - void enableInstance(String[] optValues) { - String clusterName = optValues[0]; - String instanceId = optValues[1]; - if (instanceId.indexOf(":") != -1) { - instanceId = instanceId.replaceAll(":", "_"); - } - boolean enabled = Boolean.parseBoolean(optValues[2].toLowerCase()); - - ParticipantAccessor accessor = participantAccessor(clusterName); - if (enabled) { - accessor.enableParticipant(ParticipantId.from(instanceId)); - } else { - accessor.disableParticipant(ParticipantId.from(instanceId)); - } - } - - void enablePartition(String[] optValues) { - boolean enabled = Boolean.parseBoolean(optValues[0].toLowerCase()); - String clusterName = optValues[1]; - ParticipantId participantId = ParticipantId.from(optValues[2]); - ResourceId resourceId = ResourceId.from(optValues[3]); - - Set partitionIdSet = new HashSet(); - for (int i = 4; i < optValues.length; i++) { - partitionIdSet.add(PartitionId.from(optValues[i])); - } - - ParticipantAccessor accessor = participantAccessor(clusterName); - if (enabled) { - accessor.enablePartitionsForParticipant(participantId, resourceId, partitionIdSet); - } else { - accessor.disablePartitionsForParticipant(participantId, resourceId, partitionIdSet); - } - } - - void enableCluster(String[] optValues) { - String clusterName = optValues[0]; - boolean enabled = Boolean.parseBoolean(optValues[1].toLowerCase()); - - ClusterAccessor accessor = clusterAccessor(clusterName); - if (enabled) { - accessor.resumeCluster(); - } else { - accessor.pauseCluster(); - } - } - - /** - * Convert user config to key value map - * @param userConfig - * @param mapKey - * @param keys - * @return - */ - private Map keyValueMap(UserConfig userConfig, String mapKey, String[] keys) { - Map results = new HashMap(); - - for (String key : keys) { - if (mapKey == null) { - results.put(key, userConfig.getSimpleField(key)); - } else { - results.put(key, userConfig.getMapField(mapKey).get(key)); - } - } - return results; - } - - void getConfig(String[] optValues) { - ScopeType scopeType = ScopeType.valueOf(optValues[0].toUpperCase()); - String[] scopeArgs = optValues[1].split("[\\s,]"); - String[] keys = optValues[2].split("[\\s,]"); - - String clusterName = scopeArgs[0]; - Map results = null; - switch (scopeType) { - case CLUSTER: { - ClusterAccessor accessor = clusterAccessor(clusterName); - results = keyValueMap(accessor.readUserConfig(), null, keys); - break; - } - case PARTICIPANT: { - ParticipantId participantId = ParticipantId.from(scopeArgs[1]); - ParticipantAccessor accessor = participantAccessor(clusterName); - results = keyValueMap(accessor.readUserConfig(participantId), null, keys); - break; - } - case RESOURCE: { - ResourceId resourceId = ResourceId.from(scopeArgs[1]); - ResourceAccessor accessor = resourceAccessor(clusterName); - results = keyValueMap(accessor.readUserConfig(resourceId), null, keys); - break; - } - case PARTITION: { - ResourceId resourceId = ResourceId.from(scopeArgs[1]); - String partitionId = scopeArgs[2]; - ResourceAccessor accessor = resourceAccessor(clusterName); - results = keyValueMap(accessor.readUserConfig(resourceId), partitionId, keys); - break; - } - default: - System.err.println("Non-recognized scopeType: " + scopeType); - break; - } - - System.out.println(results); - } - - /** - * Convert key-value map to user-config - * @param scope - * @param mapKey - * @param keyValues - * @return - */ - private UserConfig userConfig(Scope scope, String mapKey, String[] keyValues) { - UserConfig userConfig = new UserConfig(scope); - - for (String keyValue : keyValues) { - String[] splits = keyValue.split("="); - String key = splits[0]; - String value = splits[1]; - if (mapKey == null) { - userConfig.setSimpleField(key, value); - } else { - if (userConfig.getMapField(mapKey) == null) { - userConfig.setMapField(mapKey, new TreeMap()); - } - userConfig.getMapField(mapKey).put(key, value); - } - } - return userConfig; - } - - void setConfig(String[] optValues) { - ScopeType scopeType = ScopeType.valueOf(optValues[0].toUpperCase()); - String[] scopeArgs = optValues[1].split("[\\s,]"); - String[] keyValues = optValues[2].split("[\\s,]"); - - String clusterName = scopeArgs[0]; - Map results = new HashMap(); - switch (scopeType) { - case CLUSTER: { - ClusterAccessor accessor = clusterAccessor(clusterName); - Scope scope = Scope.cluster(ClusterId.from(clusterName)); - UserConfig userConfig = userConfig(scope, null, keyValues); - accessor.setUserConfig(userConfig); - break; - } - case PARTICIPANT: { - ParticipantId participantId = ParticipantId.from(scopeArgs[1]); - ParticipantAccessor accessor = participantAccessor(clusterName); - Scope scope = Scope.participant(participantId); - UserConfig userConfig = userConfig(scope, null, keyValues); - accessor.setUserConfig(participantId, userConfig); - break; - } - case RESOURCE: { - ResourceId resourceId = ResourceId.from(scopeArgs[1]); - ResourceAccessor accessor = resourceAccessor(clusterName); - Scope scope = Scope.resource(resourceId); - UserConfig userConfig = userConfig(scope, null, keyValues); - accessor.setUserConfig(resourceId, userConfig); - break; - } - case PARTITION: { - ResourceId resourceId = ResourceId.from(scopeArgs[1]); - String partitionId = scopeArgs[2]; - ResourceAccessor accessor = resourceAccessor(clusterName); - Scope scope = Scope.resource(resourceId); - UserConfig userConfig = userConfig(scope, partitionId, keyValues); - accessor.setUserConfig(resourceId, userConfig); - break; - } - default: - System.err.println("Non-recognized scopeType: " + scopeType); - break; - } - - System.out.println(results); - } - - void setConstraint(String[] optValues) { - String clusterName = optValues[0]; - String constraintType = optValues[1]; - String constraintId = optValues[2]; - String constraintAttributesMap = optValues[3]; - if (clusterName == null || constraintType == null || constraintId == null - || constraintAttributesMap == null) { - System.err - .println("fail to set constraint. missing clusterName|constraintType|constraintId|constraintAttributesMap"); - return; - } - ClusterId clusterId = ClusterId.from(clusterName); - ClusterAccessor accessor = clusterAccessor(clusterName); - Map constraintAttributes = - HelixUtil.parseCsvFormatedKeyValuePairs(constraintAttributesMap); - ConstraintItem item = new ConstraintItem(constraintAttributes); - ClusterConfig.Delta delta = - new ClusterConfig.Delta(clusterId).addConstraintItem( - ConstraintType.valueOf(constraintType), ConstraintId.from(constraintId), item); - accessor.updateCluster(delta); - } - - void getConstraints(String[] optValues) { - String clusterName = optValues[0]; - ConstraintType constraintType = ConstraintType.valueOf(optValues[1]); - ClusterAccessor accessor = clusterAccessor(clusterName); - ClusterConstraints constraints = accessor.readConstraints(constraintType); - System.out.println(constraints.toString()); - } - - void removeConstraint(String[] optValues) { - String clusterName = optValues[0]; - ConstraintType constraintType = ConstraintType.valueOf(optValues[1]); - ConstraintId constraintId = ConstraintId.from(optValues[2]); - ClusterAccessor accessor = clusterAccessor(clusterName); - accessor.removeConstraint(constraintType, constraintId); - } - - void listClusterInfo(String[] optValues) { - String clusterName = optValues[0]; - ClusterAccessor accessor = clusterAccessor(clusterName); - Set resources = accessor.readResources().keySet(); - StringBuilder sb = - new StringBuilder("Existing resources in cluster ").append(clusterName).append(":\n"); - for (ResourceId resourceId : resources) { - sb.append(resourceId.stringify()).append('\n'); - } - Set participants = accessor.readParticipants().keySet(); - sb.append("Participants in cluster ").append(clusterName).append(":\n"); - for (ParticipantId participantId : participants) { - sb.append(participantId.stringify()).append('\n'); - } - System.out.print(sb.toString()); - } - - void listParticipantInfo(String[] optValues) { - String clusterName = optValues[0]; - String participantName = optValues[1]; - ParticipantAccessor accessor = participantAccessor(clusterName); - ParticipantId participantId = ParticipantId.from(participantName); - Participant participant = accessor.readParticipant(participantId); - StringBuilder sb = - new StringBuilder("Participant ").append(participantName).append(" in cluster ") - .append(clusterName).append(":\n").append("hostName: ") - .append(participant.getHostName()).append(", port: ").append(participant.getPort()) - .append(", enabled: ").append(participant.isEnabled()).append(", disabledPartitions: ") - .append(participant.getDisabledPartitionIds().toString()).append(", tags:") - .append(participant.getTags().toString()).append(", currentState: ") - .append(", messages: ").append(participant.getMessageMap().toString()) - .append(participant.getCurrentStateMap().toString()).append(", alive: ") - .append(participant.isAlive()).append(", userConfig: ") - .append(participant.getUserConfig().toString()); - if (participant.isAlive()) { - RunningInstance runningInstance = participant.getRunningInstance(); - sb.append(", sessionId: ").append(runningInstance.getSessionId().stringify()) - .append(", processId: ").append(runningInstance.getPid().stringify()) - .append(", helixVersion: ").append(runningInstance.getVersion().toString()); - } - System.out.println(sb.toString()); - } - - void listResourceInfo(String[] optValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - ResourceAccessor accessor = resourceAccessor(clusterName); - ResourceId resourceId = ResourceId.from(resourceName); - Resource resource = accessor.readResource(resourceId); - RebalancerConfigHolder holder = new RebalancerConfigHolder(resource.getRebalancerConfig()); - StringBuilder sb = - new StringBuilder("Resource ").append(resourceName).append(" in cluster ") - .append(clusterName).append(":\n").append("externalView: ") - .append(resource.getExternalView()).append(", userConfig: ") - .append(resource.getUserConfig()).append(", rebalancerConfig: ") - .append(holder.getSerializedConfig()); - System.out.println(sb.toString()); - } - - void listResources(String[] optValues) { - String clusterName = optValues[0]; - ClusterAccessor accessor = clusterAccessor(clusterName); - Set resources = accessor.readResources().keySet(); - StringBuilder sb = - new StringBuilder("Existing resources in cluster ").append(clusterName).append(":\n"); - for (ResourceId resourceId : resources) { - sb.append(resourceId.stringify()).append('\n'); - } - System.out.print(sb.toString()); - } - - void listParticipants(String[] optValues) { - String clusterName = optValues[0]; - ClusterAccessor accessor = clusterAccessor(clusterName); - Set participants = accessor.readParticipants().keySet(); - StringBuilder sb = - new StringBuilder("Participants in cluster ").append(clusterName).append(":\n"); - for (ParticipantId participantId : participants) { - sb.append(participantId.stringify()).append('\n'); - } - System.out.print(sb.toString()); - } - - void listStateModels(String[] optValues) { - String clusterName = optValues[0]; - ClusterAccessor accessor = clusterAccessor(clusterName); - Set stateModelDefs = accessor.readStateModelDefinitions().keySet(); - StringBuilder sb = - new StringBuilder("State models in cluster ").append(clusterName).append(":\n"); - for (StateModelDefId stateModelDefId : stateModelDefs) { - sb.append(stateModelDefId.stringify()).append('\n'); - } - System.out.print(sb.toString()); - } - - void listStateModel(String[] optValues) { - String clusterName = optValues[0]; - String stateModel = optValues[1]; - StateModelDefId stateModelDefId = StateModelDefId.from(stateModel); - ClusterAccessor accessor = clusterAccessor(clusterName); - Map stateModelDefs = - accessor.readStateModelDefinitions(); - StateModelDefinition stateModelDef = stateModelDefs.get(stateModelDefId); - StringBuilder sb = new StringBuilder("StateModelDefinition: ").append(stateModelDef.toString()); - System.out.println(sb.toString()); - } - - void listClusters(String[] optValues) { - List result = Lists.newArrayList(); - List clusterNames = _baseAccessor.getChildNames("/", 0); - for (String clusterName : clusterNames) { - ClusterAccessor accessor = clusterAccessor(clusterName); - if (accessor.isClusterStructureValid()) { - result.add(ClusterId.from(clusterName)); - } - } - System.out.println("Existing clusters: " + result); - } - - void removeConfig(String[] optValues) { - ScopeType type = ScopeType.valueOf(optValues[0].toUpperCase()); - String[] scopeArgs = optValues[1].split("[\\s,]"); - String[] keys = optValues[2].split("[\\s,]"); - String clusterName = scopeArgs[0]; - UserConfig userConfig; - switch (type) { - case CLUSTER: - ClusterAccessor clusterAccessor = clusterAccessor(clusterName); - userConfig = clusterAccessor.readUserConfig(); - removeKeysFromUserConfig(userConfig, keys); - clusterAccessor.setUserConfig(userConfig); - break; - case RESOURCE: - ResourceAccessor resourceAccessor = resourceAccessor(clusterName); - ResourceId resourceId = ResourceId.from(scopeArgs[1]); - userConfig = resourceAccessor.readUserConfig(resourceId); - removeKeysFromUserConfig(userConfig, keys); - resourceAccessor.setUserConfig(resourceId, userConfig); - break; - case PARTICIPANT: - ParticipantAccessor participantAccessor = participantAccessor(clusterName); - ParticipantId participantId = ParticipantId.from(scopeArgs[1]); - userConfig = participantAccessor.readUserConfig(participantId); - removeKeysFromUserConfig(userConfig, keys); - participantAccessor.setUserConfig(participantId, userConfig); - break; - case PARTITION: - ResourceAccessor resourcePartitionAccessor = resourceAccessor(clusterName); - PartitionId partitionId = PartitionId.from(scopeArgs[1]); - userConfig = resourcePartitionAccessor.readUserConfig(partitionId.getResourceId()); - removePartitionFromResourceUserConfig(userConfig, partitionId, keys); - resourcePartitionAccessor.setUserConfig(partitionId.getResourceId(), userConfig); - break; - } - } - - private void removeKeysFromUserConfig(UserConfig userConfig, String[] keys) { - Map simpleFields = Maps.newHashMap(userConfig.getSimpleFields()); - for (String key : keys) { - simpleFields.remove(key); - } - userConfig.setSimpleFields(simpleFields); - } - - private void removePartitionFromResourceUserConfig(UserConfig userConfig, - PartitionId partitionId, String[] keys) { - Map fields = Maps.newHashMap(userConfig.getMapField(partitionId.stringify())); - for (String key : keys) { - fields.remove(key); - } - userConfig.setMapField(partitionId.stringify(), fields); - } - - void swapParticipants(String[] optValues) { - String clusterName = optValues[0]; - String oldParticipantName = optValues[1]; - String newParticipantName = optValues[2]; - ParticipantAccessor accessor = participantAccessor(clusterName); - accessor.swapParticipants(ParticipantId.from(oldParticipantName), - ParticipantId.from(newParticipantName)); - } - - void resetPartition(String[] optValues) { - String clusterName = optValues[0]; - String participantName = optValues[1]; - String resourceName = optValues[2]; - String partitionName = optValues[3]; - - Set partitionIds = ImmutableSet.of(PartitionId.from(partitionName)); - ParticipantAccessor accessor = participantAccessor(clusterName); - accessor.resetPartitionsForParticipant(ParticipantId.from(participantName), - ResourceId.from(resourceName), partitionIds); - } - - void resetResource(String[] optValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - Set resourceIds = ImmutableSet.of(ResourceId.from(resourceName)); - ResourceAccessor accessor = resourceAccessor(clusterName); - accessor.resetResources(resourceIds); - } - - void resetParticipant(String[] optValues) { - String clusterName = optValues[0]; - String participantName = optValues[1]; - Set participantIds = ImmutableSet.of(ParticipantId.from(participantName)); - ParticipantAccessor accessor = participantAccessor(clusterName); - accessor.resetParticipants(participantIds); - } - - void expandResource(String[] optValues) { - String clusterName = optValues[0]; - String resourceName = optValues[1]; - expandResource(ClusterId.from(clusterName), ResourceId.from(resourceName)); - } - - void expandCluster(String[] optValues) { - String clusterName = optValues[0]; - ClusterAccessor accessor = clusterAccessor(clusterName); - Cluster cluster = accessor.readCluster(); - for (ResourceId resourceId : cluster.getResourceMap().keySet()) { - expandResource(ClusterId.from(clusterName), resourceId); - } - } - - private void expandResource(ClusterId clusterId, ResourceId resourceId) { - ResourceAccessor accessor = resourceAccessor(clusterId.stringify()); - Resource resource = accessor.readResource(resourceId); - SemiAutoRebalancerConfig config = - BasicRebalancerConfig.convert(resource.getRebalancerConfig(), - SemiAutoRebalancerConfig.class); - if (config == null) { - LOG.info("Only SEMI_AUTO mode supported for resource expansion"); - return; - } - if (config.anyLiveParticipant()) { - LOG.info("Resource uses ANY_LIVE_PARTICIPANT, skipping default assignment"); - return; - } - if (config.getPreferenceLists().size() == 0) { - LOG.info("No preference lists have been set yet, skipping default assignment"); - return; - } - accessor.generateDefaultAssignment(resourceId, -1, null); - } - - static int processCommandLineArgs(String[] cliArgs) { - CommandLineParser cliParser = new GnuParser(); - Options cliOptions = constructCommandLineOptions(); - CommandLine cmd = null; - - try { - cmd = cliParser.parse(cliOptions, cliArgs); - } catch (ParseException pe) { - System.err.println("CommandLineClient: failed to parse command-line options: " + pe); - printUsage(cliOptions); - System.exit(1); - } - - String zkAddr = cmd.getOptionValue(HelixOption.zkSvr.name()); - ZkClient zkclient = null; - - try { - zkclient = - new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT, - new ZNRecordSerializer()); - - NewClusterSetup setup = new NewClusterSetup(zkclient); - - Option[] options = cmd.getOptions(); - - for (Option option : options) { - if (option.getLongOpt().equals(HelixOption.zkSvr.name())) { - continue; - } - - HelixOption opt = HelixOption.valueOf(option.getLongOpt()); - String[] optValues = cmd.getOptionValues(option.getLongOpt()); - - checkArgNum(opt, optValues); - - switch (opt) { - case listClusters: - setup.listClusters(optValues); - break; - case listResources: - setup.listResources(optValues); - break; - case listInstances: - setup.listParticipants(optValues); - break; - case addCluster: - setup.addCluster(optValues); - break; - case activateCluster: - break; - case dropCluster: - setup.dropCluster(optValues); - break; - case dropResource: - setup.dropResource(optValues); - break; - case addInstance: - setup.addInstance(optValues); - break; - case addResource: - String[] rebalancerModeValues = null; - if (cmd.hasOption(HelixOption.rebalancerMode.name())) { - rebalancerModeValues = cmd.getOptionValues(HelixOption.rebalancerMode.name()); - checkArgNum(HelixOption.rebalancerMode, rebalancerModeValues); - } - String[] bucketSizeValues = null; - if (cmd.hasOption(HelixOption.bucketSize.name())) { - bucketSizeValues = cmd.getOptionValues(HelixOption.bucketSize.name()); - checkArgNum(HelixOption.bucketSize, bucketSizeValues); - } - String[] maxPartitionsPerNodeValues = null; - if (cmd.hasOption(HelixOption.maxPartitionsPerNode.name())) { - maxPartitionsPerNodeValues = - cmd.getOptionValues(HelixOption.maxPartitionsPerNode.name()); - checkArgNum(HelixOption.maxPartitionsPerNode, maxPartitionsPerNodeValues); - } - setup.addResource(optValues, rebalancerModeValues, bucketSizeValues, - maxPartitionsPerNodeValues); - break; - case addStateModelDef: - setup.addStateModelDef(optValues); - break; - case addIdealState: - setup.addIdealState(optValues); - break; - case swapInstance: - setup.swapParticipants(optValues); - break; - case dropInstance: - setup.dropInstance(optValues); - break; - case rebalance: - String[] groupTagValues = null; - if (cmd.hasOption(HelixOption.instanceGroupTag.name())) { - groupTagValues = cmd.getOptionValues(HelixOption.instanceGroupTag.name()); - checkArgNum(HelixOption.instanceGroupTag, groupTagValues); - } - setup.rebalance(optValues, groupTagValues); - break; - case expandCluster: - setup.expandCluster(optValues); - break; - case expandResource: - setup.expandResource(optValues); - break; - case mode: - case rebalancerMode: - case bucketSize: - case maxPartitionsPerNode: - // always used with addResource command - continue; - case instanceGroupTag: - // always used with rebalance command - continue; - case resourceKeyPrefix: - throw new UnsupportedOperationException(HelixOption.resourceKeyPrefix - + " is not supported, please set partition names directly"); - case addResourceProperty: - throw new UnsupportedOperationException(HelixOption.addResourceProperty - + " is not supported, please use setConfig"); - case removeResourceProperty: - throw new UnsupportedOperationException(HelixOption.removeResourceProperty - + " is not supported, please use removeConfig"); - case addInstanceTag: - setup.addInstanceTag(optValues); - break; - case removeInstanceTag: - setup.removeInstanceTag(optValues); - break; - case listClusterInfo: - setup.listClusterInfo(optValues); - break; - case listInstanceInfo: - setup.listParticipantInfo(optValues); - break; - case listResourceInfo: - setup.listResourceInfo(optValues); - break; - case listPartitionInfo: - setup.listPartitionInfo(optValues); - break; - case listStateModels: - setup.listStateModels(optValues); - break; - case listStateModel: - setup.listStateModel(optValues); - break; - case enableInstance: - setup.enableInstance(optValues); - break; - case enablePartition: - setup.enablePartition(optValues); - break; - case enableCluster: - setup.enableCluster(optValues); - break; - case resetPartition: - setup.resetPartition(optValues); - break; - case resetInstance: - setup.resetParticipant(optValues); - break; - case resetResource: - setup.resetResource(optValues); - break; - case getConfig: - setup.getConfig(optValues); - break; - case setConfig: - setup.setConfig(optValues); - break; - case removeConfig: - setup.removeConfig(optValues); - break; - case getConstraints: - setup.getConstraints(optValues); - break; - case setConstraint: - setup.setConstraint(optValues); - break; - case removeConstraint: - setup.removeConstraint(optValues); - break; - default: - System.err.println("Non-recognized option: " + opt); - break; - } - - // process 1 option only - break; - } - - return 0; - } finally { - if (zkclient != null) { - zkclient.close(); - } - } - } - - public static void main(String[] args) { - // if (args.length == 1 && args[0].equals("setup-test-cluster")) { - // System.out - // .println("By default setting up TestCluster with 6 instances, 10 partitions, Each partition will have 3 replicas"); - // new ClusterSetup("localhost:2181").setupTestCluster("TestCluster"); - // System.exit(0); - // } - - int ret = processCommandLineArgs(args); - System.exit(ret); - - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java index 2b94f7b..a35a47b 100644 --- a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java +++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java @@ -168,8 +168,8 @@ public class TestNewStages extends ZkTestBase { .getRebalancerConfig() .getRebalancerRef() .getRebalancer() - .computeResourceMapping(resource.getRebalancerConfig(), null, cluster, - currentStateOutput); + .computeResourceMapping(resource.getIdealState(), resource.getRebalancerConfig(), null, + cluster, currentStateOutput); verifySemiAutoRebalance(resource, semiAutoResult); System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java index 3c8fb2c..a8d1589 100644 --- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java +++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java @@ -11,6 +11,7 @@ import org.apache.helix.api.id.ResourceId; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.rebalancer.config.BasicRebalancerConfig; import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig; +import org.apache.helix.controller.rebalancer.config.PartitionedRebalancerConfig; import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig; import org.testng.Assert; import org.testng.annotations.Test; @@ -93,11 +94,15 @@ public class TestUpdateConfig { UserConfig userConfig = new UserConfig(Scope.resource(resourceId)); userConfig.setSimpleField("key1", "value1"); SemiAutoRebalancerConfig rebalancerContext = - new SemiAutoRebalancerConfig.Builder(resourceId).build(); + new SemiAutoRebalancerConfig.Builder(resourceId).stateModelDefId( + StateModelDefId.from("MasterSlave")).build(); ResourceConfig config = - new ResourceConfig.Builder(resourceId).userConfig(userConfig) - .rebalancerConfig(rebalancerContext).bucketSize(OLD_BUCKET_SIZE).batchMessageMode(true) - .build(); + new ResourceConfig.Builder(resourceId) + .userConfig(userConfig) + .rebalancerConfig(rebalancerContext) + .idealState( + PartitionedRebalancerConfig.rebalancerConfigToIdealState(rebalancerContext, 0, + false)).bucketSize(OLD_BUCKET_SIZE).batchMessageMode(true).build(); // update: overwrite user config, change to full auto rebalancer context, and change the bucket // size http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java index 22904fb..c068132 100644 --- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java +++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java @@ -117,8 +117,7 @@ public class TestAccessorRecreate extends ZkTestBase { Assert.assertTrue(created); // read the participant - ParticipantAccessor participantAccessor = new ParticipantAccessor(clusterId, helixAccessor); - Participant participantSnapshot = participantAccessor.readParticipant(participantId); + Participant participantSnapshot = accessor.readParticipant(participantId); Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1); // create a participant with the same id @@ -133,7 +132,7 @@ public class TestAccessorRecreate extends ZkTestBase { Assert.assertTrue(created2); // read the cluster again - participantSnapshot = participantAccessor.readParticipant(participantId); + participantSnapshot = accessor.readParticipant(participantId); Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2); accessor.dropCluster(); http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java index 096a883..29228e4 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java @@ -30,10 +30,9 @@ import java.util.TreeMap; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.api.State; -import org.apache.helix.api.accessor.ClusterAccessor; -import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.MessageId; import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.PartitionId; @@ -50,13 +49,10 @@ import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.testutil.HelixTestUtil; import org.apache.helix.testutil.ZkTestBase; -import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; public class TestMessageThrottleStage extends ZkTestBase { - private static Logger LOG = Logger.getLogger(TestMessageThrottleStage.class); - final String _className = "TestMessageThrottleStage"; @Test @@ -65,6 +61,7 @@ public class TestMessageThrottleStage extends ZkTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); HelixManager manager = new DummyClusterManager(clusterName, accessor); // ideal state: node0 is MASTER, node1 is SLAVE @@ -80,9 +77,6 @@ public class TestMessageThrottleStage extends ZkTestBase { }); HelixTestUtil.setupStateModel(_baseAccessor, clusterName); - ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); - clusterAccessor.initClusterStructure(); - ClusterEvent event = new ClusterEvent("testEvent"); event.addAttribute("helixmanager", manager); @@ -145,6 +139,8 @@ public class TestMessageThrottleStage extends ZkTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); + HelixManager manager = new DummyClusterManager(clusterName, accessor); // ideal state: node0 is MASTER, node1 is SLAVE @@ -160,9 +156,6 @@ public class TestMessageThrottleStage extends ZkTestBase { }); HelixTestUtil.setupStateModel(_baseAccessor, clusterName); - ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); - clusterAccessor.initClusterStructure(); - // setup constraints ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString()); http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index a5cc614..922dde6 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -29,8 +29,6 @@ import org.apache.helix.PropertyKey.Builder; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.api.State; -import org.apache.helix.api.accessor.ClusterAccessor; -import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.PartitionId; import org.apache.helix.api.id.ResourceId; import org.apache.helix.api.id.SessionId; @@ -45,21 +43,18 @@ import org.apache.helix.model.Message.Attributes; import org.apache.helix.testutil.HelixTestUtil; import org.apache.helix.testutil.TestUtil; import org.apache.helix.testutil.ZkTestBase; -import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; public class TestRebalancePipeline extends ZkTestBase { - private static Logger LOG = Logger.getLogger(TestMessageThrottleStage.class); - @Test public void testDuplicateMsg() { String clusterName = TestUtil.getTestName(); System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); HelixManager manager = new DummyClusterManager(clusterName, accessor); ClusterEvent event = new ClusterEvent("testEvent"); @@ -82,9 +77,6 @@ public class TestRebalancePipeline extends ZkTestBase { }); HelixTestUtil.setupStateModel(_baseAccessor, clusterName); - ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); - clusterAccessor.initClusterStructure(); - // cluster data cache refresh pipeline Pipeline dataRefresh = new Pipeline(); dataRefresh.addStage(new ReadClusterDataStage()); @@ -139,16 +131,14 @@ public class TestRebalancePipeline extends ZkTestBase { String clusterName = TestUtil.getTestName(); System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); final String resourceName = "testResource_dup"; String[] resourceGroups = new String[] { resourceName }; - TestHelper.setupEmptyCluster(_zkclient, clusterName); - // ideal state: node0 is MASTER, node1 is SLAVE // replica=2 means 1 master and 1 slave HelixTestUtil.setupIdealState(_baseAccessor, clusterName, new int[] { @@ -162,9 +152,6 @@ public class TestRebalancePipeline extends ZkTestBase { 0, 1 }); - ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); - clusterAccessor.initClusterStructure(); - ClusterControllerManager controller = new ClusterControllerManager(_zkaddr, clusterName, "controller_0"); controller.syncStart(); @@ -216,8 +203,8 @@ public class TestRebalancePipeline extends ZkTestBase { String clusterName = TestUtil.getTestName(); System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); HelixManager manager = new DummyClusterManager(clusterName, accessor); ClusterEvent event = new ClusterEvent("testEvent"); event.addAttribute("helixmanager", manager); @@ -242,9 +229,6 @@ public class TestRebalancePipeline extends ZkTestBase { }); HelixTestUtil.setupStateModel(_baseAccessor, clusterName); - ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); - clusterAccessor.initClusterStructure(); - // cluster data cache refresh pipeline Pipeline dataRefresh = new Pipeline(); dataRefresh.addStage(new ReadClusterDataStage()); @@ -326,8 +310,8 @@ public class TestRebalancePipeline extends ZkTestBase { System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, _baseAccessor); + TestHelper.setupEmptyCluster(_zkclient, clusterName); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); HelixManager manager = new DummyClusterManager(clusterName, accessor); ClusterEvent event = new ClusterEvent("testEvent"); event.addAttribute("helixmanager", manager); @@ -349,9 +333,6 @@ public class TestRebalancePipeline extends ZkTestBase { }); HelixTestUtil.setupStateModel(_baseAccessor, clusterName); - ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor); - clusterAccessor.initClusterStructure(); - // cluster data cache refresh pipeline Pipeline dataRefresh = new Pipeline(); dataRefresh.addStage(new ReadClusterDataStage()); @@ -407,8 +388,7 @@ public class TestRebalancePipeline extends ZkTestBase { private void setCurrentState(String clusterName, String instance, String resourceGroupName, String resourceKey, String sessionId, String state) { - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, _baseAccessor); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseAccessor); Builder keyBuilder = accessor.keyBuilder(); CurrentState curState = new CurrentState(resourceGroupName); http://git-wip-us.apache.org/repos/asf/helix/blob/ce1e926c/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java index 90ea393..04a25e9 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java @@ -191,12 +191,6 @@ public class TestResourceComputationStage extends BaseStageTest { .getStateModelDefId(), currentState.getStateModelDefId()); AssertJUnit.assertEquals(resourceMap.get(oldResourceId).getSubUnitSet().size(), currentState .getTypedPartitionStateMap().size()); - AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit( - PartitionId.from("testResourceOld_0"))); - AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit( - PartitionId.from("testResourceOld_1"))); - AssertJUnit.assertNotNull(resourceMap.get(oldResourceId).getSubUnit( - PartitionId.from("testResourceOld_2"))); }