myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mo...@apache.org
Subject [35/50] [abbrv] incubator-myriad git commit: Additional changes for getting Myriad HA to work
Date Fri, 25 Sep 2015 22:46:15 GMT
Additional changes for getting Myriad HA to work

* Myriad Executor + NM (merged) now sends TASK_RUNNING and TASK_FINISHED messages to mesos
for Mesos tasks
  corresponding to yarn containers. This is independent of the RM.
* Entire ExecutorInfo object for NM tasks is being preserved and recovered from the state
store.
  This is being done because mesos requires all tasks run on the same executor to have the
same executor info
  objects. The Myriad Executor + NM (merged) also runs tasks corresponding to yarn containers.
These tasks also
  need to be provided the same ExecutorInfo object. This ExecutorInfo object cannot be obtained
across an RM restart
  without being preserved into the state store. Made code changes to store ExecutorInfo into
the scheduler state and
  serialize and deserialize it to the state store.
* Made sure that the RM's view of NM capacity is updated correctly after an RM restart. RM's
view is not regenerated
  atomically, so assumptions about data being available are not always true. Fixed a few NullPointerExceptions
here.

Testing done

* Run a job with one node using Coarse Grained Scaling (CGS) and one flexed up node using Fine
Grained Scaling (FGS).
  On completion of the job kill RM. RM launches on another node. Delete output directory of
first job and execute
  same job again.
  This tests:
  1. That the RM successfully recovers the list of NM Tasks that it has launched before restart.
  2. The executorInfo is stored and retrieved from the state store.

* Run a long-running job. Kill the RM while the job is running. RM launches on another node
and the job continues
  to make progress.
  This tests:
  1. That the RM successfully recovers the list of NM Tasks that it has launched before restart.
  2. The executorInfo is stored and retrieved from the state store.
  3. RM recovers and job makes forward progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/0fa49c26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/0fa49c26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/0fa49c26

Branch: refs/heads/master
Commit: 0fa49c26bc0492ef4b69a85c219eab53f1cc7f0a
Parents: 23e01ec
Author: Swapnil Daingade <sdaingade@maprtech.com>
Authored: Sat Aug 15 05:23:32 2015 -0700
Committer: Swapnil Daingade <sdaingade@maprtech.com>
Committed: Sat Aug 29 11:41:33 2015 -0700

----------------------------------------------------------------------
 .../executor/MyriadExecutorAuxService.java      | 28 ++++++++-
 .../handlers/ResourceOffersEventHandler.java    | 10 ++++
 .../scheduler/fgs/NMHeartBeatHandler.java       | 60 ++++----------------
 .../scheduler/fgs/YarnNodeCapacityManager.java  |  7 ++-
 .../java/com/ebay/myriad/state/NodeTask.java    | 13 +++++
 .../com/ebay/myriad/state/SchedulerState.java   | 14 ++++-
 .../myriad/state/utils/ByteBufferSupport.java   | 30 ++++++++++
 7 files changed, 109 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
----------------------------------------------------------------------
diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
index 2c7d87d..a6d126a 100644
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
@@ -20,12 +20,17 @@ package com.ebay.myriad.executor;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
 
 import org.apache.mesos.MesosExecutorDriver;
 import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +42,9 @@ public class MyriadExecutorAuxService  extends AuxiliaryService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class);
   private static final String SERVICE_NAME = "myriad_service";
