myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smare...@apache.org
Subject incubator-myriad git commit: Myriad 162 This corrects how offers are handled when using the framework role parameter.
Date Thu, 05 Nov 2015 00:33:13 GMT
Repository: incubator-myriad
Updated Branches:
  refs/heads/master 87264ba5f -> 2e14b4ba3


Myriad 162 This corrects how offers are handled when using the framework role parameter.

Prior to this fix, if Myriad received an offer containing resources from both the default role and its
declared role, it incorrectly declared all CPU and memory as used from the default role.
This led to TASK_LOST errors, which the logs attributed to insufficient resources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/2e14b4ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/2e14b4ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/2e14b4ba

Branch: refs/heads/master
Commit: 2e14b4ba362607e6fd4cd22adf0973a83ab25416
Parents: 87264ba
Author: DarinJ <darinj@apache.org>
Authored: Tue Nov 3 23:43:28 2015 -0500
Committer: Santosh Marella <marella@gmail.com>
Committed: Wed Nov 4 16:32:07 2015 -0800

----------------------------------------------------------------------
 .../scheduler/ServiceTaskFactoryImpl.java       | 12 ++---
 .../apache/myriad/scheduler/TaskFactory.java    | 51 ++++++++------------
 .../org/apache/myriad/scheduler/TaskUtils.java  | 40 +++++++++++++++
 .../scheduler/fgs/YarnNodeCapacityManager.java  | 15 +++---
 .../fgs/YarnNodeCapacityManagerSpec.groovy      |  9 +++-
 5 files changed, 78 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/2e14b4ba/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
index 725e405..076ebbb 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java
@@ -34,7 +34,6 @@ import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.Value;
-import org.apache.mesos.Protos.Value.Scalar;
 import org.apache.myriad.configuration.MyriadConfiguration;
 import org.apache.myriad.configuration.MyriadExecutorConfiguration;
 import org.apache.myriad.configuration.ServiceConfiguration;
