helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [1/2] fixing compile errors
Date Wed, 20 Nov 2013 22:33:29 GMT
Updated Branches:
  refs/heads/helix-yarn 1ec06f55f -> 925b7e94e


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
index 17592b7..054367f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java
@@ -3,125 +3,116 @@
  */
 package org.apache.helix.integration.task;
 
-
 import java.util.Map;
+
 import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.ZkHelixTestManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.DummyProcess;
 import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.task.*;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.WorkflowContext;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 
-
 /**
  * Static test utility methods.
- *
  * @author Abe <asebasti@linkedin.com>
  * @version $Revision$
  */
-public class TestUtil
-{
+public class TestUtil {
   private static final Logger LOG = Logger.getLogger(TestUtil.class);
 
-  public static TestHelper.StartCMResult startDummyProcess(String zkAddr,
-                                                            String clusterName,
-                                                            String instanceName,
-                                                            Map<String, TaskFactory> taskFactoryMap)
-      throws Exception
-  {
-    TestHelper.StartCMResult result = new TestHelper.StartCMResult();
-    ZkHelixTestManager manager = new ZkHelixTestManager(clusterName,
-                                                        instanceName,
-                                                        InstanceType.PARTICIPANT,
-                                                        zkAddr);
-    result._manager = manager;
-    Thread thread = new Thread(new MockInstanceThread(manager, instanceName, taskFactoryMap));
-    result._thread = thread;
-    thread.start();
+  public static MockParticipantManager startDummyProcess(String zkAddr, String clusterName,
+      String instanceName, Map<String, TaskFactory> taskFactoryMap) throws Exception {
+    // TestHelper.StartCMResult result = new TestHelper.StartCMResult();
+    // ZkHelixTestManager manager =
+    // new ZkHelixTestManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
+    // result._manager = manager;
+    MockParticipantManager participant =
+        new MockParticipantManager(zkAddr, clusterName, instanceName);
+    // Thread thread = new Thread(new MockInstanceThread(participant, instanceName,
+    // taskFactoryMap));
+    // result._thread = thread;
+    // thread.start();
+    // Register a Task state model factory.
+    participant.getStateMachineEngine().registerStateModelFactory("Task",
+        new TaskStateModelFactory(participant, taskFactoryMap));
+    participant.syncStart();
 
-    return result;
+    return participant;
   }
 
   /**
-   * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is reached.
+   * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is
+   * reached.
    * If the task has not reached target state by then, an error is thrown
-   *
    * @param workflowResource Resource to poll for completeness
    * @throws InterruptedException
    */
-  public static void pollForWorkflowState(HelixManager manager, String workflowResource, TaskState state)
-          throws InterruptedException
-  {
+  public static void pollForWorkflowState(HelixManager manager, String workflowResource,
+      TaskState state) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
-    do
-    {
+    do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    }
-    while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
-            && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+    } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state)
+        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
 
     Assert.assertNotNull(ctx);
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
-  public static void pollForTaskState(HelixManager manager, String workflowResource, String taskName, TaskState state)
-          throws InterruptedException
-  {
+  public static void pollForTaskState(HelixManager manager, String workflowResource,
+      String taskName, TaskState state) throws InterruptedException {
     // Wait for completion.
     long st = System.currentTimeMillis();
     WorkflowContext ctx;
-    do
-    {
+    do {
       Thread.sleep(100);
       ctx = TaskUtil.getWorkflowContext(manager, workflowResource);
-    }
-    while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
-            && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
+    } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state)
+        && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */);
 
     Assert.assertNotNull(ctx);
     Assert.assertEquals(ctx.getWorkflowState(), state);
   }
 
-  private static class MockInstanceThread implements Runnable
-  {
+  private static class MockInstanceThread implements Runnable {
     private final HelixManager _manager;
     private final String _instanceName;
     private final Map<String, TaskFactory> _factoryMap;
 
-    public MockInstanceThread(HelixManager manager, String instanceName, Map<String, TaskFactory> factoryMap)
-    {
+    public MockInstanceThread(HelixManager manager, String instanceName,
+        Map<String, TaskFactory> factoryMap) {
       _manager = manager;
       _instanceName = instanceName;
       _factoryMap = factoryMap;
     }
 
     @Override
-    public void run()
-    {
-      try
-      {
+    public void run() {
+      try {
         StateMachineEngine stateMach = _manager.getStateMachineEngine();
         // Register dummy MasterSlave state model factory.
-        stateMach.registerStateModelFactory("MasterSlave", new DummyProcess.DummyStateModelFactory(0));
+        stateMach.registerStateModelFactory("MasterSlave", new DummyProcess.DummyStateModelFactory(
+            0));
         // Register a Task state model factory.
-        stateMach.registerStateModelFactory("Task", new TaskStateModelFactory(_manager, _factoryMap));
+        stateMach.registerStateModelFactory("Task",
+            new TaskStateModelFactory(_manager, _factoryMap));
 
         _manager.connect();
         Thread.currentThread().join();
-      }
-      catch (InterruptedException e)
-      {
-        LOG.info("participant:" + _instanceName + ", " + Thread.currentThread().getName() + " interrupted");
-      }
-      catch (Exception e)
-      {
-        LOG.error("participant:" + _instanceName + ", " + Thread.currentThread().getName() + " interrupted", e);
+      } catch (InterruptedException e) {
+        LOG.info("participant:" + _instanceName + ", " + Thread.currentThread().getName()
+            + " interrupted");
+      } catch (Exception e) {
+        LOG.error("participant:" + _instanceName + ", " + Thread.currentThread().getName()
+            + " interrupted", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
index 44e0762..70700a2 100644
--- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
@@ -10,15 +10,17 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.autoscale.StatusProvider;
 import org.apache.helix.autoscale.TargetProvider;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.log4j.Logger;
 
@@ -40,7 +42,7 @@ import com.google.common.collect.Sets;
  * instance = container provider<br/>
  * status = physical container instance presence<br/>
  */
-public class ProviderRebalancer implements Rebalancer {
+public class ProviderRebalancer implements HelixRebalancer {
 
     static final Logger log                 = Logger.getLogger(ProviderRebalancer.class);
 
@@ -61,17 +63,21 @@ public class ProviderRebalancer implements Rebalancer {
     }
 
     @Override
-    public ResourceAssignment computeResourceMapping(String resourceName, IdealState idealState, CurrentStateOutput currentStateOutput,
-            ClusterDataCache clusterData) {
-
+    public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+        Cluster cluster, ResourceCurrentState currentStateOutput) {
+      PartitionedRebalancerContext context = rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+      ResourceId resourceId = context.getResourceId();
+      String resourceName = resourceId.toString();
         final String containerType = resourceName;
 
         final SortedSet<String> allContainers = Sets.newTreeSet(new IndexedNameComparator());
-        allContainers.addAll(idealState.getPartitionSet());
+        for (PartitionId partitionId : context.getPartitionSet()) {
+          allContainers.add(partitionId.toString());
+        }
 
         final SortedSet<String> allProviders = Sets.newTreeSet(new IndexedNameComparator());
-        for (LiveInstance instance : clusterData.getLiveInstances().values()) {
-            allProviders.add(instance.getId());
+        for (ParticipantId instance : cluster.getLiveParticipantMap().keySet()) {
+            allProviders.add(instance.toString());
         }
 
         final ResourceState currentState = new ResourceState(resourceName, currentStateOutput);
@@ -83,13 +89,13 @@ public class ProviderRebalancer implements Rebalancer {
             targetCount = targetProvider.getTargetContainerCount(containerType);
         } catch (Exception e) {
             log.error(String.format("Could not retrieve target count for '%s'", containerType), e);
-            return new ResourceAssignment(resourceName);
+            return new ResourceAssignment(resourceId);
         }
 
         // provider sanity check
         if (allProviders.isEmpty()) {
             log.warn(String.format("Could not find any providers"));
-            return new ResourceAssignment(resourceName);
+            return new ResourceAssignment(resourceId);
         }
 
         // all containers
@@ -107,25 +113,25 @@ public class ProviderRebalancer implements Rebalancer {
         // assignment
         int maxCountPerProvider = (int) Math.ceil(targetCount / (float) allProviders.size());
 
-        ResourceAssignment assignment = new ResourceAssignment(resourceName);
+        ResourceAssignment assignment = new ResourceAssignment(resourceId);
         CountMap counts = new CountMap(allProviders);
         int assignmentCount = 0;
 
         // currently assigned
         for (String containerName : assignedContainers) {
             String providerName = getProvider(currentState, containerName);
-            Partition partition = new Partition(containerName);
+            PartitionId partition = PartitionId.from(containerName);
 
             if (failedContainers.contains(containerName)) {
                 log.warn(String.format("Container '%s:%s' failed, going offline", providerName, containerName));
-                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+                assignment.addReplicaMap(partition, Collections.singletonMap(ParticipantId.from(providerName), State.from("OFFLINE")));
 
             } else if (counts.get(providerName) >= maxCountPerProvider) {
                 log.warn(String.format("Container '%s:%s' misassigned, going offline", providerName, containerName));
-                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+                assignment.addReplicaMap(partition, Collections.singletonMap(ParticipantId.from(providerName), State.from("OFFLINE")));
 
             } else {
-                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "ONLINE"));
+                assignment.addReplicaMap(partition, Collections.singletonMap(ParticipantId.from(providerName), State.from("ONLINE")));
             }
 
             counts.increment(providerName);
@@ -142,14 +148,14 @@ public class ProviderRebalancer implements Rebalancer {
                 break;
 
             String providerName = counts.getMinKey();
-            Partition partition = new Partition(containerName);
+            PartitionId partition = PartitionId.from(containerName);
 
             if (failedContainers.contains(containerName)) {
                 log.warn(String.format("Container '%s:%s' failed and unassigned, going offline", providerName, containerName));
-                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+                assignment.addReplicaMap(partition, Collections.singletonMap(ParticipantId.from(providerName), State.from("OFFLINE")));
 
             } else {
-                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "ONLINE"));
+                assignment.addReplicaMap(partition, Collections.singletonMap(ParticipantId.from(providerName), State.from("ONLINE")));
             }
 
             counts.increment(providerName);
@@ -165,32 +171,32 @@ public class ProviderRebalancer implements Rebalancer {
     }
 
     boolean hasProvider(ResourceState state, String containerName) {
-        Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
-        Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
-        return hasInstance(currentStateMap, "ONLINE") || hasInstance(pendingStateMap, "ONLINE");
+        Map<ParticipantId, State> currentStateMap = state.getCurrentStateMap(containerName);
+        Map<ParticipantId, State> pendingStateMap = state.getPendingStateMap(containerName);
+        return hasInstance(currentStateMap, State.from("ONLINE")) || hasInstance(pendingStateMap, State.from("ONLINE"));
     }
 
     String getProvider(ResourceState state, String containerName) {
-        Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
-        if (hasInstance(currentStateMap, "ONLINE"))
-            return getInstance(currentStateMap, "ONLINE");
+        Map<ParticipantId, State> currentStateMap = state.getCurrentStateMap(containerName);
+        if (hasInstance(currentStateMap, State.from("ONLINE")))
+            return getInstance(currentStateMap, State.from("ONLINE")).toString();
 
-        Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
-        return getInstance(pendingStateMap, "ONLINE");
+        Map<ParticipantId, State> pendingStateMap = state.getPendingStateMap(containerName);
+        return getInstance(pendingStateMap, State.from("ONLINE")).toString();
     }
 
     SortedSet<String> getFailedContainers(ResourceState state, Collection<String> containers) {
         SortedSet<String> failedContainers = Sets.newTreeSet(new IndexedNameComparator());
         for (String containerName : containers) {
-            Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
-            Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
+            Map<ParticipantId, State> currentStateMap = state.getCurrentStateMap(containerName);
+            Map<ParticipantId, State> pendingStateMap = state.getPendingStateMap(containerName);
 
-            if (hasInstance(currentStateMap, "ERROR")) {
+            if (hasInstance(currentStateMap, State.from("ERROR"))) {
                 failedContainers.add(containerName);
                 continue;
             }
 
-            if (!hasInstance(currentStateMap, "ONLINE") || hasInstance(pendingStateMap, "OFFLINE"))
+            if (!hasInstance(currentStateMap, State.from("ONLINE")) || hasInstance(pendingStateMap, State.from("OFFLINE")))
                 continue;
 
             // container listed online and not in transition, but not active
@@ -214,9 +220,9 @@ public class ProviderRebalancer implements Rebalancer {
         return assignedContainers;
     }
 
-    boolean hasInstance(Map<String, String> stateMap, String state) {
+    boolean hasInstance(Map<ParticipantId, State> stateMap, State state) {
         if (!stateMap.isEmpty()) {
-            for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+            for (Map.Entry<ParticipantId, State> entry : stateMap.entrySet()) {
                 if (entry.getValue().equals(state)) {
                     return true;
                 }
@@ -225,9 +231,9 @@ public class ProviderRebalancer implements Rebalancer {
         return false;
     }
 
-    String getInstance(Map<String, String> stateMap, String state) {
+    ParticipantId getInstance(Map<ParticipantId, State> stateMap, State state) {
         if (!stateMap.isEmpty()) {
-            for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+            for (Map.Entry<ParticipantId, State> entry : stateMap.entrySet()) {
                 if (entry.getValue().equals(state)) {
                     return entry.getKey();
                 }
@@ -332,20 +338,20 @@ public class ProviderRebalancer implements Rebalancer {
     }
 
     class ResourceState {
-        final String             resourceName;
-        final CurrentStateOutput state;
+        final ResourceId             resourceId;
+        final ResourceCurrentState state;
 
-        public ResourceState(String resourceName, CurrentStateOutput state) {
-            this.resourceName = resourceName;
+        public ResourceState(String resourceName, ResourceCurrentState state) {
+            this.resourceId = ResourceId.from(resourceName);
             this.state = state;
         }
 
-        Map<String, String> getCurrentStateMap(String partitionName) {
-            return state.getCurrentStateMap(resourceName, new Partition(partitionName));
+        Map<ParticipantId, State> getCurrentStateMap(String partitionName) {
+            return state.getCurrentStateMap(resourceId, PartitionId.from(partitionName));
         }
 
-        Map<String, String> getPendingStateMap(String partitionName) {
-            return state.getPendingStateMap(resourceName, new Partition(partitionName));
+        Map<ParticipantId, State> getPendingStateMap(String partitionName) {
+            return state.getPendingStateMap(resourceId, PartitionId.from(partitionName));
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/recipes/auto-scale/src/test/java/org/apache/helix/autoscale/HelixYarnTest.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/test/java/org/apache/helix/autoscale/HelixYarnTest.java b/recipes/auto-scale/src/test/java/org/apache/helix/autoscale/HelixYarnTest.java
index 04bce10..4258b99 100644
--- a/recipes/auto-scale/src/test/java/org/apache/helix/autoscale/HelixYarnTest.java
+++ b/recipes/auto-scale/src/test/java/org/apache/helix/autoscale/HelixYarnTest.java
@@ -6,6 +6,7 @@ import java.util.Properties;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.autoscale.TestUtils;
 import org.apache.helix.autoscale.impl.yarn.YarnContainerProviderProcess;
 import org.apache.helix.autoscale.impl.yarn.YarnStatusProvider;
 import org.apache.helix.integration.task.WorkflowGenerator;


Mime
View raw message