From: mohit@apache.org
To: commits@myriad.incubator.apache.org
Date: Fri, 25 Sep 2015 22:02:39 -0000
Message-Id: <73e49db7ca054ff4867185c62617020d@git.apache.org>
Subject: [13/28] incubator-myriad git commit: Additional changes for getting Myriad HA to work

Additional changes for getting Myriad HA to work

* The Myriad Executor + NM (merged) now sends TASK_RUNNING and TASK_FINISHED
  messages to Mesos for the Mesos tasks that correspond to YARN containers.
  This is independent of the RM.

* The entire ExecutorInfo object for NM tasks is now preserved in, and
  recovered from, the state store. This is necessary because Mesos requires
  all tasks that run on the same executor to have the same ExecutorInfo
  object. The Myriad Executor + NM (merged) also runs the tasks that
  correspond to YARN containers, so those tasks must be given the same
  ExecutorInfo object. That object cannot be obtained across an RM restart
  unless it is preserved in the state store. Made code changes to store the
  ExecutorInfo in the scheduler state and to serialize it to and deserialize
  it from the state store.

* Made sure that the RM's view of NM capacity is updated correctly after an
  RM restart. The RM's view is not regenerated atomically, so assumptions
  about data being available are not always true. Fixed a few
  NullPointerExceptions here.

Testing done

* Run a job with one node using Coarse-Grained Scaling (CGS) and one flexed-up
  node using Fine-Grained Scaling (FGS).
  On completion of the job, kill the RM. The RM launches on another node.
  Delete the output directory of the first job and execute the same job
  again. This tests:
  1. That the RM successfully recovers the list of NM tasks that it launched
     before the restart.
  2. That the ExecutorInfo is stored in and retrieved from the state store.

* Run a long-running job. Kill the RM while the job is running. The RM
  launches on another node and the job continues to make progress. This
  tests:
  1. That the RM successfully recovers the list of NM tasks that it launched
     before the restart.
  2. That the ExecutorInfo is stored in and retrieved from the state store.
  3. That the RM recovers and the job makes forward progress.

Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/0fa49c26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/0fa49c26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/0fa49c26

Branch: refs/heads/phase1
Commit: 0fa49c26bc0492ef4b69a85c219eab53f1cc7f0a
Parents: 23e01ec
Author: Swapnil Daingade
Authored: Sat Aug 15 05:23:32 2015 -0700
Committer: Swapnil Daingade
Committed: Sat Aug 29 11:41:33 2015 -0700

----------------------------------------------------------------------
 .../executor/MyriadExecutorAuxService.java     | 28 ++++++++-
 .../handlers/ResourceOffersEventHandler.java   | 10 ++++
 .../scheduler/fgs/NMHeartBeatHandler.java      | 60 ++++----------------
 .../scheduler/fgs/YarnNodeCapacityManager.java |  7 ++-
 .../java/com/ebay/myriad/state/NodeTask.java   | 13 +++++
 .../com/ebay/myriad/state/SchedulerState.java  | 14 ++++-
 .../myriad/state/utils/ByteBufferSupport.java  | 30 ++++++++++
 7 files changed, 109 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
----------------------------------------------------------------------
diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
index 2c7d87d..a6d126a 100644
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java
@@ -20,12 +20,17 @@ package com.ebay.myriad.executor;
 import java.nio.ByteBuffer;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
 import org.apache.mesos.MesosExecutorDriver;
 import org.apache.mesos.Protos.Status;
+import org.apache.mesos.Protos.TaskState;
+import org.apache.mesos.Protos.TaskStatus;
+import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +42,9 @@ public class MyriadExecutorAuxService extends AuxiliaryService {
     private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class);
     private static final String SERVICE_NAME = "myriad_service";
+    public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
+
+    private MesosExecutorDriver driver;
     protected MyriadExecutorAuxService() {
         super(SERVICE_NAME);
@@ -48,7 +56,7 @@ public class MyriadExecutorAuxService extends AuxiliaryService {
         new Thread(new Runnable() {
             public void run() {
-                MesosExecutorDriver driver = new MesosExecutorDriver(new MyriadExecutor());
+                driver = new MesosExecutorDriver(new MyriadExecutor());
                 LOGGER.error("MyriadExecutor exit with status " + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
             }
@@ -72,4 +80,22 @@ public class MyriadExecutorAuxService extends AuxiliaryService {
         return null;
     }
+    @Override
+    public void stopContainer(ContainerTerminationContext stopContainerContext) {
+        sendStatus(stopContainerContext.getContainerId(), TaskState.TASK_FINISHED);
+    }
+
+    private void sendStatus(ContainerId containerId, TaskState taskState) {
+        Protos.TaskID taskId = Protos.TaskID.newBuilder()
+                .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString())
+                .build();
+
+        TaskStatus status = TaskStatus.newBuilder()
+                .setTaskId(taskId)
+                .setState(taskState)
+                .build();
+        driver.sendStatusUpdate(status);
+        LOGGER.debug("Sent status " + taskState + " for taskId " + taskId);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 51730ac..915bd2f 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -94,6 +94,7 @@ public class ResourceOffersEventHandler implements EventHandler offerIds = new ArrayList<>();
             offerIds.add(offer.getId());
             List tasks = new ArrayList<>();
@@ -104,6 +105,15 @@ public class ResourceOffersEventHandler implements EventHandler entry : tasks.entrySet()) {
+            if (entry.getValue().getSlaveId() != null &&
+                    entry.getValue().getSlaveId().equals(slaveId)) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
+
     public Set getStagingTaskIds() {
         return this.stagingTasks;
     }
@@ -226,7 +238,7 @@ public class SchedulerState {
         updateStateStore();
     }
-    private void updateStateStore() {
+    public void updateStateStore() {
         if (!isMyriadStateStore()) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/0fa49c26/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
----------------------------------------------------------------------
diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
index e1081f0..3d8d57e 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/utils/ByteBufferSupport.java
@@ -116,6 +116,12 @@ public class ByteBufferSupport {
         } else {
             size += INT_SIZE;
         }
+
+        if (nt.getExecutorInfo() != null) {
+            size += nt.getExecutorInfo().getSerializedSize() + INT_SIZE;
+        } else {
+            size += INT_SIZE;
+        }
         // Allocate and populate the buffer.
         ByteBuffer bb = createBuffer(size);
@@ -123,6 +129,7 @@ public class ByteBufferSupport {
         putBytes(bb, hostname);
         putBytes(bb, getSlaveBytes(nt));
         putBytes(bb, getTaskBytes(nt));
+        putBytes(bb, getExecutorInfoBytes(nt));
         // Make sure the buffer is at the beginning
         bb.rewind();
         return bb;
@@ -170,6 +177,7 @@ public class ByteBufferSupport {
             nt.setHostname(toString(bb));
             nt.setSlaveId(toSlaveId(bb));
             nt.setTaskStatus(toTaskStatus(bb));
+            nt.setExecutorInfo(toExecutorInfo(bb));
         }
         return nt;
     }
@@ -182,6 +190,14 @@ public class ByteBufferSupport {
         }
     }
+    public static byte[] getExecutorInfoBytes(NodeTask nt) {
+        if (nt.getExecutorInfo() != null) {
+            return nt.getExecutorInfo().toByteArray();
+        } else {
+            return ZERO_BYTES;
+        }
+    }
+
     public static byte[] getSlaveBytes(NodeTask nt) {
         if (nt.getSlaveId() != null) {
             return nt.getSlaveId().toByteArray();
@@ -272,6 +288,20 @@ public class ByteBufferSupport {
         }
     }
+    public static Protos.ExecutorInfo toExecutorInfo(ByteBuffer bb) {
+        int size = bb.getInt();
+        if (size > 0) {
+            try {
+                return Protos.ExecutorInfo.parseFrom(getBytes(bb, size));
+            } catch (Exception e) {
+                throw new RuntimeException("ByteBuffer not in expected format," +
+                        " failed to parse ExecutorInfo bytes", e);
+            }
+        } else {
+            return null;
+        }
+    }
+
     public static ByteBuffer fillBuffer(byte src[]) {
         ByteBuffer bb = createBuffer(src.length);
         bb.put(src);
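
The ByteBufferSupport changes in this diff append the ExecutorInfo to each serialized NodeTask as one more length-prefixed field. The size calculation reserves INT_SIZE for the length plus the serialized payload, a missing ExecutorInfo is written as ZERO_BYTES, and toExecutorInfo reads the length back and returns null when it is 0. The helpers putBytes and getBytes are not shown in this diff, so the following minimal sketch assumes that putBytes writes a 4-byte length followed by the raw bytes, which is consistent with how the size is computed and how toExecutorInfo reads the buffer. It uses plain byte arrays instead of protobuf messages so that it runs without Mesos on the classpath; the class LengthPrefixedCodec and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a length-prefixed optional-field codec, modeled on the
// ByteBufferSupport changes. ASSUMPTION: each field is a 4-byte length followed by
// the payload bytes; a null or empty payload is written as a bare length of 0.
final class LengthPrefixedCodec {
    private static final int INT_SIZE = Integer.BYTES;
    private static final byte[] ZERO_BYTES = new byte[0];

    private LengthPrefixedCodec() {
    }

    // Bytes reserved for one field: the length prefix plus the payload, if any.
    static int fieldSize(byte[] payload) {
        return INT_SIZE + (payload == null ? 0 : payload.length);
    }

    // Writes the length prefix, then the payload; null is treated as empty.
    static void putBytes(ByteBuffer bb, byte[] payload) {
        byte[] bytes = payload == null ? ZERO_BYTES : payload;
        bb.putInt(bytes.length);
        bb.put(bytes);
    }

    // Reads one field; a length of 0 decodes to null, mirroring toExecutorInfo.
    static byte[] getBytes(ByteBuffer bb) {
        int size = bb.getInt();
        if (size <= 0) {
            return null;
        }
        byte[] bytes = new byte[size];
        bb.get(bytes);
        return bytes;
    }

    // Encodes two optional fields back to back and rewinds the buffer for reading.
    static ByteBuffer encode(byte[] first, byte[] second) {
        ByteBuffer bb = ByteBuffer.allocate(fieldSize(first) + fieldSize(second));
        putBytes(bb, first);
        putBytes(bb, second);
        bb.rewind();
        return bb;
    }

    public static void main(String[] args) {
        byte[] executorInfo = "executor-info".getBytes(StandardCharsets.UTF_8);
        ByteBuffer bb = encode(null, executorInfo);
        byte[] decodedFirst = getBytes(bb);
        byte[] decodedSecond = getBytes(bb);
        System.out.println(decodedFirst == null); // prints true
        System.out.println(new String(decodedSecond, StandardCharsets.UTF_8)); // prints executor-info
    }
}
```

Because the fields carry no tags, the decoder must read them in exactly the order the encoder wrote them. That is why the commit adds putBytes(bb, getExecutorInfoBytes(nt)) after the task bytes and nt.setExecutorInfo(toExecutorInfo(bb)) after nt.setTaskStatus(toTaskStatus(bb)).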
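
On the executor side, MyriadExecutorAuxService in this diff derives the Mesos task ID for a YARN container by prefixing the container ID string with YARN_CONTAINER_TASK_ID_PREFIX ("yarn_"), and stopContainer reports TASK_FINISHED under that ID. Any component that needs to tell these container-backed tasks apart from NM tasks presumably has to apply the same convention. The following sketch shows that round trip with plain strings. Only the prefix value comes from the commit; the class YarnTaskIds, its helper methods, and the sample IDs are hypothetical.

```java
// Hypothetical sketch of the "yarn_" task-ID convention. Only the prefix value is
// taken from MyriadExecutorAuxService; the helpers and sample IDs are made up.
final class YarnTaskIds {
    // Same value as MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX.
    static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";

    private YarnTaskIds() {
    }

    // Executor side: build the Mesos task ID value from a YARN container ID string.
    static String toTaskIdValue(String containerId) {
        return YARN_CONTAINER_TASK_ID_PREFIX + containerId;
    }

    // Hypothetical consumer side: detect tasks that stand for YARN containers.
    static boolean isYarnContainerTask(String taskIdValue) {
        return taskIdValue.startsWith(YARN_CONTAINER_TASK_ID_PREFIX);
    }

    // Hypothetical inverse mapping; returns null when the prefix is absent.
    static String toContainerId(String taskIdValue) {
        if (!isYarnContainerTask(taskIdValue)) {
            return null;
        }
        return taskIdValue.substring(YARN_CONTAINER_TASK_ID_PREFIX.length());
    }

    public static void main(String[] args) {
        String taskId = toTaskIdValue("container_1439641412345_0001_01_000002");
        System.out.println(taskId); // prints yarn_container_1439641412345_0001_01_000002
        System.out.println(toContainerId(taskId)); // prints container_1439641412345_0001_01_000002
        // "nm.example" is a made-up ID that lacks the prefix.
        System.out.println(isYarnContainerTask("nm.example")); // prints false
    }
}
```

Keeping the prefix in a single public constant, as the commit does, lets the executor and any consumer share one definition instead of repeating the literal.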