Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BC13CE1E8 for ; Fri, 11 Jan 2013 19:41:25 +0000 (UTC) Received: (qmail 29033 invoked by uid 500); 11 Jan 2013 19:41:25 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 29002 invoked by uid 500); 11 Jan 2013 19:41:25 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 28994 invoked by uid 99); 11 Jan 2013 19:41:25 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Jan 2013 19:41:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Jan 2013 19:41:11 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 07B8F2388ABC; Fri, 11 Jan 2013 19:40:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1432246 [2/3] - in /hadoop/common/branches/HDFS-2802/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop... 
Date: Fri, 11 Jan 2013 19:40:30 -0000 To: yarn-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130111194034.07B8F2388ABC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Fri Jan 11 19:40:23 2013 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedList; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +37,10 @@ import org.apache.hadoop.yarn.event.Even import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -48,7 +53,8 @@ import org.apache.hadoop.yarn.server.sec /** * This class manages the list of applications for the resource manager. */ -public class RMAppManager implements EventHandler { +public class RMAppManager implements EventHandler, + Recoverable { private static final Log LOG = LogFactory.getLog(RMAppManager.class); @@ -173,6 +179,10 @@ public class RMAppManager implements Eve completedApps.add(applicationId); writeAuditLog(applicationId); + + // application completely done. Remove from state + RMStateStore store = rmContext.getStateStore(); + store.removeApplication(rmContext.getRMApps().get(applicationId)); } } @@ -306,6 +316,37 @@ public class RMAppManager implements Eve } return credentials; } + + @Override + public void recover(RMState state) throws Exception { + RMStateStore store = rmContext.getStateStore(); + assert store != null; + // recover applications + Map appStates = state.getApplicationState(); + LOG.info("Recovering " + appStates.size() + " applications"); + for(ApplicationState appState : appStates.values()) { + // re-submit the application + // this is going to send an app start event but since the async dispatcher + // has not started that event will be queued until we have completed re + // populating the state + if(appState.getApplicationSubmissionContext().getUnmanagedAM()) { + // do not recover unmanaged applications since current recovery + // mechanism of restarting attempts does not work for them. 
+ // This will need to be changed in work preserving recovery in which + // RM will re-connect with the running AM's instead of restarting them + LOG.info("Not recovering unmanaged application " + appState.getAppId()); + store.removeApplication(appState); + } else { + LOG.info("Recovering application " + appState.getAppId()); + submitApplication(appState.getApplicationSubmissionContext(), + appState.getSubmitTime()); + // re-populate attempt information in application + RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get( + appState.getAppId()); + appImpl.recover(state); + } + } + } @Override public void handle(RMAppManagerEvent event) { Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Fri Jan 11 19:40:23 2013 @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.res public interface RMContext { Dispatcher getDispatcher(); + + RMStateStore getStateStore(); ConcurrentMap getRMApps(); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Fri Jan 11 19:40:23 2013 @@ -23,7 +23,10 @@ import java.util.concurrent.ConcurrentMa import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; @@ -33,6 +36,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import com.google.common.annotations.VisibleForTesting; + public class RMContextImpl implements RMContext { private final Dispatcher rmDispatcher; @@ -48,6 +53,7 @@ public class RMContextImpl implements RM private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; + private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; private final DelegationTokenRenewer tokenRenewer; private final ApplicationTokenSecretManager appTokenSecretManager; @@ -55,6 +61,7 @@ public class RMContextImpl implements RM private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; public RMContextImpl(Dispatcher rmDispatcher, + RMStateStore store, ContainerAllocationExpirer containerAllocationExpirer, AMLivelinessMonitor amLivelinessMonitor, AMLivelinessMonitor amFinishingMonitor, @@ -63,6 +70,7 @@ public class RMContextImpl implements RM RMContainerTokenSecretManager containerTokenSecretManager, ClientToAMTokenSecretManagerInRM clientTokenSecretManager) { this.rmDispatcher = rmDispatcher; + this.stateStore = store; this.containerAllocationExpirer = containerAllocationExpirer; this.amLivelinessMonitor = amLivelinessMonitor; this.amFinishingMonitor = amFinishingMonitor; @@ -71,11 +79,39 @@ public class RMContextImpl implements RM this.containerTokenSecretManager = containerTokenSecretManager; this.clientToAMTokenSecretManager = clientTokenSecretManager; } + + @VisibleForTesting + // helper constructor for tests + public RMContextImpl(Dispatcher rmDispatcher, + ContainerAllocationExpirer containerAllocationExpirer, + AMLivelinessMonitor amLivelinessMonitor, + 
AMLivelinessMonitor amFinishingMonitor, + DelegationTokenRenewer tokenRenewer, + ApplicationTokenSecretManager appTokenSecretManager, + RMContainerTokenSecretManager containerTokenSecretManager, + ClientToAMTokenSecretManagerInRM clientTokenSecretManager) { + this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, + amFinishingMonitor, tokenRenewer, appTokenSecretManager, + containerTokenSecretManager, clientTokenSecretManager); + RMStateStore nullStore = new NullRMStateStore(); + nullStore.setDispatcher(rmDispatcher); + try { + nullStore.init(new YarnConfiguration()); + setStateStore(nullStore); + } catch (Exception e) { + assert false; + } + } @Override public Dispatcher getDispatcher() { return this.rmDispatcher; } + + @Override + public RMStateStore getStateStore() { + return stateStore; + } @Override public ConcurrentMap getRMApps() { @@ -126,4 +162,9 @@ public class RMContextImpl implements RM public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { return this.clientToAMTokenSecretManager; } + + @VisibleForTesting + public void setStateStore(RMStateStore store) { + stateStore = store; + } } \ No newline at end of file Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Jan 11 19:40:23 2013 @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; @@ -45,10 +46,11 @@ import org.apache.hadoop.yarn.event.Even import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.webapp.Web import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; +import com.google.common.annotations.VisibleForTesting; + /** * The ResourceManager is the main class that is a set of components. * "I am the ResourceManager. All your resources are belong to us..." 
@@ -119,14 +123,13 @@ public class ResourceManager extends Com protected RMDelegationTokenSecretManager rmDTSecretManager; private WebApp webApp; protected RMContext rmContext; - private final RMStateStore store; protected ResourceTrackerService resourceTracker; + private boolean recoveryEnabled; private Configuration conf; - - public ResourceManager(RMStateStore store) { + + public ResourceManager() { super("ResourceManager"); - this.store = store; } public RMContext getRMContext() { @@ -160,12 +163,34 @@ public class ResourceManager extends Com this.containerTokenSecretManager = createContainerTokenSecretManager(conf); + boolean isRecoveryEnabled = conf.getBoolean( + YarnConfiguration.RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); + + RMStateStore rmStore = null; + if(isRecoveryEnabled) { + recoveryEnabled = true; + rmStore = RMStateStoreFactory.getStore(conf); + } else { + recoveryEnabled = false; + rmStore = new NullRMStateStore(); + } + try { + rmStore.init(conf); + rmStore.setDispatcher(rmDispatcher); + } catch (Exception e) { + // the Exception from stateStore.init() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to init state store", e); + ExitUtil.terminate(1, e); + } + this.rmContext = - new RMContextImpl(this.rmDispatcher, + new RMContextImpl(this.rmDispatcher, rmStore, this.containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, tokenRenewer, this.appTokenSecretManager, this.containerTokenSecretManager, this.clientToAMSecretManager); - + // Register event handler for NodesListManager this.nodesListManager = new NodesListManager(this.rmContext); this.rmDispatcher.register(NodesListManagerEventType.class, @@ -226,9 +251,15 @@ public class ResourceManager extends Com addService(applicationMasterLauncher); new RMNMInfo(this.rmContext, this.scheduler); - + super.init(conf); } + + @VisibleForTesting + protected void setRMStateStore(RMStateStore rmStore) { + 
rmStore.setDispatcher(rmDispatcher); + ((RMContextImpl) rmContext).setStateStore(rmStore); + } protected RMContainerTokenSecretManager createContainerTokenSecretManager( Configuration conf) { @@ -502,6 +533,19 @@ public class ResourceManager extends Com this.appTokenSecretManager.start(); this.containerTokenSecretManager.start(); + if(recoveryEnabled) { + try { + RMStateStore rmStore = rmContext.getStateStore(); + RMState state = rmStore.loadState(); + recover(state); + } catch (Exception e) { + // the Exception from loadState() needs to be handled for + // HA and we need to give up master status if we got fenced + LOG.error("Failed to load/recover state", e); + ExitUtil.terminate(1, e); + } + } + startWepApp(); DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); @@ -555,6 +599,13 @@ public class ResourceManager extends Com DefaultMetricsSystem.shutdown(); + RMStateStore store = rmContext.getStateStore(); + try { + store.close(); + } catch (Exception e) { + LOG.error("Error closing store.", e); + } + super.stop(); } @@ -643,6 +694,8 @@ public class ResourceManager extends Com @Override public void recover(RMState state) throws Exception { + // recover applications + rmAppManager.recover(state); } public static void main(String argv[]) { @@ -650,13 +703,11 @@ public class ResourceManager extends Com StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); try { Configuration conf = new YarnConfiguration(); - RMStateStore store = StoreFactory.getStore(conf); - ResourceManager resourceManager = new ResourceManager(store); + ResourceManager resourceManager = new ResourceManager(); ShutdownHookManager.get().addShutdownHook( new CompositeServiceShutdownHook(resourceManager), SHUTDOWN_HOOK_PRIORITY); resourceManager.init(conf); - //resourceManager.recover(store.restore()); resourceManager.start(); } catch (Throwable t) { LOG.fatal("Error starting ResourceManager", t); Modified: 
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Fri Jan 11 19:40:23 2013 @@ -15,10 +15,313 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.hadoop.yarn.server.resourcemanager.recovery; -public interface RMStateStore { - public interface RMState { +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; + +@Private +@Unstable +/** + * Base class to implement storage of ResourceManager state. + * Takes care of asynchronous notifications and interfacing with YARN objects. + * Real store implementations need to derive from it and implement blocking + * store and load methods to actually store and load the state. 
+ */ +public abstract class RMStateStore { + public static final Log LOG = LogFactory.getLog(RMStateStore.class); + + /** + * State of an application attempt + */ + public static class ApplicationAttemptState { + final ApplicationAttemptId attemptId; + final Container masterContainer; + + public ApplicationAttemptState(ApplicationAttemptId attemptId, + Container masterContainer) { + this.attemptId = attemptId; + this.masterContainer = masterContainer; + } + + public Container getMasterContainer() { + return masterContainer; + } + public ApplicationAttemptId getAttemptId() { + return attemptId; + } + } + + /** + * State of an application application + */ + public static class ApplicationState { + final ApplicationSubmissionContext context; + final long submitTime; + Map attempts = + new HashMap(); + + ApplicationState(long submitTime, ApplicationSubmissionContext context) { + this.submitTime = submitTime; + this.context = context; + } + + public ApplicationId getAppId() { + return context.getApplicationId(); + } + public long getSubmitTime() { + return submitTime; + } + public int getAttemptCount() { + return attempts.size(); + } + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return context; + } + public ApplicationAttemptState getAttempt(ApplicationAttemptId attemptId) { + return attempts.get(attemptId); + } + } + + /** + * State of the ResourceManager + */ + public static class RMState { + Map appState = + new HashMap(); + + public Map getApplicationState() { + return appState; + } + } + + private Dispatcher rmDispatcher; + + /** + * Dispatcher used to send state operation completion events to + * ResourceManager services + */ + public void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + + AsyncDispatcher dispatcher; + + public synchronized void init(Configuration conf) throws Exception{ + // create async handler + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + 
dispatcher.register(RMStateStoreEventType.class, + new ForwardingEventHandler()); + dispatcher.start(); + + initInternal(conf); + } + + /** + * Derived classes initialize themselves using this method. + * The base class is initialized and the event dispatcher is ready to use at + * this point + */ + protected abstract void initInternal(Configuration conf) throws Exception; + + public synchronized void close() throws Exception { + closeInternal(); + dispatcher.stop(); + } + + /** + * Derived classes close themselves using this method. + * The base class will be closed and the event dispatcher will be shutdown + * after this + */ + protected abstract void closeInternal() throws Exception; + + /** + * Blocking API + * The derived class must recover state from the store and return a new + * RMState object populated with that state + * This must not be called on the dispatcher thread + */ + public abstract RMState loadState() throws Exception; + + /** + * Blocking API + * ResourceManager services use this to store the application's state + * This must not be called on the dispatcher thread + */ + public synchronized void storeApplication(RMApp app) throws Exception { + ApplicationSubmissionContext context = app + .getApplicationSubmissionContext(); + assert context instanceof ApplicationSubmissionContextPBImpl; + + ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl(); + appStateData.setSubmitTime(app.getSubmitTime()); + appStateData.setApplicationSubmissionContext(context); + + LOG.info("Storing info for app: " + context.getApplicationId()); + storeApplicationState(app.getApplicationId().toString(), appStateData); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of an + * application. 
+ */ + protected abstract void storeApplicationState(String appId, + ApplicationStateDataPBImpl appStateData) + throws Exception; + + @SuppressWarnings("unchecked") + /** + * Non-blocking API + * ResourceManager services call this to store state on an application attempt + * This does not block the dispatcher threads + * RMAppAttemptStoredEvent will be sent on completion to notify the RMAppAttempt + */ + public synchronized void storeApplicationAttempt(RMAppAttempt appAttempt) { + ApplicationAttemptState attemptState = new ApplicationAttemptState( + appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + dispatcher.getEventHandler().handle( + new RMStateStoreAppAttemptEvent(attemptState)); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of an + * application attempt + */ + protected abstract void storeApplicationAttemptState(String attemptId, + ApplicationAttemptStateDataPBImpl attemptStateData) + throws Exception; + + + /** + * Non-blocking API + * ResourceManager services call this to remove an application from the state + * store + * This does not block the dispatcher threads + * There is no notification of completion for this operation. 
+ */ + public synchronized void removeApplication(RMApp app) { + ApplicationState appState = new ApplicationState( + app.getSubmitTime(), app.getApplicationSubmissionContext()); + for(RMAppAttempt appAttempt : app.getAppAttempts().values()) { + ApplicationAttemptState attemptState = new ApplicationAttemptState( + appAttempt.getAppAttemptId(), appAttempt.getMasterContainer()); + appState.attempts.put(attemptState.getAttemptId(), attemptState); + } + + removeApplication(appState); + } + + @SuppressWarnings("unchecked") + /** + * Non-Blocking API + */ + public synchronized void removeApplication(ApplicationState appState) { + dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of an + * application and its attempts + */ + protected abstract void removeApplicationState(ApplicationState appState) + throws Exception; + + // Dispatcher related code + + private synchronized void handleStoreEvent(RMStateStoreEvent event) { + switch(event.getType()) { + case STORE_APP_ATTEMPT: + { + ApplicationAttemptState attemptState = + ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); + Exception storedException = null; + ApplicationAttemptStateDataPBImpl attemptStateData = + new ApplicationAttemptStateDataPBImpl(); + attemptStateData.setAttemptId(attemptState.getAttemptId()); + attemptStateData.setMasterContainer(attemptState.getMasterContainer()); + + LOG.info("Storing info for attempt: " + attemptState.getAttemptId()); + try { + storeApplicationAttemptState(attemptState.getAttemptId().toString(), + attemptStateData); + } catch (Exception e) { + LOG.error("Error storing appAttempt: " + + attemptState.getAttemptId(), e); + storedException = e; + } finally { + notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), + storedException); + } + } + break; + case REMOVE_APP: + { + ApplicationState appState = + ((RMStateStoreRemoveAppEvent) 
event).getAppState(); + ApplicationId appId = appState.getAppId(); + + LOG.info("Removing info for app: " + appId); + try { + removeApplicationState(appState); + } catch (Exception e) { + LOG.error("Error removing app: " + appId, e); + } + } + break; + default: + LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); + } + } + + @SuppressWarnings("unchecked") + /** + * In (@link storeApplicationAttempt}, derived class can call this method to + * notify the application attempt about operation completion + * @param appAttempt attempt that has been saved + */ + private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, + Exception storedException) { + rmDispatcher.getEventHandler().handle( + new RMAppAttemptStoredEvent(attemptId, storedException)); + } + + /** + * EventHandler implementation which forward events to the FSRMStateStore + * This hides the EventHandle methods of the store from its public interface + */ + private final class ForwardingEventHandler + implements EventHandler { + + @Override + public void handle(RMStateStoreEvent event) { + handleStoreEvent(event); + } } + } \ No newline at end of file Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/package-info.java Fri Jan 11 19:40:23 2013 @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + @InterfaceAudience.Private @InterfaceStability.Unstable package org.apache.hadoop.yarn.server.resourcemanager.recovery; Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Fri Jan 11 19:40:23 2013 @@ -44,6 +44,12 @@ public interface RMApp extends EventHand * @return the {@link ApplicationId} for this {@link RMApp}. */ ApplicationId getApplicationId(); + + /** + * The application submission context for this {@link RMApp} + * @return the {@link ApplicationSubmissionContext} for this {@link RMApp} + */ + ApplicationSubmissionContext getApplicationSubmissionContext(); /** * The current state of the {@link RMApp}. 
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jan 11 19:40:23 2013 @@ -50,6 +50,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -66,7 +69,7 @@ import org.apache.hadoop.yarn.state.Stat import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; -public class RMAppImpl implements RMApp 
{ +public class RMAppImpl implements RMApp, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; @@ -243,6 +246,11 @@ public class RMAppImpl implements RMApp public ApplicationId getApplicationId() { return this.applicationId; } + + @Override + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return this.submissionContext; + } @Override public FinalApplicationStatus getFinalApplicationStatus() { @@ -512,9 +520,22 @@ public class RMAppImpl implements RMApp this.writeLock.unlock(); } } + + @Override + public void recover(RMState state) { + ApplicationState appState = state.getApplicationState().get(getApplicationId()); + LOG.info("Recovering app: " + getApplicationId() + " with " + + + appState.getAttemptCount() + " attempts"); + for(int i=0; i= app.maxRetries) { retryApp = false; msg = "Application " + app.getApplicationId() + " failed " + app.maxRetries + " times due to " + failedEvent.getDiagnostics() @@ -655,7 +678,7 @@ public class RMAppImpl implements RMApp } if (retryApp) { - app.createNewAttempt(); + app.createNewAttempt(true); return initialState; } else { LOG.info(msg); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Fri Jan 11 19:40:23 2013 @@ -39,9 +39,15 @@ public enum RMAppAttemptEventType { CONTAINER_ACQUIRED, CONTAINER_ALLOCATED, CONTAINER_FINISHED, + + // Source: RMStateStore + ATTEMPT_SAVED, // Source: Scheduler APP_REJECTED, APP_ACCEPTED, + + // Source: RMAttemptImpl.recover + RECOVER } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jan 11 19:40:23 2013 @@ -38,6 +38,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpConfig; 
+import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -57,6 +58,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -69,6 +75,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -85,7 +92,7 @@ import org.apache.hadoop.yarn.state.Stat import 
org.apache.hadoop.yarn.util.BuilderUtils; @SuppressWarnings({"unchecked", "rawtypes"}) -public class RMAppAttemptImpl implements RMAppAttempt { +public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class); @@ -153,12 +160,15 @@ public class RMAppAttemptImpl implements .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED, RMAppAttemptEventType.REGISTERED, new UnexpectedAMRegisteredTransition()) + .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.RECOVERED, + RMAppAttemptEventType.RECOVER) // Transitions from SUBMITTED state .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED, RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition()) .addTransition(RMAppAttemptState.SUBMITTED, - EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED), + EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.APP_ACCEPTED, new ScheduleTransition()) .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED, @@ -170,12 +180,42 @@ public class RMAppAttemptImpl implements // Transitions from SCHEDULED State .addTransition(RMAppAttemptState.SCHEDULED, - RMAppAttemptState.ALLOCATED, + RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptEventType.CONTAINER_ALLOCATED, new AMContainerAllocatedTransition()) .addTransition(RMAppAttemptState.SCHEDULED, RMAppAttemptState.KILLED, RMAppAttemptEventType.KILL, new BaseFinalTransition(RMAppAttemptState.KILLED)) + + // Transitions from ALLOCATED_SAVING State + .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.ALLOCATED, + RMAppAttemptEventType.ATTEMPT_SAVED, new AttemptStoredTransition()) + .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptEventType.CONTAINER_ACQUIRED, + new ContainerAcquiredTransition()) + // App could be killed by the client. So need to handle this. 
+ .addTransition(RMAppAttemptState.ALLOCATED_SAVING, + RMAppAttemptState.KILLED, + RMAppAttemptEventType.KILL, + new BaseFinalTransition(RMAppAttemptState.KILLED)) + + // Transitions from LAUNCHED_UNMANAGED_SAVING State + .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.LAUNCHED, + RMAppAttemptEventType.ATTEMPT_SAVED, + new UnmanagedAMAttemptSavedTransition()) + // attempt should not try to register in this state + .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.FAILED, + RMAppAttemptEventType.REGISTERED, + new UnexpectedAMRegisteredTransition()) + // App could be killed by the client. So need to handle this. + .addTransition(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, + RMAppAttemptState.KILLED, + RMAppAttemptEventType.KILL, + new BaseFinalTransition(RMAppAttemptState.KILLED)) // Transitions from ALLOCATED State .addTransition(RMAppAttemptState.ALLOCATED, @@ -279,11 +319,30 @@ public class RMAppAttemptImpl implements RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.REGISTERED, RMAppAttemptEventType.CONTAINER_ALLOCATED, + RMAppAttemptEventType.ATTEMPT_SAVED, + RMAppAttemptEventType.CONTAINER_FINISHED, + RMAppAttemptEventType.UNREGISTERED, + RMAppAttemptEventType.KILL, + RMAppAttemptEventType.STATUS_UPDATE)) + + // Transitions from RECOVERED State + .addTransition( + RMAppAttemptState.RECOVERED, + RMAppAttemptState.RECOVERED, + EnumSet.of(RMAppAttemptEventType.START, + RMAppAttemptEventType.APP_ACCEPTED, + RMAppAttemptEventType.APP_REJECTED, + RMAppAttemptEventType.EXPIRE, + RMAppAttemptEventType.LAUNCHED, + RMAppAttemptEventType.LAUNCH_FAILED, + RMAppAttemptEventType.REGISTERED, + RMAppAttemptEventType.CONTAINER_ALLOCATED, + RMAppAttemptEventType.CONTAINER_ACQUIRED, + RMAppAttemptEventType.ATTEMPT_SAVED, RMAppAttemptEventType.CONTAINER_FINISHED, RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, RMAppAttemptEventType.STATUS_UPDATE)) - .installTopology(); public 
RMAppAttemptImpl(ApplicationAttemptId appAttemptId, @@ -318,7 +377,7 @@ public class RMAppAttemptImpl implements @Override public ApplicationSubmissionContext getSubmissionContext() { return this.submissionContext; - } + } @Override public FinalApplicationStatus getFinalApplicationStatus() { @@ -494,6 +553,10 @@ public class RMAppAttemptImpl implements } } + private void setMasterContainer(Container container) { + masterContainer = container; + } + @Override public void handle(RMAppAttemptEvent event) { @@ -561,6 +624,21 @@ public class RMAppAttemptImpl implements } } + @Override + public void recover(RMState state) { + ApplicationState appState = + state.getApplicationState().get(getAppAttemptId().getApplicationId()); + ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); + assert attemptState != null; + setMasterContainer(attemptState.getMasterContainer()); + LOG.info("Recovered attempt: AppId: " + getAppAttemptId().getApplicationId() + + " AttemptId: " + getAppAttemptId() + + " MasterContainer: " + masterContainer); + setDiagnostics("Attempt recovered after RM restart"); + handle(new RMAppAttemptEvent(getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + private static class BaseTransition implements SingleArcTransition { @@ -625,13 +703,12 @@ public class RMAppAttemptImpl implements @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - - // Send the acceptance to the app - appAttempt.eventHandler.handle(new RMAppEvent(event - .getApplicationAttemptId().getApplicationId(), - RMAppEventType.APP_ACCEPTED)); - if (!appAttempt.submissionContext.getUnmanagedAM()) { + // Send the acceptance to the app + appAttempt.eventHandler.handle(new RMAppEvent(event + .getApplicationAttemptId().getApplicationId(), + RMAppEventType.APP_ACCEPTED)); + // Request a container for the AM. 
ResourceRequest request = BuilderUtils.newResourceRequest( AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext @@ -647,35 +724,42 @@ public class RMAppAttemptImpl implements return RMAppAttemptState.SCHEDULED; } else { // RM not allocating container. AM is self launched. - // Directly go to LAUNCHED state - // Register with AMLivelinessMonitor - appAttempt.rmContext.getAMLivelinessMonitor().register( - appAttempt.applicationAttemptId); - return RMAppAttemptState.LAUNCHED; + RMStateStore store = appAttempt.rmContext.getStateStore(); + // save state and then go to LAUNCHED state + appAttempt.storeAttempt(store); + return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING; } } } - private static final class AMContainerAllocatedTransition extends BaseTransition { + private static final class AMContainerAllocatedTransition + extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { - + RMAppAttemptEvent event) { // Acquire the AM container from the scheduler. 
Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST); // Set the masterContainer - appAttempt.masterContainer = amContainerAllocation.getContainers().get( - 0); + appAttempt.setMasterContainer(amContainerAllocation.getContainers().get( + 0)); - // Send event to launch the AM Container - appAttempt.eventHandler.handle(new AMLauncherEvent( - AMLauncherEventType.LAUNCH, appAttempt)); + RMStateStore store = appAttempt.rmContext.getStateStore(); + appAttempt.storeAttempt(store); } } - + + private static final class AttemptStoredTransition extends BaseTransition { + @Override + public void transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + appAttempt.launchAttempt(); + } + } + private static class BaseFinalTransition extends BaseTransition { private final RMAppAttemptState finalAttemptState; @@ -736,17 +820,34 @@ public class RMAppAttemptImpl implements } } - private static final class AMLaunchedTransition extends BaseTransition { + private static class AMLaunchedTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { - + RMAppAttemptEvent event) { // Register with AMLivelinessMonitor - appAttempt.rmContext.getAMLivelinessMonitor().register( - appAttempt.applicationAttemptId); - + appAttempt.attemptLaunched(); } } + + private static final class UnmanagedAMAttemptSavedTransition + extends AMLaunchedTransition { + @Override + public void transition(RMAppAttemptImpl appAttempt, + RMAppAttemptEvent event) { + appAttempt.checkAttemptStoreError(event); + // Send the acceptance to the app + // Ideally this should have been done when the scheduler accepted the app. + // But its here because until the attempt is saved the client should not + // launch the unmanaged AM. Client waits for the app status to be accepted + // before doing so. 
So we have to delay the accepted state until we have + // completed storing the attempt + appAttempt.eventHandler.handle(new RMAppEvent(event + .getApplicationAttemptId().getApplicationId(), + RMAppEventType.APP_ACCEPTED)); + + super.transition(appAttempt, event); + } + } private static final class LaunchFailedTransition extends BaseFinalTransition { @@ -1040,4 +1141,37 @@ public class RMAppAttemptImpl implements this.readLock.unlock(); } } + + private void launchAttempt(){ + // Send event to launch the AM Container + eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); + } + + private void attemptLaunched() { + // Register with AMLivelinessMonitor + rmContext.getAMLivelinessMonitor().register(getAppAttemptId()); + } + + private void checkAttemptStoreError(RMAppAttemptEvent event) { + RMAppAttemptStoredEvent storeEvent = (RMAppAttemptStoredEvent) event; + if(storeEvent.getStoredException() != null) + { + // This needs to be handled for HA and give up master status if we got + // fenced + LOG.error("Failed to store attempt: " + getAppAttemptId(), + storeEvent.getStoredException()); + ExitUtil.terminate(1, storeEvent.getStoredException()); + } + } + + private void storeAttempt(RMStateStore store) { + // store attempt data in a non-blocking manner to prevent dispatcher + // thread starvation and wait for state to be saved + LOG.info("Storing attempt: AppId: " + + getAppAttemptId().getApplicationId() + + " AttemptId: " + + getAppAttemptId() + + " MasterContainer: " + masterContainer); + store.storeApplicationAttempt(this); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Fri Jan 11 19:40:23 2013 @@ -19,6 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; public enum RMAppAttemptState { - NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, - FINISHING, FINISHED, KILLED, + NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, + FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Fri Jan 11 19:40:23 2013 @@ -199,6 +199,11 @@ public class RMContainerImpl implements } @Override + public String toString() { + return containerId.toString(); + } + + @Override public void handle(RMContainerEvent event) { LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType()); try { @@ -221,7 +226,7 @@ public class RMContainerImpl implements writeLock.unlock(); } } - + private static class BaseTransition implements SingleArcTransition { Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Jan 11 19:40:23 2013 @@ -177,7 +177,9 @@ public class 
FSLeafQueue extends FSQueue Collections.sort(appScheds, comparator); for (AppSchedulable sched: appScheds) { - return sched.assignContainer(node, reserved); + if (sched.getRunnable()) { + return sched.assignContainer(node, reserved); + } } return Resources.none(); Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jan 11 19:40:23 2013 @@ -514,7 +514,6 @@ public class FairScheduler implements Re queue.addApp(schedulerApp); queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); - rootMetrics.submitApp(user, applicationAttemptId.getAttemptId()); applications.put(applicationAttemptId, schedulerApp); @@ -777,7 +776,8 @@ public class FairScheduler implements Re boolean assignedContainer = false; for (FSLeafQueue sched : scheds) { Resource assigned = sched.assignContainer(node, false); - if (Resources.greaterThan(assigned, Resources.none())) { + if (Resources.greaterThan(assigned, Resources.none()) || + node.getReservedContainer() != null) { eventLog.log("ASSIGN", nm.getHostName(), 
assigned); assignedContainers++; assignedContainer = true; Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Jan 11 19:40:23 2013 @@ -227,6 +227,9 @@ public class QueueManager { * Return whether a queue exists already. */ public boolean exists(String name) { + if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) { + name = ROOT_QUEUE + "." 
+ name; + } synchronized (queues) { return queues.containsKey(name); } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Fri Jan 11 19:40:23 2013 @@ -276,21 +276,26 @@ public class DelegationTokenRenewer exte Collection > tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); + // find tokens for renewal, but don't add timers until we know + // all renewable tokens are valid + Set dtrs = new HashSet(); for(Token token : tokens) { // first renew happens immediately if (token.isManaged()) { DelegationTokenToRenew dtr = new DelegationTokenToRenew(applicationId, token, getConfig(), now, shouldCancelAtEnd); - - addTokenToList(dtr); - - setTimerForTokenRenewal(dtr, true); - if (LOG.isDebugEnabled()) { - LOG.debug("Registering token for renewal for:" + - " service = " + token.getService() + - " for appId = " + applicationId); - } + renewToken(dtr); + dtrs.add(dtr); + } + } + for (DelegationTokenToRenew dtr : dtrs) { + addTokenToList(dtr); + setTimerForTokenRenewal(dtr); + if 
(LOG.isDebugEnabled()) { + LOG.debug("Registering token for renewal for:" + + " service = " + dtr.token.getService() + + " for appId = " + applicationId); } } } @@ -301,54 +306,49 @@ public class DelegationTokenRenewer exte */ private class RenewalTimerTask extends TimerTask { private DelegationTokenToRenew dttr; + private boolean cancelled = false; RenewalTimerTask(DelegationTokenToRenew t) { dttr = t; } @Override - public void run() { + public synchronized void run() { + if (cancelled) { + return; + } + Token token = dttr.token; try { - // need to use doAs so that http can find the kerberos tgt - dttr.expirationDate = UserGroupInformation.getLoginUser() - .doAs(new PrivilegedExceptionAction(){ - - @Override - public Long run() throws Exception { - return dttr.token.renew(dttr.conf); - } - }); - + renewToken(dttr); if (LOG.isDebugEnabled()) { LOG.debug("Renewing delegation-token for:" + token.getService() + "; new expiration;" + dttr.expirationDate); } - setTimerForTokenRenewal(dttr, false);// set the next one + setTimerForTokenRenewal(dttr);// set the next one } catch (Exception e) { LOG.error("Exception renewing token" + token + ". 
Not rescheduled", e); removeFailedDelegationToken(dttr); } } + + @Override + public synchronized boolean cancel() { + cancelled = true; + return super.cancel(); + } } /** * set task to renew the token */ - private - void setTimerForTokenRenewal(DelegationTokenToRenew token, - boolean firstTime) throws IOException { + private void setTimerForTokenRenewal(DelegationTokenToRenew token) + throws IOException { // calculate timer time - long now = System.currentTimeMillis(); - long renewIn; - if(firstTime) { - renewIn = now; - } else { - long expiresIn = (token.expirationDate - now); - renewIn = now + expiresIn - expiresIn/10; // little bit before the expiration - } + long expiresIn = token.expirationDate - System.currentTimeMillis(); + long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration // need to create new task every time TimerTask tTask = new RenewalTimerTask(token); @@ -357,6 +357,24 @@ public class DelegationTokenRenewer exte renewalTimer.schedule(token.timerTask, new Date(renewIn)); } + // renew a token + private void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + // need to use doAs so that http can find the kerberos tgt + // NOTE: token renewers should be responsible for the correct UGI! 
+ try { + dttr.expirationDate = UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction(){ + @Override + public Long run() throws Exception { + return dttr.token.renew(dttr.conf); + } + }); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + // cancel a token private void cancelToken(DelegationTokenToRenew t) { if(t.shouldCancelAtEnd) { Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Fri Jan 11 19:40:23 2013 @@ -84,12 +84,12 @@ class AppsBlock extends HtmlBlock { appsTableData.append("[\"") .append(appInfo.getAppId()).append("\",\"") - .append(StringEscapeUtils.escapeHtml(appInfo.getUser())) - .append("\",\"") - .append(StringEscapeUtils.escapeHtml(appInfo.getName())) - .append("\",\"") - .append(StringEscapeUtils.escapeHtml(appInfo.getQueue())) - .append("\",\"") + .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( + appInfo.getUser()))).append("\",\"") + .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( + appInfo.getName()))).append("\",\"") + 
.append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( + appInfo.getQueue()))).append("\",\"") .append(appInfo.getStartTime()).append("\",\"") .append(appInfo.getFinishTime()).append("\",\"") .append(appInfo.getState()).append("\",\"") Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Fri Jan 11 19:40:23 2013 @@ -20,13 +20,14 @@ package org.apache.hadoop.yarn.server.re import static org.apache.hadoop.yarn.util.StringHelper.join; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE; -import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR; -import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE; import java.util.Collection; import java.util.HashSet; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang.StringEscapeUtils; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -36,7 +37,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; -import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY; @@ -86,44 +86,52 @@ public class FairSchedulerAppsBlock exte reqAppStates.add(RMAppState.valueOf(stateString)); } } + StringBuilder appsTableData = new StringBuilder("[\n"); for (RMApp app : apps.values()) { if (reqAppStates != null && !reqAppStates.contains(app.getState())) { continue; } AppInfo appInfo = new AppInfo(app, true); String percent = String.format("%.1f", appInfo.getProgress()); - String startTime = Times.format(appInfo.getStartTime()); - String finishTime = Times.format(appInfo.getFinishTime()); ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId(); int fairShare = fsinfo.getAppFairShare(attemptId); + //AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js + appsTableData.append("[\"") + .append(appInfo.getAppId()).append("\",\"") + .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( + appInfo.getUser()))).append("\",\"") + .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( + appInfo.getName()))).append("\",\"") + .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml( + appInfo.getQueue()))).append("\",\"") + .append(fairShare).append("\",\"") + .append(appInfo.getStartTime()).append("\",\"") + .append(appInfo.getFinishTime()).append("\",\"") + 
.append(appInfo.getState()).append("\",\"") + .append(appInfo.getFinalStatus()).append("\",\"") + // Progress bar + .append("
").append("
") + .append("\",\"") + .append(appInfo.getTrackingUI()).append("\"],\n"); - tbody. - tr(). - td(). - br().$title(appInfo.getAppIdNum())._(). // for sorting - a(url("app", appInfo.getAppId()), appInfo.getAppId())._(). - td(appInfo.getUser()). - td(appInfo.getName()). - td(appInfo.getQueue()). - td("" + fairShare). - td(). - br().$title(String.valueOf(appInfo.getStartTime()))._(). - _(startTime)._(). - td(). - br().$title(String.valueOf(appInfo.getFinishTime()))._(). - _(finishTime)._(). - td(appInfo.getState()). - td(appInfo.getFinalStatus()). - td(). - br().$title(percent)._(). // for sorting - div(_PROGRESSBAR). - $title(join(percent, '%')). // tooltip - div(_PROGRESSBAR_VALUE). - $style(join("width:", percent, '%'))._()._()._(). - td(). - a(!appInfo.isTrackingUrlReady()? - "#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._()._(); } + if(appsTableData.charAt(appsTableData.length() - 2) == ',') { + appsTableData.delete(appsTableData.length()-2, appsTableData.length()-1); + } + appsTableData.append("]"); + html.script().$type("text/javascript"). 
+ _("var appsTableData=" + appsTableData)._(); + tbody._()._(); } } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Fri Jan 11 19:40:23 2013 @@ -20,16 +20,18 @@ package org.apache.hadoop.yarn.server.re import static org.apache.hadoop.yarn.util.StringHelper.join; -import java.util.List; +import java.util.Collection; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerLeafQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerQueueInfo; import org.apache.hadoop.yarn.webapp.ResponseInfo; import org.apache.hadoop.yarn.webapp.SubView; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI; import 
org.apache.hadoop.yarn.webapp.hamlet.Hamlet.UL; import org.apache.hadoop.yarn.webapp.view.HtmlBlock; import org.apache.hadoop.yarn.webapp.view.InfoBlock; @@ -48,16 +50,15 @@ public class FairSchedulerPage extends R @RequestScoped static class FSQInfo { - FairSchedulerInfo fsinfo; FairSchedulerQueueInfo qinfo; } - static class QueueInfoBlock extends HtmlBlock { - final FairSchedulerQueueInfo qinfo; + static class LeafQueueBlock extends HtmlBlock { + final FairSchedulerLeafQueueInfo qinfo; - @Inject QueueInfoBlock(ViewContext ctx, FSQInfo info) { + @Inject LeafQueueBlock(ViewContext ctx, FSQInfo info) { super(ctx); - qinfo = (FairSchedulerQueueInfo) info.qinfo; + qinfo = (FairSchedulerLeafQueueInfo)info.qinfo; } @Override @@ -81,6 +82,47 @@ public class FairSchedulerPage extends R } } + static class QueueBlock extends HtmlBlock { + final FSQInfo fsqinfo; + + @Inject QueueBlock(FSQInfo info) { + fsqinfo = info; + } + + @Override + public void render(Block html) { + Collection<FairSchedulerQueueInfo> subQueues = fsqinfo.qinfo.getChildQueues(); + UL<Hamlet> ul = html.ul("#pq"); + for (FairSchedulerQueueInfo info : subQueues) { + float capacity = info.getMaxResourcesFraction(); + float fairShare = info.getFairShareFraction(); + float used = info.getUsedFraction(); + LI<UL<Hamlet>> li = ul. + li(). + a(_Q).$style(width(capacity * Q_MAX_WIDTH)). + $title(join("Fair Share:", percent(fairShare))). + span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))). + _('.')._(). + span().$style(join(width(used/capacity), + ";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)). + _('.')._(). + span(".q", info.getQueueName())._(). + span().$class("qstats").$style(left(Q_STATS_POS)). 
+ _(join(percent(used), " used"))._(); + + fsqinfo.qinfo = info; + if (info instanceof FairSchedulerLeafQueueInfo) { + li.ul("#lq").li()._(LeafQueueBlock.class)._()._(); + } else { + li._(QueueBlock.class); + } + li._(); + } + + ul._(); + } + } + static class QueuesBlock extends HtmlBlock { final FairScheduler fs; final FSQInfo fsqinfo; @@ -89,8 +131,9 @@ public class FairSchedulerPage extends R fs = (FairScheduler)rm.getResourceScheduler(); fsqinfo = info; } - - @Override public void render(Block html) { + + @Override + public void render(Block html) { html._(MetricsOverviewTable.class); UL<DIV<DIV<Hamlet>>> ul = html. div("#cs-wrapper.ui-widget"). @@ -106,8 +149,8 @@ public class FairSchedulerPage extends R span(".q", "default")._()._(); } else { FairSchedulerInfo sinfo = new FairSchedulerInfo(fs); - fsqinfo.fsinfo = sinfo; - fsqinfo.qinfo = null; + fsqinfo.qinfo = sinfo.getRootQueueInfo(); + float used = fsqinfo.qinfo.getUsedFraction(); ul. li().$style("margin-bottom: 1em"). @@ -120,29 +163,15 @@ public class FairSchedulerPage extends R _("Used (over fair share)")._(). span().$class("qlegend ui-corner-all ui-state-default"). _("Max Capacity")._(). - _(); - - List<FairSchedulerQueueInfo> subQueues = fsqinfo.fsinfo.getQueueInfos(); - for (FairSchedulerQueueInfo info : subQueues) { - fsqinfo.qinfo = info; - float capacity = info.getMaxResourcesFraction(); - float fairShare = info.getFairShareFraction(); - float used = info.getUsedFraction(); - ul. - li(). - a(_Q).$style(width(capacity * Q_MAX_WIDTH)). - $title(join("Fair Share:", percent(fairShare))). - span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))). - _('.')._(). - span().$style(join(width(used/capacity), - ";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)). - _('.')._(). - span(".q", info.getQueueName())._(). - span().$class("qstats").$style(left(Q_STATS_POS)). - _(join(percent(used), " used"))._(). - ul("#lq").li()._(QueueInfoBlock.class)._()._(). - _(); - } + _(). + li(). + a(_Q).$style(width(Q_MAX_WIDTH)). 
+ span().$style(join(width(used), ";left:0%;", + used > 1 ? Q_OVER : Q_UNDER))._(".")._(). + span(".q", "root")._(). + span().$class("qstats").$style(left(Q_STATS_POS)). + _(join(percent(used), " used"))._(). + _(QueueBlock.class)._(); } ul._()._(). script().$type("text/javascript"). @@ -159,13 +188,16 @@ public class FairSchedulerPage extends R "#cs a { font-weight: normal; margin: 2px; position: relative }", "#cs a span { font-weight: normal; font-size: 80% }", "#cs-wrapper .ui-widget-header { padding: 0.2em 0.5em }", + ".qstats { font-weight: normal; font-size: 80%; position: absolute }", + ".qlegend { font-weight: normal; padding: 0 1em; margin: 1em }", "table.info tr th {width: 50%}")._(). // to center info table script("/static/jt/jquery.jstree.js"). script().$type("text/javascript"). _("$(function() {", " $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');", " $('#cs').bind('loaded.jstree', function (e, data) {", - " data.inst.open_all(); }).", + " data.inst.open_node('#pq', true);", + " }).", " jstree({", " core: { animation: 188, html_titles: true },", " plugins: ['themeroller', 'html_data', 'ui'],", @@ -175,8 +207,9 @@ public class FairSchedulerPage extends R " });", " $('#cs').bind('select_node.jstree', function(e, data) {", " var q = $('.q', data.rslt.obj).first().text();", - " if (q == 'root') q = '';", - " $('#apps').dataTable().fnFilter(q, 3);", + " if (q == 'root') q = '';", + " else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';", + " $('#apps').dataTable().fnFilter(q, 3, true);", " });", " $('#cs').show();", "});")._(); @@ -197,4 +230,19 @@ public class FairSchedulerPage extends R static String left(float f) { return String.format("left:%.1f%%", f * 100); } + + @Override + protected String getAppsTableColumnDefs() { + StringBuilder sb = new StringBuilder(); + return sb + .append("[\n") + .append("{'sType':'numeric', 'aTargets': [0]") + .append(", 'mRender': parseHadoopID }") + + .append("\n, {'sType':'numeric', 'aTargets': 
[5, 6]") + .append(", 'mRender': renderHadoopDate }") + + .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [9]") + .append(", 'mRender': parseHadoopProgress }]").toString(); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java?rev=1432246&r1=1432245&r2=1432246&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java Fri Jan 11 19:40:23 2013 @@ -66,7 +66,17 @@ public class RmView extends TwoColumnLay .append(", bDeferRender: true") .append(", bProcessing: true") - .append("\n, aoColumnDefs: [\n") + .append("\n, aoColumnDefs: ") + .append(getAppsTableColumnDefs()) + + // Sort by id upon page load + .append(", aaSorting: [[0, 'desc']]}").toString(); + } + + protected String getAppsTableColumnDefs() { + StringBuilder sb = new StringBuilder(); + return sb + .append("[\n") .append("{'sType':'numeric', 'aTargets': [0]") .append(", 'mRender': parseHadoopID }") @@ -74,9 +84,6 @@ public class RmView extends TwoColumnLay .append(", 'mRender': renderHadoopDate }") .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [8]") - .append(", 'mRender': parseHadoopProgress }]") - - // Sort by id upon page load - .append(", aaSorting: 
[[0, 'desc']]}").toString(); + .append(", 'mRender': parseHadoopProgress }]").toString(); } }