helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-109] Review Helix model package, fix failing tests
Date Tue, 10 Sep 2013 22:10:28 GMT
Updated Branches:
  refs/heads/helix-logical-model 73b82fc05 -> aaf9fd57b


[HELIX-109] Review Helix model package, fix failing tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/aaf9fd57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/aaf9fd57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/aaf9fd57

Branch: refs/heads/helix-logical-model
Commit: aaf9fd57bc482fe90ddd39627b7a57d905e3fa73
Parents: 73b82fc
Author: zzhang <zzhang@apache.org>
Authored: Tue Sep 10 15:10:14 2013 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Tue Sep 10 15:10:14 2013 -0700

----------------------------------------------------------------------
 .../org/apache/helix/api/ClusterAccessor.java   |  2 +-
 .../apache/helix/api/ParticipantAccessor.java   |  2 +-
 .../util/NewConstraintBasedAssignment.java      |  4 +-
 .../stages/NewBestPossibleStateOutput.java      |  5 +++
 .../stages/NewResourceComputationStage.java     |  2 +
 .../stages/NewTaskAssignmentStage.java          |  2 +-
 .../builder/ResourceAssignmentBuilder.java      |  7 ++--
 .../helix/tools/ClusterStateVerifier.java       | 41 +++++++++-----------
 .../stages/TestRebalancePipeline.java           | 28 +++++++++++++
 9 files changed, 62 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index a4a79a0..7fcbc37 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -185,7 +185,7 @@ public class ClusterAccessor {
       LiveInstance liveInstance = liveInstanceMap.get(participantName);
       Map<String, Message> instanceMsgMap = messageMap.get(participantName);
 
-      ParticipantId participantId = new ParticipantId(participantName);
+      ParticipantId participantId = Id.participant(participantName);
 
       participantMap.put(participantId, ParticipantAccessor.createParticipant(participantId,
           instanceConfig, liveInstance, instanceMsgMap, currentStateMap.get(participantName)));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index da2c433..b9d06ef 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -292,7 +292,7 @@ public class ParticipantAccessor {
     if (disabledPartitions != null) {
       disabledPartitionIdSet = new HashSet<PartitionId>();
       for (String partitionId : disabledPartitions) {
-        disabledPartitionIdSet.add(new PartitionId(PartitionId.extractResourceId(partitionId),
+        disabledPartitionIdSet.add(Id.partition(PartitionId.extractResourceId(partitionId),
             PartitionId.stripResourceId(partitionId)));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
index 07856ca..69d9fcf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/NewConstraintBasedAssignment.java
@@ -61,7 +61,9 @@ public class NewConstraintBasedAssignment {
         Sets.filter(participantSet, new Predicate<ParticipantId>() {
           @Override
           public boolean apply(ParticipantId participantId) {
-            return participantMap.get(participantId).getDisablePartitionIds().contains(partitionId);
+            Participant participant = participantMap.get(participantId);
+            return !participant.isEnabled()
+                || participant.getDisablePartitionIds().contains(partitionId);
           }
         });
     return disabledParticipantsForPartition;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
index 8c5005d..fbdea55 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -40,4 +40,9 @@ public class NewBestPossibleStateOutput {
   public Set<ResourceId> getAssignedResources() {
     return _resourceAssignmentMap.keySet();
   }
+
+  @Override
+  public String toString() {
+    return _resourceAssignmentMap.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index a6d9db4..2c42e8f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -65,6 +65,8 @@ public class NewResourceComputationStage extends AbstractBaseStage {
       resourceBuilder.rebalancerConfig(rebalancerConfig);
       Set<Partition> partitionSet = new HashSet<Partition>(resource.getPartitionMap().values());
       resourceBuilder.addPartitions(partitionSet);
+      resourceBuilder.bucketSize(resource.getBucketSize());
+      resourceBuilder.batchMessageMode(resource.getBatchMessageMode());
       resourceBuilderMap.put(resourceId, resourceBuilder);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
index 31a0614..b259847 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -141,7 +141,7 @@ public class NewTaskAssignmentStage extends AbstractBaseStage {
 
       // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to "
       // + message.getTgtName() + " transit " + message.getPartitionId() + "|"
-      // + message.getPartitionId() + " from: " + message.getFromState() + " to: "
+      // + message.getPartitionIds() + " from: " + message.getFromState() + " to: "
       // + message.getToState());
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
b/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
index 735dbae..9b0f18a 100644
--- a/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
+++ b/helix-core/src/main/java/org/apache/helix/model/builder/ResourceAssignmentBuilder.java
@@ -52,11 +52,10 @@ public class ResourceAssignmentBuilder {
    */
   public ResourceAssignmentBuilder addAssignments(PartitionId partitionId,
       Map<ParticipantId, State> replicaMap) {
-    if (_mapping.containsKey(partitionId)) {
-      _mapping.get(partitionId).putAll(replicaMap);
-    } else {
-      _mapping.put(partitionId, replicaMap);
+    if (!_mapping.containsKey(partitionId)) {
+      _mapping.put(partitionId, new HashMap<ParticipantId, State>());
     }
+    _mapping.get(partitionId).putAll(replicaMap);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index b371c6a..1defc9f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -48,8 +48,10 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.ClusterAccessor;
 import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
 import org.apache.helix.api.PartitionId;
 import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
 import org.apache.helix.api.StateModelDefId;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
@@ -72,6 +74,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.ResourceAssignmentBuilder;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.PropertyJsonComparator;
@@ -275,36 +278,29 @@ public class ClusterStateVerifier {
       // calculate best possible state
       NewBestPossibleStateOutput bestPossOutput =
           ClusterStateVerifier.calcBestPossState(cluster, convertedDefs);
-      Map<String, Map<String, Map<String, String>>> bestPossStateMap =
-          new HashMap<String, Map<String, Map<String, String>>>();
-      for (ResourceId resourceId : bestPossOutput.getAssignedResources()) {
-        ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
-        Map<String, Map<String, String>> resourceMap = new HashMap<String,
Map<String, String>>();
-        for (PartitionId partitionId : resourceAssignment.getMappedPartitions()) {
-          Map<String, String> replicaMap =
-              ResourceAssignment.stringMapFromReplicaMap(resourceAssignment
-                  .getReplicaMap(partitionId));
-          resourceMap.put(partitionId.stringify(), replicaMap);
-        }
-        bestPossStateMap.put(resourceId.stringify(), resourceMap);
-      }
 
       // set error states
       if (errStates != null) {
         for (String resourceName : errStates.keySet()) {
+          ResourceId resourceId = Id.resource(resourceName);
           Map<String, String> partErrStates = errStates.get(resourceName);
+          ResourceAssignment resourceAssignment = bestPossOutput.getResourceAssignment(resourceId);
+
+          ResourceAssignmentBuilder raBuilder = new ResourceAssignmentBuilder(resourceId);
+          List<PartitionId> mappedPartitions = resourceAssignment.getMappedPartitions();
+          for (PartitionId partitionId : mappedPartitions) {
+            raBuilder.addAssignments(partitionId, resourceAssignment.getReplicaMap(partitionId));
+          }
+
           for (String partitionName : partErrStates.keySet()) {
             String instanceName = partErrStates.get(partitionName);
-
-            if (!bestPossStateMap.containsKey(resourceName)) {
-              bestPossStateMap.put(resourceName, new HashMap<String, Map<String, String>>());
-            }
-            if (!bestPossStateMap.get(resourceName).containsKey(partitionName)) {
-              bestPossStateMap.get(resourceName).put(partitionName, new HashMap<String,
String>());
-            }
-            bestPossStateMap.get(resourceName).get(partitionName)
-                .put(instanceName, HelixDefinedState.ERROR.toString());
+            PartitionId partitionId = Id.partition(partitionName);
+            ParticipantId participantId = Id.participant(instanceName);
+            raBuilder.addAssignment(partitionId, participantId,
+                new State(HelixDefinedState.ERROR.toString()));
           }
+          bestPossOutput.setResourceAssignment(resourceId, raBuilder.build());
+
         }
       }
 
@@ -464,7 +460,6 @@ public class ClusterStateVerifier {
     NewBestPossibleStateOutput output =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 
-    // System.out.println("output:" + output);
     return output;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/aaf9fd57/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index a3f38ea..44c6510 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -37,6 +37,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
 import org.apache.log4j.Logger;
@@ -68,6 +69,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
+    setupInstances(clusterName, new int[] {
+        0, 1
+    });
     setupLiveInstances(clusterName, new int[] {
         0, 1
     });
@@ -212,6 +216,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
+    setupInstances(clusterName, new int[] {
+        0, 1
+    });
     setupLiveInstances(clusterName, new int[] {
         0, 1
     });
@@ -308,6 +315,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setupIdealState(clusterName, new int[] {
         0, 1
     }, resourceGroups, 1, 2);
+    setupInstances(clusterName, new int[] {
+      1
+    });
     setupLiveInstances(clusterName, new int[] {
       1
     });
@@ -344,6 +354,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
 
     // round2: updates node0 currentState to SLAVE but keep the
     // message, make sure controller should not send S->M until removal is done
+    setupInstances(clusterName, new int[] {
+      0
+    });
     setupLiveInstances(clusterName, new int[] {
       0
     });
@@ -373,4 +386,19 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     curState.setStateModelDefRef("MasterSlave");
     accessor.setProperty(keyBuilder.currentState(instance, sessionId, resourceGroupName),
curState);
   }
+
+  @Override
+  protected void setupInstances(String clusterName, int[] instances) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    Builder keyBuilder = accessor.keyBuilder();
+    for (int i = 0; i < instances.length; i++) {
+      String instance = "localhost_" + instances[i];
+      InstanceConfig instanceConfig = new InstanceConfig(instance);
+      instanceConfig.setHostName("localhost");
+      instanceConfig.setPort("" + instances[i]);
+      instanceConfig.setInstanceEnabled(true);
+      accessor.setProperty(keyBuilder.instanceConfig(instance), instanceConfig);
+    }
+  }
 }


Mime
View raw message