Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 177EF200BA0 for ; Thu, 29 Sep 2016 20:13:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 16572160AE9; Thu, 29 Sep 2016 18:13:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1CECB160AE3 for ; Thu, 29 Sep 2016 20:13:43 +0200 (CEST) Received: (qmail 80074 invoked by uid 500); 29 Sep 2016 18:13:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 76738 invoked by uid 99); 29 Sep 2016 18:13:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Sep 2016 18:13:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 95C6BE08DD; Thu, 29 Sep 2016 18:13:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Thu, 29 Sep 2016 18:14:09 -0000 Message-Id: <0e41c0052bd8488199dc4cb6c8c871be@git.apache.org> In-Reply-To: <39da8aa61d8343e2bce9694dc79cd0c6@git.apache.org> References: <39da8aa61d8343e2bce9694dc79cd0c6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] [abbrv] hadoop git commit: YARN-4205. Add a service for monitoring application life time out. Contributed by Rohith Sharma K S archived-at: Thu, 29 Sep 2016 18:13:46 -0000 YARN-4205. Add a service for monitoring application life time out. 
Contributed by Rohith Sharma K S Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2ae5a3a5 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2ae5a3a5 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2ae5a3a5 Branch: refs/heads/HDFS-7240 Commit: 2ae5a3a5bf5ea355370469a53eeccff0b5220081 Parents: 1518cb9 Author: Jian He Authored: Thu Sep 29 22:00:31 2016 +0800 Committer: Jian He Committed: Thu Sep 29 22:00:31 2016 +0800 ---------------------------------------------------------------------- .../records/ApplicationSubmissionContext.java | 21 +++ .../api/records/ApplicationTimeoutType.java | 41 +++++ .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../src/main/proto/yarn_protos.proto | 10 ++ .../pb/ApplicationSubmissionContextPBImpl.java | 83 ++++++++++ .../yarn/api/records/impl/pb/ProtoUtils.java | 19 +++ .../yarn/util/AbstractLivelinessMonitor.java | 32 ++-- .../src/main/resources/yarn-default.xml | 9 + .../hadoop/yarn/api/TestPBImplRecords.java | 2 +- .../resourcemanager/RMActiveServiceContext.java | 16 ++ .../server/resourcemanager/RMAppManager.java | 4 + .../yarn/server/resourcemanager/RMContext.java | 5 + .../server/resourcemanager/RMContextImpl.java | 12 ++ .../server/resourcemanager/RMServerUtils.java | 16 ++ .../server/resourcemanager/ResourceManager.java | 9 + .../server/resourcemanager/rmapp/RMAppImpl.java | 47 +++++- .../rmapp/monitor/RMAppLifetimeMonitor.java | 130 +++++++++++++++ .../rmapp/monitor/RMAppToMonitor.java | 77 +++++++++ .../rmapp/monitor/package-info.java | 28 ++++ .../yarn/server/resourcemanager/MockRM.java | 22 ++- .../rmapp/TestApplicationLifetimeMonitor.java | 165 +++++++++++++++++++ 21 files changed, 738 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 21cd1bb..83f601a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.records; +import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -535,4 +536,24 @@ public abstract class ApplicationSubmissionContext { @Public @Unstable public abstract void setReservationID(ReservationId reservationID); + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in seconds. + * @return all ApplicationTimeouts of the application. + */ + @Public + @Unstable + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application in seconds. + * All pre-existing Map entries are cleared before adding the new Map. 
+ * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + @Public + @Unstable + public abstract void setApplicationTimeouts( + Map applicationTimeouts); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java new file mode 100644 index 0000000..edde1b0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationTimeoutType.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + * Application timeout type. 
+ */ +@Public +@Unstable +public enum ApplicationTimeoutType { + + /** + *

+ * Timeout imposed on overall application life time. It includes actual + * run-time plus non-runtime. Non-runtime delays are time elapsed by scheduler + * to allocate container, time taken to store in RMStateStore, etc. + *

+ * If this is set, then timeout monitoring start from application submission + * time. + */ + LIFETIME; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1421873..4d43357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1533,6 +1533,12 @@ public class YarnConfiguration extends Configuration { false; + // Configurations for applicaiton life time monitor feature + public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = + RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms"; + + public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = + 60000; /** * Interval of time the linux container executor should try cleaning up http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 2d6007e..f788295 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -365,6 +365,16 @@ message ApplicationSubmissionContextProto { optional 
ReservationIdProto reservation_id = 15; optional string node_label_expression = 16; optional ResourceRequestProto am_container_resource_request = 17; + repeated ApplicationTimeoutMapProto application_timeouts = 18; +} + +enum ApplicationTimeoutTypeProto { + APP_TIMEOUT_LIFETIME = 1; +} + +message ApplicationTimeoutMapProto { + optional ApplicationTimeoutTypeProto application_timeout_type = 1; + optional int64 timeout = 2; } message LogAggregationContextProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index 67e3a84..62b54e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -18,7 +18,11 @@ package org.apache.hadoop.yarn.api.records.impl.pb; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -26,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import 
org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; @@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; @@ -63,6 +69,7 @@ extends ApplicationSubmissionContext { private ResourceRequest amResourceRequest = null; private LogAggregationContext logAggregationContext = null; private ReservationId reservationId = null; + private Map applicationTimeouts = null; public ApplicationSubmissionContextPBImpl() { builder = ApplicationSubmissionContextProto.newBuilder(); @@ -131,6 +138,9 @@ extends ApplicationSubmissionContext { if (this.reservationId != null) { builder.setReservationId(convertToProtoFormat(this.reservationId)); } + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } } private void mergeLocalToProto() { @@ -548,4 +558,77 @@ extends ApplicationSubmissionContext { private ReservationIdProto convertToProtoFormat(ReservationId t) { return ((ReservationIdPBImpl) t).getProto(); } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + ApplicationSubmissionContextProtoOrBuilder p = viaProto ? 
proto : builder; + List lists = p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getTimeout()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationTimeoutMapProto.newBuilder() + .setTimeout(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 128120e..ab283e7 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; @@ -259,6 +261,23 @@ public class ProtoUtils { return ApplicationAccessType.valueOf(e.name().replace( APP_ACCESS_TYPE_PREFIX, "")); } + + /* + * ApplicationTimeoutType + */ + private static String APP_TIMEOUT_TYPE_PREFIX = "APP_TIMEOUT_"; + + public static ApplicationTimeoutTypeProto convertToProtoFormat( + ApplicationTimeoutType e) { + return ApplicationTimeoutTypeProto + .valueOf(APP_TIMEOUT_TYPE_PREFIX + e.name()); + } + + public static ApplicationTimeoutType convertFromProtoFormat( + ApplicationTimeoutTypeProto e) { + return ApplicationTimeoutType + .valueOf(e.name().replace(APP_TIMEOUT_TYPE_PREFIX, "")); + } /* * Reservation 
Request interpreter type http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index e80d032..b605026 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -44,8 +44,8 @@ public abstract class AbstractLivelinessMonitor extends AbstractService { private Thread checkerThread; private volatile boolean stopped; public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins - private int expireInterval = DEFAULT_EXPIRE; - private int monitorInterval = expireInterval/3; + private long expireInterval = DEFAULT_EXPIRE; + private long monitorInterval = expireInterval / 3; private final Clock clock; @@ -85,7 +85,12 @@ public abstract class AbstractLivelinessMonitor extends AbstractService { this.expireInterval = expireInterval; } - protected void setMonitorInterval(int monitorInterval) { + protected long getExpireInterval(O o) { + // by-default return for all the registered object interval. 
+ return this.expireInterval; + } + + protected void setMonitorInterval(long monitorInterval) { this.monitorInterval = monitorInterval; } @@ -97,7 +102,11 @@ public abstract class AbstractLivelinessMonitor extends AbstractService { } public synchronized void register(O ob) { - running.put(ob, clock.getTime()); + register(ob, clock.getTime()); + } + + public synchronized void register(O ob, long monitorStartTime) { + running.put(ob, monitorStartTime); } public synchronized void unregister(O ob) { @@ -117,19 +126,20 @@ public abstract class AbstractLivelinessMonitor extends AbstractService { public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { synchronized (AbstractLivelinessMonitor.this) { - Iterator> iterator = - running.entrySet().iterator(); + Iterator> iterator = running.entrySet().iterator(); - //avoid calculating current time everytime in loop + // avoid calculating current time everytime in loop long currentTime = clock.getTime(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); - if (currentTime > entry.getValue() + expireInterval) { + O key = entry.getKey(); + long interval = getExpireInterval(key); + if (currentTime > entry.getValue() + interval) { iterator.remove(); - expire(entry.getKey()); - LOG.info("Expired:" + entry.getKey().toString() + - " Timed out after " + expireInterval/1000 + " secs"); + expire(key); + LOG.info("Expired:" + entry.getKey().toString() + + " Timed out after " + interval / 1000 + " secs"); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 965b575..524afec 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3075,4 +3075,13 @@ yarn.resourcemanager.node-removal-untracked.timeout-ms 60000 + + + + The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval + + yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms + 60000 + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index e57a5a2..5270486 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -368,7 +368,7 @@ public class TestPBImplRecords { return bytes[rand.nextInt(4)]; } else if (type.equals(int.class) || type.equals(Integer.class)) { return rand.nextInt(1000000); - } else if (type.equals(long.class)) { + } else if (type.equals(long.class) || type.equals(Long.class)) { return Long.valueOf(rand.nextInt(1000000)); } else if (type.equals(float.class)) { return rand.nextFloat(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index caa0ff13..0e305a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -105,6 +106,8 @@ public class RMActiveServiceContext { private boolean isSchedulerReady = false; private PlacementManager queuePlacementManager = null; + private RMAppLifetimeMonitor rmAppLifetimeMonitor; + public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); } @@ -467,4 +470,17 @@ public class RMActiveServiceContext { public void setQueuePlacementManager(PlacementManager placementMgr) { this.queuePlacementManager = placementMgr; } + + @Private + @Unstable + public void setRMAppLifetimeMonitor( + RMAppLifetimeMonitor lifetimeMonitor) { + 
this.rmAppLifetimeMonitor = lifetimeMonitor; + } + + @Private + @Unstable + public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { + return this.rmAppLifetimeMonitor; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 136dee0..7352a28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -385,6 +385,10 @@ public class RMAppManager implements EventHandler, } } + // fail the submission if configured application timeout value is invalid + RMServerUtils.validateApplicationTimeouts( + submissionContext.getApplicationTimeouts()); + // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 2ba445c..c9d185f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -149,4 +150,8 @@ public interface RMContext { LeaderElectorService getLeaderElectorService(); QueueLimitCalculator getNodeManagerQueueLimitCalculator(); + + void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor); + + RMAppLifetimeMonitor getRMAppLifetimeMonitor(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1e702de..dc8f7d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -499,4 +500,15 @@ public class RMContextImpl implements RMContext { QueueLimitCalculator limitCalculator) { this.queueLimitCalculator = limitCalculator; } + + @Override + public void setRMAppLifetimeMonitor( + RMAppLifetimeMonitor rmAppLifetimeMonitor) { + this.activeServiceContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor); + } + + @Override + public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { + return this.activeServiceContext.getRMAppLifetimeMonitor(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 7fcabab..b90e499 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions .InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -470,4 +472,18 @@ public class RMServerUtils { conf.set(entry.getKey(), entry.getValue()); } } + + public static void validateApplicationTimeouts( + Map timeouts) throws YarnException { + if (timeouts != null) { + for (Map.Entry timeout : timeouts + .entrySet()) { + if 
(timeout.getValue() < 0) { + String message = "Invalid application timeout, value=" + + timeout.getValue() + " for type=" + timeout.getKey(); + throw new YarnException(message); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 8a6997d..5e9bece 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -96,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetimeMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; @@ -556,6 +557,10 @@ public class ResourceManager extends CompositeService implements Recoverable { 
addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); + RMAppLifetimeMonitor rmAppLifetimeMonitor = createRMAppLifetimeMonitor(); + addService(rmAppLifetimeMonitor); + rmContext.setRMAppLifetimeMonitor(rmAppLifetimeMonitor); + RMNodeLabelsManager nlm = createNodeLabelManager(); nlm.setRMContext(rmContext); addService(nlm); @@ -1398,4 +1403,8 @@ public class ResourceManager extends CompositeService implements Recoverable { out.println(" " + "[-remove-application-from-state-store ]" + "\n"); } + + protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() { + return new RMAppLifetimeMonitor(this.rmContext); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e5bde32..727703b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import 
org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -527,6 +528,8 @@ public class RMAppImpl implements RMApp, Recoverable { DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; } } + + } /** @@ -1106,6 +1109,20 @@ public class RMAppImpl implements RMApp, Recoverable { } } + long applicationLifetime = + app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); + if (applicationLifetime > 0) { + app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, + ApplicationTimeoutType.LIFETIME, app.submitTime, + applicationLifetime * 1000); + if (LOG.isDebugEnabled()) { + LOG.debug("Application " + app.applicationId + + " is registered for timeout monitor, type=" + + ApplicationTimeoutType.LIFETIME + " value=" + + applicationLifetime + " seconds"); + } + } + // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. 
if (app.attempts.isEmpty()) { @@ -1152,6 +1169,13 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + Map timeouts = + app.submissionContext.getApplicationTimeouts(); + if (timeouts != null && timeouts.size() > 0) { + app.rmContext.getRMAppLifetimeMonitor() + .unregisterApp(app.getApplicationId(), timeouts.keySet()); + } + if (app.transitionTodo instanceof SingleArcTransition) { ((SingleArcTransition) app.transitionTodo).transition(app, app.eventCausingFinalSaving); @@ -1160,7 +1184,6 @@ public class RMAppImpl implements RMApp, Recoverable { app.eventCausingFinalSaving); } return app.targetedFinalState; - } } @@ -1209,6 +1232,18 @@ public class RMAppImpl implements RMApp, Recoverable { @Override public void transition(RMAppImpl app, RMAppEvent event) { + long applicationLifetime = + app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); + if (applicationLifetime > 0) { + app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, + ApplicationTimeoutType.LIFETIME, app.submitTime, + applicationLifetime * 1000); + LOG.debug("Application " + app.applicationId + + " is registered for timeout monitor, type=" + + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime + + " seconds"); + } + // If recovery is enabled then store the application information in a // non-blocking call so make sure that RM has stored the information // needed to restart the AM after RM restart without further client @@ -1922,4 +1957,14 @@ public class RMAppImpl implements RMApp, Recoverable { public int getNextAttemptId() { return nextAttemptId; } + + private long getApplicationLifetime(ApplicationTimeoutType type) { + Map timeouts = + this.submissionContext.getApplicationTimeouts(); + long applicationLifetime = -1; + if (timeouts != null && timeouts.containsKey(type)) { + applicationLifetime = timeouts.get(type); + } + return applicationLifetime; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java new file mode 100644 index 0000000..e550c97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.SystemClock; + +/** + * This service will monitor the applications against the lifetime value given. + * An application will be killed if it runs beyond the given time.
+ */ +public class RMAppLifetimeMonitor + extends AbstractLivelinessMonitor { + + private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class); + + private RMContext rmContext; + private Map monitoredApps = + new HashMap(); + + private static final EnumSet COMPLETED_APP_STATES = + EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); + + public RMAppLifetimeMonitor(RMContext rmContext) { + super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance()); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + long monitorInterval = conf.getLong( + YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS); + if (monitorInterval <= 0) { + monitorInterval = + YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS; + } + setMonitorInterval(monitorInterval); + LOG.info("Application lifelime monitor interval set to " + monitorInterval + + " ms."); + super.serviceInit(conf); + } + + @SuppressWarnings("unchecked") + @Override + protected synchronized void expire(RMAppToMonitor monitoredAppKey) { + Long remove = monitoredApps.remove(monitoredAppKey); + ApplicationId appId = monitoredAppKey.getApplicationId(); + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + return; + } + // Don't trigger a KILL event if application is in completed states + if (!COMPLETED_APP_STATES.contains(app.getState())) { + String diagnostics = + "Application killed due to exceeding its lifetime period " + remove + + " milliseconds"; + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics)); + } else { + LOG.info("Application " + appId + + " is about to complete. 
So not killing the application."); + } + } + + public synchronized void registerApp(ApplicationId appId, + ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) { + RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType); + register(appToMonitor, monitorStartTime); + monitoredApps.putIfAbsent(appToMonitor, timeout); + } + + @Override + protected synchronized long getExpireInterval( + RMAppToMonitor monitoredAppKey) { + return monitoredApps.get(monitoredAppKey); + } + + public synchronized void unregisterApp(ApplicationId appId, + ApplicationTimeoutType timeoutType) { + RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType); + unregister(appToRemove); + monitoredApps.remove(appToRemove); + } + + public synchronized void unregisterApp(ApplicationId appId, + Set types) { + for (ApplicationTimeoutType type : types) { + unregisterApp(appId, type); + } + } + + public synchronized void updateApplicationTimeouts(ApplicationId appId, + Map timeouts) { + // TODO in YARN-5611 + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java new file mode 100644 index 0000000..1cf2132 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppToMonitor.java @@ -0,0 
+1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; + +/** + * This class is used to monitor an application by applicationId+appTimeoutType.
+ */ +public class RMAppToMonitor { + + private ApplicationId applicationId; + private ApplicationTimeoutType appTimeoutType; + + RMAppToMonitor(ApplicationId appId, ApplicationTimeoutType timeoutType) { + this.applicationId = appId; + this.appTimeoutType = timeoutType; + } + + public ApplicationId getApplicationId() { + return applicationId; + } + + public ApplicationTimeoutType getAppTimeoutType() { + return appTimeoutType; + } + + @Override + public int hashCode() { + return applicationId.hashCode() + appTimeoutType.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + RMAppToMonitor other = (RMAppToMonitor) obj; + if (!this.applicationId.equals(other.getApplicationId())) { + return false; + } + if (this.appTimeoutType != other.getAppTimeoutType()) { + return false; + } + return true; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(applicationId.toString()).append("_").append(appTimeoutType); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java new file mode 100644 index 0000000..a3cc7ef --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor contains + * classes related to application monitor. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index f843261..25a8288 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -460,7 +461,7 @@ public class MockRM extends ResourceManager { return submitApp(resource, name, user, acls, false, queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, 
true, false, - false, null, 0, null, true, priority, amLabel); + false, null, 0, null, true, priority, amLabel, null); } public RMApp submitApp(Resource resource, String name, String user, @@ -561,7 +562,7 @@ public class MockRM extends ResourceManager { return submitApp(capability, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, isAppIdProvided, applicationId, attemptFailuresValidityInterval, - logAggregationContext, cancelTokensWhenComplete, priority, ""); + logAggregationContext, cancelTokensWhenComplete, priority, "", null); } public RMApp submitApp(Resource capability, String name, String user, @@ -570,7 +571,8 @@ public class MockRM extends ResourceManager { boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, ApplicationId applicationId, long attemptFailuresValidityInterval, LogAggregationContext logAggregationContext, - boolean cancelTokensWhenComplete, Priority priority, String amLabel) + boolean cancelTokensWhenComplete, Priority priority, String amLabel, + Map applicationTimeouts) throws Exception { ApplicationId appId = isAppIdProvided ? 
applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -587,6 +589,9 @@ public class MockRM extends ResourceManager { sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); + if (applicationTimeouts != null && applicationTimeouts.size() > 0) { + sub.setApplicationTimeouts(applicationTimeouts); + } if (unmanaged) { sub.setUnmanagedAM(true); } @@ -1073,4 +1078,15 @@ public class MockRM extends ResourceManager { !apps.containsKey(appId)); LOG.info("app is removed from scheduler, " + appId); } + + public RMApp submitApp(int masterMemory, Priority priority, + Map applicationTimeouts) throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + return submitApp( + resource, "", UserGroupInformation.getCurrentUser().getShortUserName(), + null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, priority, null, applicationTimeouts); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2ae5a3a5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java new file mode 100644 index 0000000..3f2db1d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; +import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for application life time monitor feature test. 
+ */ +public class TestApplicationLifetimeMonitor { + private YarnConfiguration conf; + + @Before + public void setup() throws IOException { + conf = new YarnConfiguration(); + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + UserGroupInformation.setConfiguration(conf); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + 3000L); + } + + @Test(timeout = 90000) + public void testApplicationLifetimeMonitor() throws Exception { + MockRM rm = null; + try { + rm = new MockRM(conf); + rm.start(); + Priority appPriority = Priority.newInstance(0); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024); + + Map timeouts = + new HashMap(); + timeouts.put(ApplicationTimeoutType.LIFETIME, 10L); + RMApp app1 = rm.submitApp(1024, appPriority, timeouts); + nm1.nodeHeartbeat(true); + // Send launch Event + MockAM am1 = + rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId()); + am1.registerAppAttempt(); + rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue("Applicaiton killed before lifetime value", + (System.currentTimeMillis() - app1.getSubmitTime()) > 10000); + } finally { + stopRM(rm); + } + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 180000) + public void testApplicationLifetimeOnRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm1.nodeHeartbeat(true); + + long appLifetime = 60L; + Map timeouts = + new HashMap(); + timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime); + RMApp app1 = rm1.submitApp(200, 
Priority.newInstance(0), timeouts); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Re-start RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // recover app + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + + NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 1, ContainerState.RUNNING); + NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus( + am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + + nm1.registerNode(Arrays.asList(amContainer, runningContainer), null); + + // Wait for RM to settle down on recovering containers; + TestWorkPreservingRMRestart.waitForNumContainersToRecover(2, rm2, + am1.getApplicationAttemptId()); + Set launchedContainers = + ((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId())) + .getLaunchedContainers(); + assertTrue(launchedContainers.contains(amContainer.getContainerId())); + assertTrue(launchedContainers.contains(runningContainer.getContainerId())); + + // check RMContainers are re-recreated and the container state is correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + + // re register attempt to rm2 + rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.ACCEPTED); + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(); + rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.RUNNING); + + // wait for app life time and application to be in killed state. 
+ rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED); + Assert.assertTrue("Applicaiton killed before lifetime value", + (System.currentTimeMillis() + - recoveredApp1.getSubmitTime()) > appLifetime); + } + + private void stopRM(MockRM rm) { + if (rm != null) { + rm.stop(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org