spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "angerszhu (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-36540) AM should not just finish with Success when disconnected
Date Wed, 18 Aug 2021 08:04:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-36540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

angerszhu updated SPARK-36540:
------------------------------
    Description: 
We encountered a case where the AM lost its connection to the driver:
{code}
21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039,
body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420;
closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
{code}

Looking at the client-mode code, when the AMEndpoint is disconnected it finishes the application
with a SUCCESS final status:
{code}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
      // This avoids potentially reporting incorrect exit codes if the driver fails
      if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
      }
    }
{code}

Normally in client mode, when the application succeeds, the driver stops and the AM loses its
connection, so it is fine to exit with SUCCESS. But if a network problem causes the disconnection,
finishing with a SUCCESS final status is not correct.
YarnClientSchedulerBackend then receives an application report with a SUCCESS final status
and stops the SparkContext, so the application has actually failed but is marked as a normal stop.
{code}
  private class MonitorThread extends Thread {
    private var allowInterrupt = true

    override def run() {
      try {
        val YarnAppReport(_, state, diags) =
          client.monitorApplication(appId.get, logApplicationReport = false)
        logError(s"YARN application has exited unexpectedly with state $state! " +
          "Check the YARN application logs for more details.")
        diags.foreach { err =>
          logError(s"Diagnostics message: $err")
        }
        allowInterrupt = false
        sc.stop()
      } catch {
        case e: InterruptedException => logInfo("Interrupting monitor thread")
      }
    }

    def stopMonitor(): Unit = {
      if (allowInterrupt) {
        this.interrupt()
      }
    }
  }
{code}


  was:
We encountered a case where the AM lost its connection to the driver:
{code}
21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039,
body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to ip-10-128-152-193.idata-server.shopee.io/10.128.152.193:41420;
closing connection
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
{code}

Looking at the client-mode code, when the AMEndpoint is disconnected it finishes the application
with a SUCCESS final status:
{code}
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
      // This avoids potentially reporting incorrect exit codes if the driver fails
      if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
      }
    }
{code}

Normally in client mode, when the application succeeds, the driver stops and the AM loses its
connection, so it is fine to exit with SUCCESS. But if a network problem causes the disconnection,
finishing with a SUCCESS final status is not correct.
YarnClientSchedulerBackend then receives an application report with a SUCCESS final status
and stops the SparkContext, so the application has actually failed but is marked as a normal stop.
{code}
  private class MonitorThread extends Thread {
    private var allowInterrupt = true

    override def run() {
      try {
        val YarnAppReport(_, state, diags) =
          client.monitorApplication(appId.get, logApplicationReport = false)
        logError(s"YARN application has exited unexpectedly with state $state! " +
          "Check the YARN application logs for more details.")
        diags.foreach { err =>
          logError(s"Diagnostics message: $err")
        }
        allowInterrupt = false
        sc.stop()
      } catch {
        case e: InterruptedException => logInfo("Interrupting monitor thread")
      }
    }

    def stopMonitor(): Unit = {
      if (allowInterrupt) {
        this.interrupt()
      }
    }
  }
{code}



> AM should not just finish with Success when disconnected
> --------------------------------------------------------
>
>                 Key: SPARK-36540
>                 URL: https://issues.apache.org/jira/browse/SPARK-36540
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, YARN
>    Affects Versions: 3.2.0
>            Reporter: angerszhu
>            Priority: Major
>
> We encountered a case where the AM lost its connection to the driver:
> {code}
> 21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039,
body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420;
closing connection
> java.nio.channels.ClosedChannelException
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>         at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>         at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>         at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
>         at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>         at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Looking at the client-mode code, when the AMEndpoint is disconnected it finishes the application
with a SUCCESS final status:
> {code}
> override def onDisconnected(remoteAddress: RpcAddress): Unit = {
>       // In cluster mode or unmanaged am case, do not rely on the disassociated event
to exit
>       // This avoids potentially reporting incorrect exit codes if the driver fails
>       if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
>         logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
>         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
>       }
>     }
> {code}
> Normally in client mode, when the application succeeds, the driver stops and the AM loses its
connection, so it is fine to exit with SUCCESS. But if a network problem causes the disconnection,
finishing with a SUCCESS final status is not correct.
> YarnClientSchedulerBackend then receives an application report with a SUCCESS final status
and stops the SparkContext, so the application has actually failed but is marked as a normal stop.
> {code}
>   private class MonitorThread extends Thread {
>     private var allowInterrupt = true
>     override def run() {
>       try {
>         val YarnAppReport(_, state, diags) =
>           client.monitorApplication(appId.get, logApplicationReport = false)
>         logError(s"YARN application has exited unexpectedly with state $state! " +
>           "Check the YARN application logs for more details.")
>         diags.foreach { err =>
>           logError(s"Diagnostics message: $err")
>         }
>         allowInterrupt = false
>         sc.stop()
>       } catch {
>         case e: InterruptedException => logInfo("Interrupting monitor thread")
>       }
>     }
>     def stopMonitor(): Unit = {
>       if (allowInterrupt) {
>         this.interrupt()
>       }
>     }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message