+  public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
+
+  private MesosExecutorDriver driver;
 
   protected MyriadExecutorAuxService() {
     super(SERVICE_NAME);
@@ -48,7 +56,7 @@ public class MyriadExecutorAuxService  extends AuxiliaryService {
 
     new Thread(new Runnable() {
       public void run() {
-        MesosExecutorDriver driver = new MesosExecutorDriver(new MyriadExecutor());
+        driver = new MesosExecutorDriver(new MyriadExecutor());
         LOGGER.error("MyriadExecutor exit with status " +
         Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
       }
@@ -72,4 +80,22 @@ public class MyriadExecutorAuxService  extends AuxiliaryService {
     return null;
   }
 
+  @Override
+  public void stopContainer(ContainerTerminationContext stopContainerContext) {
+    sendStatus(stopContainerContext.getContainerId(), TaskState.TASK_FINISHED);
+  }
+
+  private void sendStatus(ContainerId containerId, TaskState taskState) {
+    Protos.TaskID taskId = Protos.TaskID.newBuilder()
+      .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString())
+      .build();
+
+    TaskStatus status = TaskStatus.newBuilder()
+      .setTaskId(taskId)
+      .setState(taskState)
+      .build();
+    driver.sendStatusUpdate(status);
+    LOGGER.debug("Sent status " + taskState + " for taskId " + taskId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 51730ac..915bd2f 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -94,6 +94,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
                 schedulerState.getActiveTasks())) {
               TaskInfo task = taskFactory.createTask(offer, pendingTaskId,
                   taskToLaunch);
+
               List<OfferID> offerIds = new ArrayList<>();
               offerIds.add(offer.getId());
               List<TaskInfo> tasks = new ArrayList<>();
@@ -104,6 +105,15 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
               driver.launchTasks(offerIds, tasks);
               launchedTaskId = pendingTaskId;
 
+              // TODO (sdaingade) For every NM Task that we launch, we currently
+              // need to backup the ExecutorInfo for that NM Task in the State Store.
+              // Without this, we will not be able to launch tasks corresponding to yarn
+              // containers. This is specially important in case the RM restarts.
+              if (task.hasExecutor() && taskToLaunch.getExecutorInfo() == null) {
+                  taskToLaunch.setExecutorInfo(task.getExecutor());
+                  schedulerState.updateStateStore();
+              }
+
               taskToLaunch.setHostname(offer.getHostname());
               taskToLaunch.setSlaveId(offer.getSlaveId());
               offerMatch = true;

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
index 9fd97ba..47393a4 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -1,23 +1,20 @@
 package com.ebay.myriad.scheduler.fgs;
 
-import com.ebay.myriad.executor.ContainerTaskStatusRequest;
 import com.ebay.myriad.scheduler.MyriadDriver;
 import com.ebay.myriad.scheduler.SchedulerUtils;
 import com.ebay.myriad.scheduler.TaskFactory;
 import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
 import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
 import com.ebay.myriad.state.SchedulerState;
-import com.google.gson.Gson;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import javax.inject.Inject;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
@@ -118,8 +115,10 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     RMNode rmNode = context.getRMNodes().get(event.getNodeId());
     String hostName = rmNode.getNodeID().getHost();
 
-    nodeStore.getNode(hostName).snapshotRunningContainers();
-    sendStatusUpdatesToMesosForCompletedContainers(statusEvent);
+    Node host = nodeStore.getNode(hostName);
+    if (host != null) {
+      host.snapshotRunningContainers();
+    }
 
     // New capacity of the node =
     // resources under use on the node (due to previous offers) +
@@ -155,54 +154,17 @@ public class NMHeartBeatHandler extends BaseInterceptor {
     Resource usedResources = Resource.newInstance(0, 0);
     for (ContainerStatus status : statusEvent.getContainers()) {
       if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING)
{
-        Resources.addTo(usedResources, yarnScheduler.getRMContainer(status.getContainerId()).getAllocatedResource());
+        RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId());
+        // (sdaingade) This check is needed as RMContainer information may not be populated
+        // immediately after a RM restart.
+        if (rmContainer != null) {
+          Resources.addTo(usedResources, rmContainer.getAllocatedResource());
+        }
       }
     }
     return usedResources;
   }
 
-  private void sendStatusUpdatesToMesosForCompletedContainers(RMNodeStatusEvent statusEvent)
{
-    // Send task update to Mesos
-    Protos.SlaveID slaveId = nodeStore.getNode(statusEvent.getNodeId().getHost()).getSlaveId();
-    for (ContainerStatus status : statusEvent.getContainers()) {
-      ContainerId containerId = status.getContainerId();
-      if (status.getState() == ContainerState.COMPLETE) {
-        requestExecutorToSendTaskStatusUpdate(slaveId, containerId, Protos.TaskState.TASK_FINISHED);
-      } else { // state == NEW | RUNNING
-        requestExecutorToSendTaskStatusUpdate(slaveId, containerId, Protos.TaskState.TASK_RUNNING);
-      }
-    }
-  }
-
-
-  /**
-   * sends a request to executor on the given slave to send back a status update
-   * for the mesos task launched for this container.
-   *
-   * TODO(Santosh):
-   *  Framework messages are unreliable. Try a NM auxiliary service that can help
-   *  send out the status messages from NM itself. NM and MyriadExecutor would need
-   *  to be merged into a single process.
-   *
-   * @param slaveId
-   * @param containerId
-   * @param taskState
-   */
-  private void requestExecutorToSendTaskStatusUpdate(Protos.SlaveID slaveId,
-      ContainerId containerId,
-      Protos.TaskState taskState) {
-    final String mesosTaskId = ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX +
containerId.toString();
-    if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Sending out framework message requesting the executor to send {} status
for task: {}",
-          taskState.name(), mesosTaskId);
-    }
-    ContainerTaskStatusRequest containerTaskStatusRequest = new ContainerTaskStatusRequest();
-    containerTaskStatusRequest.setMesosTaskId(mesosTaskId);
-    containerTaskStatusRequest.setState(taskState.name());
-    myriadDriver.getDriver().sendFrameworkMessage(getExecutorId(slaveId), slaveId,
-        new Gson().toJson(containerTaskStatusRequest).getBytes(Charset.defaultCharset()));
-  }
-
   private Protos.ExecutorID getExecutorId(Protos.SlaveID slaveId) {
     return Protos.ExecutorID.newBuilder().setValue(
         TaskFactory.NMTaskFactoryImpl.EXECUTOR_PREFIX + slaveId.getValue()).build();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 497b43d..12bbe73 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -12,6 +12,7 @@ import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import javax.inject.Inject;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -196,7 +197,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
     rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
     rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-
+    LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
     // updates the scheduler with the new capacity for the NM.
     // the event is handled by the scheduler asynchronously
     rmContext.getDispatcher().getEventHandler().handle(
@@ -213,10 +214,12 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
         Protos.TaskID taskId = Protos.TaskID.newBuilder()
             .setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build();
 
+        // TODO (sdaingade) Remove ExecutorInfo from the Node object
+        // as this is now cached in the NodeTask object in scheduler state.
         Protos.ExecutorInfo executorInfo = node.getExecInfo();
         if (executorInfo == null) {
             executorInfo = Protos.ExecutorInfo.newBuilder(
-                taskFactory.getExecutorInfoForSlave(offer.getSlaveId(), null))
+                 state.getNodeTask(offer.getSlaveId()).getExecutorInfo())
                 .setFrameworkId(offer.getFrameworkId()).build();
             node.setExecInfo(executorInfo);
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
index 5b8b87d..8191eed 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/NodeTask.java
@@ -32,6 +32,11 @@ public class NodeTask {
     @JsonProperty
     private Protos.TaskStatus taskStatus;
 
+    /**
+     * Mesos executor for this node.
+     */
+    private Protos.ExecutorInfo executorInfo;
+
     public NodeTask(NMProfile profile) {
         this.profile = profile;
         this.hostname = "";
@@ -68,4 +73,12 @@ public class NodeTask {
     public void setTaskStatus(Protos.TaskStatus taskStatus) {
         this.taskStatus = taskStatus;
     }
+
+    public Protos.ExecutorInfo getExecutorInfo() {
+        return executorInfo;
+    }
+
+    public void setExecutorInfo(Protos.ExecutorInfo executorInfo) {
+        this.executorInfo = executorInfo;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index 75503b6..e27e976 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -31,6 +31,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.SlaveID;
+
 import com.ebay.myriad.state.utils.StoreContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 
@@ -188,6 +190,16 @@ public class SchedulerState {
         return activeNodeTasks;
     }
 
+    public NodeTask getNodeTask(SlaveID slaveId) {
+        for (Map.Entry<Protos.TaskID, NodeTask> entry : tasks.entrySet()) {
+            if (entry.getValue().getSlaveId() != null &&
+                entry.getValue().getSlaveId().equals(slaveId)) {
+                return entry.getValue(); 
+            }
+        }
+        return null;
+    }
+
     public Set<Protos.TaskID> getStagingTaskIds() {
         return this.stagingTasks;
     }
@@ -226,7 +238,7 @@ public class SchedulerState {
         updateStateStore();
     }
 
-    private void updateStateStore() {
+    public void updateStateStore() {
         if (!isMyriadStateStore()) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
index e1081f0..3d8d57e 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -116,6 +116,12 @@ public class ByteBufferSupport {
     } else {
       size += INT_SIZE;
     }
+    
+    if (nt.getExecutorInfo() != null) {
+        size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE;
+    } else {
+        size += INT_SIZE;
+    }
 
     // Allocate and populate the buffer.
     ByteBuffer bb = createBuffer(size);
@@ -123,6 +129,7 @@ public class ByteBufferSupport {
     putBytes(bb, hostname);
     putBytes(bb, getSlaveBytes(nt));
     putBytes(bb, getTaskBytes(nt));
+    putBytes(bb, getExecutorInfoBytes(nt));
     // Make sure the buffer is at the beginning
     bb.rewind();
     return bb;
@@ -170,6 +177,7 @@ public class ByteBufferSupport {
       nt.setHostname(toString(bb));
       nt.setSlaveId(toSlaveId(bb));
       nt.setTaskStatus(toTaskStatus(bb));
+      nt.setExecutorInfo(toExecutorInfo(bb));
     }
     return nt;
   }
@@ -182,6 +190,14 @@ public class ByteBufferSupport {
     }
   }
 
+  public static byte[] getExecutorInfoBytes(NodeTask nt) {
+    if (nt.getExecutorInfo() != null) {
+      return nt.getExecutorInfo().toByteArray();
+    } else {
+      return ZERO_BYTES;
+    }
+  }
+
   public static byte[] getSlaveBytes(NodeTask nt) {
     if (nt.getSlaveId() != null) {
       return nt.getSlaveId().toByteArray();
@@ -272,6 +288,20 @@ public class ByteBufferSupport {
     }
   }
 
+  public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) {
+    int size = bb.getInt();
+    if (size > 0) {
+      try {
+        return Protos.ExecutorInfo.parseFrom(getBytes(bb, size));
+      } catch (Exception e) {
+        throw new RuntimeException("ByteBuffer not in expected format," +
+          " failed to parse ExecutorInfo bytes", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
   public static ByteBuffer fillBuffer(byte src[]) {
     ByteBuffer bb = createBuffer(src.length);
     bb.put(src);


Mime
View raw message