@@ -125,14 +124,11 @@ public class ServiceTaskFactoryImpl implements TaskFactory {
 
     LOGGER.info("Command line for service: {} is: {}", nodeTask.getTaskPrefix(), strB.toString());
 
-    Scalar taskMemory = Scalar.newBuilder().setValue(nodeTask.getProfile().getMemory()).build();
-    Scalar taskCpus = Scalar.newBuilder().setValue(nodeTask.getProfile().getCpus()).build();
-
     TaskInfo.Builder taskBuilder = TaskInfo.newBuilder();
 
-    taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(
-        Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()).addResources(
-        Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build());
+    taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId())
+        .addAllResources(taskUtils.getScalarResource(offer, "cpus", nodeTask.getProfile().getCpus(),
0.0))
+        .addAllResources(taskUtils.getScalarResource(offer, "mem", nodeTask.getProfile().getMemory(),
0.0));
 
     if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) {
       // set ports
@@ -216,7 +212,7 @@ public class ServiceTaskFactoryImpl implements TaskFactory {
     }
     final List<Long> returnedPorts = new ArrayList<>();
     for (Resource resource : offer.getResourcesList()) {
-      if (resource.getName().equals("ports")) {
+      if (resource.getName().equals("ports") && (!resource.hasRole() || resource.getRole().equals("*")))
{
         Iterator<Value.Range> itr = resource.getRanges().getRangeList().iterator();
         while (itr.hasNext()) {
           Value.Range range = itr.next();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/2e14b4ba/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
index 391cb32..4990d42 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
@@ -37,7 +37,6 @@ import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.Value;
 import org.apache.mesos.Protos.Value.Range;
-import org.apache.mesos.Protos.Value.Scalar;
 import org.apache.myriad.configuration.MyriadConfiguration;
 import org.apache.myriad.configuration.MyriadExecutorConfiguration;
 import org.apache.myriad.state.NodeTask;
@@ -90,7 +89,7 @@ public interface TaskFactory {
     @VisibleForTesting
     protected static HashSet<Long> getNMPorts(Resource resource) {
       HashSet<Long> ports = new HashSet<>();
-      if (resource.getName().equals("ports")){
+      if (resource.getName().equals("ports")) {
         /*
         ranges.getRangeList() returns a list of ranges, each range specifies a begin and
end only.
         so must loop though each range until we get all ports needed.  We exit each loop
as soon as all
@@ -122,7 +121,7 @@ public interface TaskFactory {
     protected static NMPorts getPorts(Offer offer) {
       HashSet<Long> ports = new HashSet<>();
       for (Resource resource : offer.getResourcesList()) {
-        if (resource.getName().equals("ports")) {
+        if (resource.getName().equals("ports") && (!resource.hasRole() || resource.getRole().equals("*")))
{
           ports = getNMPorts(resource);
           break;
         }
@@ -179,8 +178,8 @@ public interface TaskFactory {
       LOGGER.debug(ports.toString());
 
       ServiceResourceProfile serviceProfile = nodeTask.getProfile();
-      Scalar taskMemory = Scalar.newBuilder().setValue(serviceProfile.getAggregateMemory()).build();
-      Scalar taskCpus = Scalar.newBuilder().setValue(serviceProfile.getAggregateCpu()).build();
+      Double taskMemory = serviceProfile.getAggregateMemory();
+      Double taskCpus = serviceProfile.getAggregateCpu();
 
       CommandInfo commandInfo = getCommandInfo(serviceProfile, ports);
       ExecutorInfo executorInfo = getExecutorInfoForSlave(frameworkId, offer, commandInfo);
@@ -188,39 +187,27 @@ public interface TaskFactory {
       TaskInfo.Builder taskBuilder = TaskInfo.newBuilder().setName("task-" + taskId.getValue()).setTaskId(taskId).setSlaveId(
           offer.getSlaveId());
 
-      return taskBuilder.addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build())
-          .addResources(Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build())
-          .addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(
-              Value.Ranges.newBuilder().addRange(Value.Range.newBuilder()
-                  .setBegin(ports.getRpcPort())
-                  .setEnd(ports.getRpcPort())
-                  .build()).addRange(Value.Range.newBuilder()
-                  .setBegin(ports.getLocalizerPort())
-                  .setEnd(ports.getLocalizerPort())
-                  .build()).addRange(Value.Range.newBuilder()
-                  .setBegin(ports.getWebAppHttpPort())
-                  .setEnd(ports.getWebAppHttpPort())
-                  .build()).addRange(Value.Range.newBuilder()
-                  .setBegin(ports.getShufflePort())
-                  .setEnd(ports.getShufflePort())
-                  .build())))
+      return taskBuilder
+          .addAllResources(taskUtils.getScalarResource(offer, "cpus", taskCpus, taskUtils.getExecutorCpus()))
+          .addAllResources(taskUtils.getScalarResource(offer, "mem", taskMemory, taskUtils.getExecutorMemory()))
+          .addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(Value.Ranges.newBuilder()
+              .addRange(Range.newBuilder().setBegin(ports.getRpcPort()).setEnd(ports.getRpcPort()).build())
+              .addRange(Range.newBuilder().setBegin(ports.getLocalizerPort()).setEnd(ports.getLocalizerPort()).build())
+              .addRange(Range.newBuilder().setBegin(ports.getWebAppHttpPort()).setEnd(ports.getWebAppHttpPort()).build())
+              .addRange(Range.newBuilder().setBegin(ports.getShufflePort()).setEnd(ports.getShufflePort()).build())))
           .setExecutor(executorInfo)
           .build();
     }
 
+    /**
+     * Builds the ExecutorInfo for the slave behind {@code offer}.
+     * The executor ID is the concatenation of EXECUTOR_PREFIX, the framework ID, the offer ID and the slave ID.
+     * The executor's cpus and mem amounts come from taskUtils.getExecutorCpus() / taskUtils.getExecutorMemory().
+     * @param frameworkId - ID of this framework, used in the executor ID.
+     * @param offer - The Mesos offer the executor's resources are taken from.
+     * @param commandInfo - The command the executor runs.
+     * @return The assembled ExecutorInfo.
+     */
     @Override
     public ExecutorInfo getExecutorInfoForSlave(FrameworkID frameworkId, Offer offer, CommandInfo
commandInfo) {
-      Scalar executorMemory = Scalar.newBuilder().setValue(taskUtils.getExecutorMemory()).build();
-      Scalar executorCpus = Scalar.newBuilder().setValue(taskUtils.getExecutorCpus()).build();
-
-      ExecutorID executorId = ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + frameworkId.getValue()
+
-                                                               offer.getId().getValue() +
offer.getSlaveId().getValue()).build();
-      return ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).addResources(Resource.newBuilder().setName(
-          "cpus").setType(Value.Type.SCALAR).setScalar(executorCpus).build()).addResources(Resource.newBuilder()
-          .setName("mem")
-          .setType(Value.Type.SCALAR)
-          .setScalar(executorMemory)
-          .build()).setExecutorId(executorId).build();
+      ExecutorID executorId = ExecutorID.newBuilder()
+          .setValue(EXECUTOR_PREFIX + frameworkId.getValue() + offer.getId().getValue() +
offer.getSlaveId().getValue())
+          .build();
+      return ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).setExecutorId(executorId)
+          // used = 0.0: nothing has been claimed from this offer yet, so getScalarResource draws from the
+          // framework role first and only falls back to the default role for any shortfall.
+          .addAllResources(taskUtils.getScalarResource(offer, "cpus", taskUtils.getExecutorCpus(),
0.0))
+          .addAllResources(taskUtils.getScalarResource(offer, "mem", taskUtils.getExecutorMemory(),
0.0))
+          .build();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/2e14b4ba/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
index f632d84..9b18d18 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java
@@ -22,6 +22,7 @@ import com.google.common.base.Optional;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import javax.inject.Inject;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -38,6 +39,8 @@ import javax.xml.xpath.XPathConstants;
 import javax.xml.xpath.XPathExpression;
 import javax.xml.xpath.XPathExpressionException;
 import javax.xml.xpath.XPathFactory;
+
+import org.apache.mesos.Protos;
 import org.apache.myriad.configuration.MyriadBadConfigurationException;
 import org.apache.myriad.configuration.MyriadConfiguration;
 import org.apache.myriad.configuration.MyriadExecutorConfiguration;
@@ -203,6 +206,43 @@ public class TaskUtils {
       throw new MyriadBadConfigurationException("memory is not defined for task with name:
" + taskName);
     }
     return auxConf.getJvmMaxMemoryMB().get();
+  }
 
+  /**
+   * Helper function that returns the scalar resources of a given name to request from an offer, adding up to
+   * a given value.  Resources are taken from the configured framework role first; any shortfall is taken from
+   * the default role (a resource with no role set).  {@code used} is the amount of this resource already
+   * claimed from the offer by an earlier call (e.g. for the executor).  It is subtracted from the framework-role
+   * amount only, and the remaining framework-role amount never drops below zero.  Assumes enough resources are
+   * present in total; the default-role amount is not checked.
+   * @param offer - An offer by Mesos, assumed to have enough resources.
+   * @param name  - The name of the SCALAR resource, i.e. cpus or mem
+   * @param value - The amount of SCALAR resources needed.
+   * @param used - The amount of SCALAR resources already removed from this offer.
+   * @return An Iterable containing zero, one or two scalar resources of the given name (framework role and/or
+   * default role) that together add up to the given value.
+   */
+  public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) {
+    String role = cfg.getFrameworkRole();
+    ArrayList<Protos.Resource> resources = new ArrayList<>();
+    // Framework-role amount still unclaimed.  Clamped at zero: if an earlier claim exceeded the role amount
+    // (spilling into the default role), a negative remainder would otherwise inflate the default-role request.
+    double roleAvailable = 0;
+    //Find role by name, must loop through resources
+    for (Protos.Resource r : offer.getResourcesList()) {
+      if (!r.getName().equals(name) || !r.hasRole() || !r.getRole().equals(role)) {
+        continue;
+      }
+      if (r.hasScalar()) {
+        roleAvailable = Math.max(0.0, r.getScalar().getValue() - used);
+        break;
+      }
+      //Should never get here, there must be a misconfigured slave
+      LOGGER.warn("Resource with name: " + name + " expected type to be SCALAR check configuration on: "
+          + offer.getHostname());
+    }
+    double fromRole = Math.min(value, roleAvailable);
+    if (fromRole > 0) {
+      resources.add(Protos.Resource.newBuilder().setName(name).setType(Protos.Value.Type.SCALAR)
+          .setScalar(Protos.Value.Scalar.newBuilder().setValue(fromRole).build())
+          .setRole(role).build());
+    }
+    //Assume enough resources are present in the default role; if not we shouldn't have gotten to this function.
+    double fromDefault = value - fromRole;
+    if (fromDefault > 0) {
+      resources.add(Protos.Resource.newBuilder().setName(name).setType(Protos.Value.Type.SCALAR)
+          .setScalar(Protos.Value.Scalar.newBuilder().setValue(fromDefault).build())
+          .build()); //no role assumes default
+    }
+    return resources;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/2e14b4ba/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index f058d42..15f4b47 100644
--- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -44,6 +44,7 @@ import org.apache.myriad.configuration.NodeManagerConfiguration;
 import org.apache.myriad.executor.ContainerTaskStatusRequest;
 import org.apache.myriad.scheduler.MyriadDriver;
 import org.apache.myriad.scheduler.SchedulerUtils;
+import org.apache.myriad.scheduler.TaskUtils;
 import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import org.apache.myriad.state.SchedulerState;
@@ -70,11 +71,11 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
   private final OfferLifecycleManager offerLifecycleMgr;
   private final NodeStore nodeStore;
   private final SchedulerState state;
-
+  private TaskUtils taskUtils;
   @Inject
   public YarnNodeCapacityManager(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler,
RMContext rmContext,
                                  MyriadDriver myriadDriver, OfferLifecycleManager offerLifecycleMgr,
NodeStore nodeStore,
-                                 SchedulerState state) {
+                                 SchedulerState state, TaskUtils taskUtils) {
     if (registry != null) {
       registry.register(this);
     }
@@ -84,6 +85,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     this.offerLifecycleMgr = offerLifecycleMgr;
     this.nodeStore = nodeStore;
     this.state = state;
+    this.taskUtils = taskUtils;
   }
 
   @Override
@@ -220,13 +222,10 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
     }
 
     return Protos.TaskInfo.newBuilder()
-        .setName("task_" + taskId.getValue())
-        .setTaskId(taskId)
+        .setName("task_" + taskId.getValue()).setTaskId(taskId)
         .setSlaveId(offer.getSlaveId())
-        .addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(
-            Protos.Value.Scalar.newBuilder().setValue(container.getResource().getVirtualCores())))
-        .addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(
-            Protos.Value.Scalar.newBuilder().setValue(container.getResource().getMemory())))
+        .addAllResources(taskUtils.getScalarResource(offer, "cpus", (double) container.getResource().getVirtualCores(),
0.0))
+        .addAllResources(taskUtils.getScalarResource(offer, "mem", (double) container.getResource().getMemory(),
0.0))
         .setExecutor(executorInfo)
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/2e14b4ba/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
index bdebef2..57e6384 100644
--- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
+++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerSpec.groovy
@@ -22,7 +22,9 @@ import org.apache.hadoop.yarn.api.records.ContainerState
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent
 import org.apache.hadoop.yarn.util.resource.Resources
 import org.apache.mesos.Protos
+import org.apache.myriad.configuration.MyriadConfiguration
 import org.apache.myriad.configuration.NodeManagerConfiguration
+import org.apache.myriad.scheduler.TaskUtils
 import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry
 import org.apache.myriad.state.NodeTask
 import org.apache.myriad.state.SchedulerState
@@ -128,8 +130,13 @@ class YarnNodeCapacityManagerSpec extends FGSTestBaseSpec {
         def state = Mock(SchedulerState) {
             getNodeTask(_, NodeManagerConfiguration.NM_TASK_PREFIX) >> nodeTask
         }
+        def cfg = Mock(MyriadConfiguration) {
+            getFrameworkRole() >> "some_role"
+        }
+        print(cfg.getFrameworkRole())
+        def taskUtils = new TaskUtils(cfg)
         return new YarnNodeCapacityManager(registry, yarnScheduler, rmContext,
-                myriadDriver, offerLifecycleManager, nodeStore, state)
+                myriadDriver, offerLifecycleManager, nodeStore, state, taskUtils)
 
     }
 }


Mime
View raw message