helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-243] Fix failing tests related to helix model overhaul
Date Mon, 16 Sep 2013 17:21:20 GMT
Updated Branches:
  refs/heads/helix-logical-model 0a8baa12f -> 01ed356f7


[HELIX-243] Fix failing tests related to helix model overhaul


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/01ed356f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/01ed356f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/01ed356f

Branch: refs/heads/helix-logical-model
Commit: 01ed356f7157e095877d40e591f5a2abd9cd597d
Parents: 0a8baa1
Author: zzhang <zzhang@apache.org>
Authored: Mon Sep 16 10:20:38 2013 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Mon Sep 16 10:20:38 2013 -0700

----------------------------------------------------------------------
 .../org/apache/helix/api/RebalancerConfig.java  | 79 ++++++++++++++++++++
 .../java/org/apache/helix/api/Resource.java     | 23 +++---
 .../stages/NewBestPossibleStateCalcStage.java   | 26 +++----
 .../stages/NewMessageGenerationStage.java       |  8 +-
 .../stages/NewResourceComputationStage.java     | 36 ++++++---
 .../java/org/apache/helix/model/Message.java    |  3 +
 6 files changed, 137 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/01ed356f/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index 1816d91..ec61533 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -64,6 +64,28 @@ public class RebalancerConfig extends NamespacedConfig {
    * @param resourceId the resource to rebalance
    * @param rebalancerMode the mode to rebalance with
    * @param stateModelDefId the state model that the resource uses
+   * @param stateModelFactoryId the state model factory that the resource uses
+   * @param partitionMap partitions of the resource
+   */
+  public RebalancerConfig(ResourceId resourceId, RebalanceMode rebalancerMode,
+      StateModelDefId stateModelDefId, StateModelFactoryId stateModelFactoryId,
+      Map<PartitionId, Partition> partitionMap) {
+    super(resourceId, RebalancerConfig.class.getSimpleName());
+    _resourceId = resourceId;
+    _fieldsSet =
+        ImmutableSet.copyOf(Lists.transform(Arrays.asList(Fields.values()),
+            Functions.toStringFunction()));
+    setEnumField(Fields.REBALANCE_MODE.toString(), rebalancerMode);
+    setSimpleField(Fields.STATE_MODEL_DEFINITION.toString(), stateModelDefId.stringify());
+    setSimpleField(Fields.STATE_MODEL_FACTORY.name(), stateModelFactoryId.stringify());
+    _partitionMap = ImmutableMap.copyOf(partitionMap);
+  }
+
+  /**
+   * Instantiate a RebalancerConfig.
+   * @param resourceId the resource to rebalance
+   * @param rebalancerMode the mode to rebalance with
+   * @param stateModelDefId the state model that the resource uses
    * @param partitionMap partitions of the resource
    */
   public RebalancerConfig(ResourceId resourceId, RebalanceMode rebalancerMode,
@@ -461,4 +483,61 @@ public class RebalancerConfig extends NamespacedConfig {
      */
     public abstract RebalancerConfig build();
   }
+
+  /**
+   * Simple non-mode builder for rebalancer config
+   */
+  public static class SimpleBuilder {
+    private final ResourceId _resourceId;
+    private StateModelDefId _stateModelDefId;
+    private StateModelFactoryId _stateModelFactoryId;
+    private final Map<PartitionId, Partition> _partitionMap;
+
+    /**
+     * Construct with a resource-id
+     * @param resourceId
+     */
+    public SimpleBuilder(ResourceId resourceId) {
+      _resourceId = resourceId;
+      _partitionMap = new HashMap<PartitionId, Partition>();
+    }
+
+    /**
+     * Set state model definition id
+     * @param stateModelDefId
+     * @return
+     */
+    public SimpleBuilder stateModelDefId(StateModelDefId stateModelDefId) {
+      _stateModelDefId = stateModelDefId;
+      return this;
+    }
+
+    /**
+     * Add a partition that the resource serves
+     * @param partition fully-qualified partition
+     * @return Builder
+     */
+    public SimpleBuilder addPartition(Partition partition) {
+      _partitionMap.put(partition.getId(), partition);
+      return this;
+    }
+
+    /**
+     * Set state model factory
+     * @param stateModelFactoryId
+     * @return Builder
+     */
+    public SimpleBuilder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+      _stateModelFactoryId = stateModelFactoryId;
+      return this;
+    }
+
+    /**
+     * Build a rebalancer config
+     * @return
+     */
+    public RebalancerConfig build() {
+      return new RebalancerConfig(_resourceId, RebalanceMode.NONE, _stateModelDefId, _partitionMap);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/01ed356f/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index c598550..f5c44e7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -131,9 +131,6 @@ public class Resource {
    * @return scheduler-task config or null if state-model-def is not SchedulerTaskQueue
    */
   SchedulerTaskConfig schedulerTaskConfig(IdealState idealState) {
-    if (!idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
-      return null;
-    }
 
     // TODO refactor get timeout
     Map<String, Integer> transitionTimeoutMap = new HashMap<String, Integer>();
@@ -150,17 +147,21 @@ public class Resource {
     }
 
     Map<PartitionId, Message> innerMsgMap = new HashMap<PartitionId, Message>();
-    for (PartitionId partitionId : idealState.getPartitionSet()) {
-      // TODO refactor: scheduler-task-queue state model uses map-field to store inner-messages
-      // this is different from all other state-models
-      Map<String, String> innerMsgStrMap =
-          idealState.getRecord().getMapField(partitionId.stringify());
-      if (innerMsgStrMap != null) {
-        Message innerMsg = Message.toMessage(innerMsgStrMap);
-        innerMsgMap.put(partitionId, innerMsg);
+    if (idealState.getStateModelDefId().equalsIgnoreCase(StateModelDefId.SchedulerTaskQueue)) {
+      for (PartitionId partitionId : idealState.getPartitionSet()) {
+        // TODO refactor: scheduler-task-queue state model uses map-field to store inner-messages
+        // this is different from all other state-models
+        Map<String, String> innerMsgStrMap =
+            idealState.getRecord().getMapField(partitionId.stringify());
+        if (innerMsgStrMap != null) {
+          Message innerMsg = Message.toMessage(innerMsgStrMap);
+          innerMsgMap.put(partitionId, innerMsg);
+        }
       }
     }
 
+    // System.out.println("transitionTimeoutMap: " + transitionTimeoutMap);
+    // System.out.println("innerMsgMap: " + innerMsgMap);
     return new SchedulerTaskConfig(transitionTimeoutMap, innerMsgMap);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/01ed356f/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
index 7c4341b..a89707b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -117,19 +117,19 @@ public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
       LOG.debug("Processing resource:" + resourceId);
       // Resource may be gone. In that case we need to get the state model name
       // from the current state
-      if (cluster.getResource(resourceId) == null) {
-        // if resource is deleted, then we do not know which rebalancer to use
-        // instead, just mark all partitions of the resource as dropped
-        if (LOG.isInfoEnabled()) {
-          LOG.info("resource:" + resourceId + " does not exist anymore");
-        }
-        StateModelDefinition stateModelDef =
-            stateModelDefs.get(currentStateOutput.getResourceStateModelDef(resourceId));
-        ResourceAssignment droppedAssignment =
-            mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
-        output.setResourceAssignment(resourceId, droppedAssignment);
-        continue;
-      }
+      // if (cluster.getResource(resourceId) == null) {
+      // // if resource is deleted, then we do not know which rebalancer to use
+      // // instead, just mark all partitions of the resource as dropped
+      // if (LOG.isInfoEnabled()) {
+      // LOG.info("resource:" + resourceId + " does not exist anymore");
+      // }
+      // StateModelDefinition stateModelDef =
+      // stateModelDefs.get(currentStateOutput.getResourceStateModelDef(resourceId));
+      // ResourceAssignment droppedAssignment =
+      // mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+      // output.setResourceAssignment(resourceId, droppedAssignment);
+      // continue;
+      // }
 
       ResourceConfig resourceConfig = resourceMap.get(resourceId);
       RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/01ed356f/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
index c74d577..76362bc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationStage.java
@@ -146,10 +146,10 @@ public class NewMessageGenerationStage extends AbstractBaseStage {
                     StateModelDefId.SchedulerTaskQueue)) {
               if (resourceConfig.getPartitionMap().size() > 0) {
                 // TODO refactor it -- we need a way to read in scheduler tasks a priori
-                Resource activeResource = cluster.getResource(resourceId);
-                if (activeResource != null) {
-                  message.setInnerMessage(activeResource.getSchedulerTaskConfig().getInnerMessage(
-                      partitionId));
+                Message innerMsg =
+                    resourceConfig.getSchedulerTaskConfig().getInnerMessage(partitionId);
+                if (innerMsg != null) {
+                  message.setInnerMessage(innerMsg);
                 }
               }
             }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/01ed356f/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 8e3006f..62768e0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -70,6 +70,9 @@ public class NewResourceComputationStage extends AbstractBaseStage {
     }
 
     // include all partitions from CurrentState as well since idealState might be removed
+    Map<ResourceId, RebalancerConfig.SimpleBuilder> rebalancerConfigBuilderMap =
+        new HashMap<ResourceId, RebalancerConfig.SimpleBuilder>();
+
     for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
       for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
         CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
@@ -84,28 +87,41 @@ public class NewResourceComputationStage extends AbstractBaseStage {
 
         // don't overwrite ideal state configs
         if (!resourceBuilderMap.containsKey(resourceId)) {
-          Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
-          for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
-            partitionMap.put(partitionId, new Partition(partitionId));
+          if (!rebalancerConfigBuilderMap.containsKey(resourceId)) {
+            RebalancerConfig.SimpleBuilder rebalancerConfigBuilder =
+                new RebalancerConfig.SimpleBuilder(resourceId);
+            rebalancerConfigBuilder.stateModelDefId(currentState.getStateModelDefId());
+            rebalancerConfigBuilder.stateModelFactoryId(Id.stateModelFactory(currentState
+                .getStateModelFactoryName()));
+            rebalancerConfigBuilderMap.put(resourceId, rebalancerConfigBuilder);
           }
-          RebalancerConfig rebalancerConfig =
-              new RebalancerConfig(resourceId, RebalanceMode.NONE,
-                  currentState.getStateModelDefId(), partitionMap);
-          rebalancerConfig.setStateModelFactoryId(Id.stateModelFactory(currentState
-              .getStateModelFactoryName()));
+
           ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
-          resourceBuilder.rebalancerConfig(rebalancerConfig);
           resourceBuilder.bucketSize(currentState.getBucketSize());
           resourceBuilder.batchMessageMode(currentState.getBatchMessageMode());
           resourceBuilderMap.put(resourceId, resourceBuilder);
         }
+
+        // add all partitions in current-state
+        if (rebalancerConfigBuilderMap.containsKey(resourceId)) {
+          RebalancerConfig.SimpleBuilder rebalancerConfigBuilder = rebalancerConfigBuilderMap.get(resourceId);
+          for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+            rebalancerConfigBuilder.addPartition(new Partition(partitionId));
+          }
+        }
       }
+
     }
 
     // convert builder-map to resource-map
     Map<ResourceId, ResourceConfig> resourceMap = new LinkedHashMap<ResourceId, ResourceConfig>();
     for (ResourceId resourceId : resourceBuilderMap.keySet()) {
-      resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
+      ResourceConfig.Builder resourceConfigBuilder = resourceBuilderMap.get(resourceId);
+      if (rebalancerConfigBuilderMap.containsKey(resourceId)) {
+        RebalancerConfig.SimpleBuilder rebalancerConfigBuilder = rebalancerConfigBuilderMap.get(resourceId);
+        resourceConfigBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
+      }
+      resourceMap.put(resourceId, resourceConfigBuilder.build());
     }
 
     event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/01ed356f/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 189c239..5704b83 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
@@ -582,6 +583,8 @@ public class Message extends HelixProperty {
   public void setStateModelFactoryId(StateModelFactoryId factoryId) {
     if (factoryId != null) {
       setStateModelFactoryName(factoryId.stringify());
+    } else {
+      setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
     }
   }
 


Mime
View raw message