hadoop-yarn-issues mailing list archives

From "NING DING (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (YARN-4398) Yarn recover functionality causes the cluster running slowly and the cluster usage rate is far below 100
Date Mon, 30 Nov 2015 03:41:11 GMT

     [ https://issues.apache.org/jira/browse/YARN-4398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

NING DING updated YARN-4398:
----------------------------
    Description: 
In my Hadoop cluster, ResourceManager recovery is enabled with FileSystemRMStateStore.
I found that this makes the YARN cluster run slowly: the cluster usage rate is only 50% even
when there are many pending apps.

The scenario is as follows.
In thread A, RMAppImpl$RMAppNewlySavingTransition calls the storeNewApplication method
defined in RMStateStore. This storeNewApplication method is synchronized.
{code:title=RMAppImpl.java|borderStyle=solid}
  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }
{code}
{code:title=RMStateStore.java|borderStyle=solid}
public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationStateData appState =
        ApplicationStateData.newInstance(
            app.getSubmitTime(), app.getStartTime(), context, app.getUser());
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }
{code}
In thread B, FileSystemRMStateStore calls the storeApplicationStateInternal method, which is
also synchronized.
This method saves an ApplicationStateData to HDFS, which normally
takes 90~300 milliseconds in my Hadoop cluster.
{code:title=FileSystemRMStateStore.java|borderStyle=solid}
public synchronized void storeApplicationStateInternal(ApplicationId appId,
      ApplicationStateData appStateDataPB) throws Exception {
    Path appDirPath = getAppDir(rmAppRoot, appId);
    mkdirsWithRetries(appDirPath);
    Path nodeCreatePath = getNodePath(appDirPath, appId.toString());

    LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
    byte[] appStateData = appStateDataPB.getProto().toByteArray();
    try {
      // currently throw all exceptions. May need to respond differently for HA
      // based on whether we have lost the right to write to FS
      writeFileWithRetries(nodeCreatePath, appStateData, true);
    } catch (Exception e) {
      LOG.info("Error storing info for app: " + appId, e);
      throw e;
    }
  }
{code}
Suppose thread B enters the FileSystemRMStateStore.storeApplicationStateInternal method first.
Thread A is then blocked for a while, because both methods synchronize on the same store object.
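
A minimal sketch of this contention, not taken from Hadoop: the class `Store` and the methods `slowWrite` and `quickEnqueue` are hypothetical stand-ins for the state store, storeApplicationStateInternal, and storeNewApplication, and the HDFS write is simulated with a sleep. Because both methods are synchronized on the same instance, the caller of the cheap method has to wait until the slow one returns.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the state store; this is not Hadoop code.
class SharedMonitorDemo {

    static class Store {
        final CountDownLatch writerHoldsLock = new CountDownLatch(1);

        // Plays the role of storeApplicationStateInternal (thread B).
        synchronized void slowWrite(long millis) {
            writerHoldsLock.countDown(); // signal: the monitor is now held
            pause(millis);               // simulated HDFS write
        }

        // Plays the role of storeNewApplication (thread A); cheap by itself.
        synchronized void quickEnqueue() {
            // Nothing to do: only acquiring the monitor matters here.
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns how many milliseconds the caller waited to enter quickEnqueue
    // while another thread was inside slowWrite on the same Store.
    static long measureWaitMillis(long writeMillis) {
        Store store = new Store();
        Thread threadB = new Thread(() -> store.slowWrite(writeMillis), "thread-B");
        threadB.start();
        try {
            store.writerHoldsLock.await(); // thread B is inside slowWrite
            long start = System.nanoTime();
            store.quickEnqueue();          // blocks until thread B leaves slowWrite
            long waited = (System.nanoTime() - start) / 1_000_000;
            threadB.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("thread A waited about " + measureWaitMillis(200) + " ms");
    }
}
```

The measured wait should be close to the simulated write time, even though `quickEnqueue` itself does no work.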

The ResourceManager has only one RMStateStore instance. In my cluster it is of type
FileSystemRMStateStore.
Debugging the RMAppNewlySavingTransition.transition method, the thread stack shows that it is called from
the AsyncDispatcher.dispatch method. The code of this method is below.
{code:title=AsyncDispatcher.java|borderStyle=solid}
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();

    try{
      EventHandler handler = eventDispatchers.get(type);
      if(handler != null) {
        handler.handle(event);
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    } catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      // If serviceStop is called, we should exit this thread gracefully.
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false
          && stopped == false) {
        Thread shutDownThread = new Thread(createShutDownThread());
        shutDownThread.setName("AsyncDispatcher ShutDown handler");
        shutDownThread.start();
      }
    }
  }
{code}
The code above shows that the AsyncDispatcher.dispatch method processes events of different types.
In fact, this AsyncDispatcher instance is ResourceManager.rmDispatcher, created in the ResourceManager.serviceInit
method.
Many event types and handlers are registered with ResourceManager.rmDispatcher.
In the scenario above, thread B blocks thread A, so the processing of many subsequent events is blocked as well.
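
The knock-on effect can be sketched with a simplified model of a single dispatcher thread. This is not Hadoop's AsyncDispatcher: the class `SingleDispatcherDemo` and its methods are hypothetical, and a single-thread executor stands in for the dispatcher's event loop. One slow handler delays every event queued behind it, whatever its type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified model of one dispatcher thread; this is not Hadoop code.
class SingleDispatcherDemo {

    // Runs each handler in order on one thread and returns, per handler,
    // the delay in milliseconds between submission and the start of handling.
    static List<Long> startDelaysMillis(List<Runnable> handlers) {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            long submitted = System.nanoTime();
            List<Future<Long>> pending = new ArrayList<>();
            for (Runnable handler : handlers) {
                Callable<Long> task = () -> {
                    long delay = (System.nanoTime() - submitted) / 1_000_000;
                    handler.run();
                    return delay;
                };
                pending.add(dispatcher.submit(task));
            }
            List<Long> delays = new ArrayList<>();
            for (Future<Long> result : pending) {
                delays.add(result.get());
            }
            return delays;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            dispatcher.shutdown();
        }
    }

    // A handler that is stuck for the given time, like a blocked transition.
    static Runnable sleeping(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        List<Runnable> handlers = new ArrayList<>();
        handlers.add(sleeping(200)); // the blocked handler
        handlers.add(() -> { });     // unrelated cheap events queued behind it
        handlers.add(() -> { });
        System.out.println("start delays in ms: " + startDelaysMillis(handlers));
    }
}
```

In this model the two cheap handlers cannot start until the slow one finishes, which mirrors how a blocked transition holds up the other events on the same dispatcher.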

In my test cluster, there is only one queue. When the client submits 1000 applications concurrently,
the YARN cluster usage rate is only 50% and many apps are pending.
If I disable ResourceManager recovery, the cluster usage can reach 100%.
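
As a rough back-of-the-envelope check, assuming the store writes run strictly one at a time and each takes the 90~300 milliseconds reported above, persisting 1000 applications needs between 90 and 300 seconds of store time. The class and method names below are hypothetical, and the estimate covers only the time spent writing, not the resulting usage rate.

```java
// Back-of-the-envelope estimate; the names here are illustrative only.
class StoreThroughputEstimate {

    // Seconds needed to persist `apps` applications when writes are fully
    // serialized and each write takes `millisPerWrite` milliseconds.
    static double serializedSeconds(int apps, int millisPerWrite) {
        return apps * (double) millisPerWrite / 1000.0;
    }

    public static void main(String[] args) {
        System.out.println("best case:  " + serializedSeconds(1000, 90) + " s");
        System.out.println("worst case: " + serializedSeconds(1000, 300) + " s");
    }
}
```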

  was:
To solve this issue, I removed the synchronized modifier from some methods in FileSystemRMStateStore.
With this change, the YARN cluster usage can reach 100% and the whole cluster runs well.
Please see my attached patch.
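
The attached patch is not reproduced here. As a minimal illustration of why taking the slow write off the shared monitor helps, the sketch below guards a simulated write with a separate lock, so a synchronized method on the same object is no longer held up. The class `Store`, the field `ioLock`, and the methods are all hypothetical, and the sketch does not model whatever shared state the real methods touch.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration; this is neither Hadoop code nor the attached patch.
class SeparateLockDemo {

    static class Store {
        final CountDownLatch writerHoldsLock = new CountDownLatch(1);
        private final Object ioLock = new Object(); // assumed lock used only for writes

        // The slow write no longer takes the monitor of `this`.
        void slowWrite(long millis) {
            synchronized (ioLock) {
                writerHoldsLock.countDown(); // signal: the write is in progress
                pause(millis);               // simulated HDFS write
            }
        }

        // Still synchronized on `this`, but nothing slow holds that monitor.
        synchronized void quickEnqueue() {
            // Nothing to do: only acquiring the monitor matters here.
        }
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns how many milliseconds the caller waited to enter quickEnqueue
    // while another thread was inside slowWrite on the same Store.
    static long measureWaitMillis(long writeMillis) {
        Store store = new Store();
        Thread writer = new Thread(() -> store.slowWrite(writeMillis), "writer");
        writer.start();
        try {
            store.writerHoldsLock.await(); // the write is now in progress
            long start = System.nanoTime();
            store.quickEnqueue();          // does not contend with ioLock
            long waited = (System.nanoTime() - start) / 1_000_000;
            writer.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("caller waited about " + measureWaitMillis(300) + " ms");
    }
}
```

Here the caller should return almost immediately even though the simulated write is still running.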


> Yarn recover functionality causes the cluster running slowly and the cluster usage rate
is far below 100
> --------------------------------------------------------------------------------------------------------
>
>                 Key: YARN-4398
>                 URL: https://issues.apache.org/jira/browse/YARN-4398
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: resourcemanager
>    Affects Versions: 2.7.1
>            Reporter: NING DING
>         Attachments: YARN-4398.1.patch, YARN-4398.2.patch



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
