flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Flink YARN app terminated before the client receives the result
Date Mon, 30 Mar 2020 09:48:52 GMT
I think we have to take a step back here. For per-job (YARN) mode, the 
general problem is that there are two systems that can do shutdown (and 
other things) and two clients. There is YARN and there is Flink, and 
Flink is YARN inside YARN, in a way. The solution, I think, is that 
cancellation for YARN mode should go through YARN, not through Flink. 
Then there can be no races or other issues with the cluster shutting 
down before it has a chance to send a response.
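
To make the race concrete, here is a minimal toy model in plain Java. It is only a sketch under stated assumptions: `ShutdownRaceSketch` and `cancelAndRespond` are hypothetical names, the single-threaded executor stands in for the REST server's event loop, and none of this is actual Flink code. It shows that if completing the "job cancelled" future synchronously triggers the server shutdown, the later attempt to send the response is rejected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Toy model only: these names are hypothetical and are not Flink classes.
final class ShutdownRaceSketch {

    /**
     * Completes a "job cancelled" future, then tries to send the response on a
     * single-threaded executor that stands in for the REST server.
     * Returns what the client would observe.
     */
    static String cancelAndRespond(boolean shutdownOnCancel) {
        ExecutorService restServer = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> jobCancelled = new CompletableFuture<>();

        if (shutdownOnCancel) {
            // A non-async callback registered on an incomplete future runs in the
            // thread that completes it, so the server is already shut down by the
            // time complete(null) returns.
            jobCancelled.thenRun(restServer::shutdown);
        }

        jobCancelled.complete(null);

        try {
            // Sending the response needs the server's executor.
            return restServer.submit(() -> "cancelled").get();
        } catch (RejectedExecutionException e) {
            // The executor was terminated first: the client never gets an answer.
            return "no response";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            restServer.shutdown();
        }
    }
}
```

In this model `cancelAndRespond(true)` ends in the rejected branch, which mirrors the `RejectedExecutionException: event executor terminated` reported further down in the thread, while `cancelAndRespond(false)` delivers the response.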

Btw, the same goes for "attached mode" where a client waits for job 
completion. IMO, this should also go through YARN and not the Flink REST 
client.

What do you think?

Best,
Aljoscha

On 20.03.20 15:15, Till Rohrmann wrote:
> Yes you are right that `thenAcceptAsync` only breaks the control flow but
> it does not guarantee that the `RestServer` has actually sent the response
> to the client. Maybe we also need something similar to FLINK-10309 [1]. The
> problem I see with this approach is that it makes all RestHandlers stateful.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10309
> 
> Cheers,
> Till
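
As an illustration of the `thenAccept` versus `thenAcceptAsync` point above, the following plain-JDK sketch reports which thread runs the callback. The class name, method name and the `callback-pool` thread name are hypothetical and chosen only for this example. It shows that the async variant merely moves the callback onto another thread; by itself it says nothing about whether a response has been written to the client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration, not Flink code.
final class CallbackThreadSketch {

    /** Returns the name of the thread that executed the callback. */
    static String callbackThread(boolean async) {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-pool"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> ranOn = new CompletableFuture<>();

            if (async) {
                // The callback is handed to the pool and runs on its thread.
                source.thenAcceptAsync(
                        v -> ranOn.complete(Thread.currentThread().getName()), pool);
            } else {
                // The callback runs in whichever thread completes 'source'.
                source.thenAccept(
                        v -> ranOn.complete(Thread.currentThread().getName()));
            }

            source.complete("done");
            return ranOn.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

With `async == false` the callback runs on the caller's thread, inside `complete(...)`; with `async == true` it runs on `callback-pool`. In both cases the ordering relative to any other work, such as flushing a response, is not constrained.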
> 
> On Fri, Mar 20, 2020 at 2:26 PM DONG, Weike <kyledong@connect.hku.hk> wrote:
> 
>> Hi Tison & Till,
>>
>> Changing *thenAccept* into *thenAcceptAsync* in the
>> MiniDispatcher#cancelJob does not help to solve the problem in my
>> environment. However, I have found that adding a *Thread.sleep(2000)* before
>> the return of JobCancellationHandler#handleRequest solved the problem (at
>> least the symptom goes away). As this is only a dirty hack, I will try to
>> get a more decent solution to this problem.
>>
>> Sincerely,
>> Weike
>>
>> On Tue, Mar 17, 2020 at 11:11 PM tison <wander4096@gmail.com> wrote:
>>
>>> JIRA created as https://jira.apache.org/jira/browse/FLINK-16637
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Till Rohrmann <trohrmann@apache.org> wrote on Tue, Mar 17, 2020 at 5:57 PM:
>>>
>>>>   @Tison could you create an issue to track the problem? Please also link
>>>> the uploaded log file for further debugging.
>>>>
>>>> I think the reason why it worked in Flink 1.9 could have been that we
>>>> had an async callback in the longer chain which broke the flow of execution
>>>> and allowed the response to be sent. This is no longer the case. As an easy
>>>> fix one could change thenAccept into thenAcceptAsync in the
>>>> MiniDispatcher#cancelJob. But this only solves symptoms. Maybe we should
>>>> think about allowing not only StatusHandler to close asynchronously. At the
>>>> moment we say that all other handlers shut down immediately (see
>>>> AbstractHandler#closeHandlerAsync). But the problem with this change would
>>>> be that all handlers would become stateful because they would need to
>>>> remember whether a request is currently ongoing or not.
>>>>
>>>> Cheers,
>>>> Till
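
A minimal sketch of what "remembering whether a request is currently ongoing" could look like, in plain Java. This is a hypothetical illustration under my own assumptions, not the code of `AbstractHandler` or of any existing Flink class: the handler counts outstanding requests, and its asynchronous close future completes only once the last one has finished.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; the class and method names are invented for this sketch.
final class OutstandingRequestsSketch {
    private final Object lock = new Object();
    private final CompletableFuture<Void> terminated = new CompletableFuture<>();
    private int inFlight = 0;
    private boolean closing = false;

    /** Registers a request; returns false if the handler is already closing. */
    boolean requestStarted() {
        synchronized (lock) {
            if (closing) {
                return false;
            }
            inFlight++;
            return true;
        }
    }

    /** Must be called once the response for a registered request has been sent. */
    void requestFinished() {
        boolean done;
        synchronized (lock) {
            inFlight--;
            done = closing && inFlight == 0;
        }
        if (done) {
            terminated.complete(null);
        }
    }

    /** Stops accepting requests; the future completes when none are in flight. */
    CompletableFuture<Void> closeAsync() {
        boolean done;
        synchronized (lock) {
            closing = true;
            done = inFlight == 0;
        }
        if (done) {
            terminated.complete(null);
        }
        return terminated;
    }
}
```

The cost mentioned above is visible here: every handler would have to carry this state and call `requestStarted` and `requestFinished` around each request, and the server shutdown would have to wait on the future returned by `closeAsync`.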
>>>>
>>>> On Tue, Mar 17, 2020 at 9:01 AM DONG, Weike <kyledong@connect.hku.hk>
>>>> wrote:
>>>>
>>>>> Hi Tison & Till and all,
>>>>>
>>>>> I have uploaded the client, taskmanager and jobmanager logs to Gist (
>>>>> https://gist.github.com/kylemeow/500b6567368316ec6f5b8f99b469a49f),
>>>>> and I can reproduce this bug every time I try to cancel Flink 1.10 jobs
>>>>> on YARN.
>>>>>
>>>>> Besides, in earlier Flink versions like 1.9, the REST API for *cancelling
>>>>> a job with a savepoint* sometimes threw exceptions to the client side due
>>>>> to early shutdown of the server, even though the log showed that the
>>>>> savepoint had completed successfully. With the newly introduced *stop* API
>>>>> that bug disappeared; however, the *cancel* API seems to be buggy now.
>>>>>
>>>>> Best,
>>>>> Weike
>>>>>
>>>>> On Tue, Mar 17, 2020 at 10:17 AM tison <wander4096@gmail.com> wrote:
>>>>>
>>>>>> edit: previously, after the cancellation we had a longer call chain to
>>>>>> #jobReachedGloballyTerminalState, which archives the job and performs the
>>>>>> JM graceful shutdown, which might take some time so that ...
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> tison <wander4096@gmail.com> wrote on Tue, Mar 17, 2020 at 10:13 AM:
>>>>>>
>>>>>>> Hi Weike & Till,
>>>>>>>
>>>>>>> I agree with Till, and it is also the analysis from my side. However, it
>>>>>>> seems that even without FLINK-15116 it is still possible that we
>>>>>>> complete the cancel future but the cluster gets shut down before it has
>>>>>>> properly delivered the response.
>>>>>>>
>>>>>>> One strange thing is that this behavior is reproducible almost every
>>>>>>> time; it should be one possible ordering, but not the only one. Maybe
>>>>>>> previously we first had to cancel the job, which involved a long call
>>>>>>> chain, so that there happened to be enough time to deliver the response.
>>>>>>>
>>>>>>> The resolution looks like introducing some synchronization/finalization
>>>>>>> logic that, on a best-effort basis, clears these outstanding futures
>>>>>>> before the cluster (RestServer) goes down.
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> Till Rohrmann <trohrmann@apache.org> wrote on Tue, Mar 17, 2020 at 4:12 AM:
>>>>>>>
>>>>>>>> Hi Weike,
>>>>>>>>
>>>>>>>> could you share the complete logs with us? Attachments are being
>>>>>>>> filtered out by the Apache mail server, but it works if you upload the
>>>>>>>> logs somewhere (e.g. https://gist.github.com/) and then share the link
>>>>>>>> with us. Ideally you run the cluster with DEBUG log settings.
>>>>>>>>
>>>>>>>> I assume that you are running Flink 1.10, right?
>>>>>>>>
>>>>>>>> My suspicion is that this behaviour has been introduced with FLINK-15116
>>>>>>>> [1]. It looks as if we complete the shutdown future in
>>>>>>>> MiniDispatcher#cancelJob before we return the response to the
>>>>>>>> RestClusterClient. My guess is that this triggers the shutdown of the
>>>>>>>> RestServer, which then is not able to serve the response to the client.
>>>>>>>> I'm pulling in Aljoscha and Tison who introduced this change. They might
>>>>>>>> be able to verify my theory and propose a solution for it.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15116
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Fri, Mar 13, 2020 at 7:50 AM DONG, Weike <
>>>>> kyledong@connect.hku.hk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Yangze and all,
>>>>>>>>>
>>>>>>>>> I have tried numerous times, and this behavior persists.
>>>>>>>>>
>>>>>>>>> Below is the tail log of taskmanager.log:
>>>>>>>>>
>>>>>>>>> 2020-03-13 12:06:14.240 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.503gb (1613968148 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.403gb (1505922928 bytes), networkMemory=359.040mb (376480732 bytes)}, allocationId: d3acaeac3db62454742e800b5410adfd, jobId: d0a674795be98bd2574d9ea3286801cb).
>>>>>>>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Remove job d0a674795be98bd2574d9ea3286801cb from job leader monitoring.
>>>>>>>>> 2020-03-13 12:06:14.244 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager connection for job d0a674795be98bd2574d9ea3286801cb.
>>>>>>>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  - Close JobManager connection for job d0a674795be98bd2574d9ea3286801cb.
>>>>>>>>> 2020-03-13 12:06:14.250 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService  - Cannot reconnect to job d0a674795be98bd2574d9ea3286801cb because it is not registered.
>>>>>>>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>>>>>>> 2020-03-13 12:06:19.744 [SIGTERM handler] INFO  org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>>>>>>> 2020-03-13 12:06:19.745 [PermanentBlobCache shutdown hook] INFO  org.apache.flink.runtime.blob.PermanentBlobCache  - Shutting down BLOB cache
>>>>>>>>> 2020-03-13 12:06:19.749 [FileChannelManagerImpl-netty-shuffle shutdown hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  - FileChannelManager removed spill file directory /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-netty-shuffle-65cd4ebb-51f4-48a9-8e3c-43e431bca46d
>>>>>>>>> 2020-03-13 12:06:19.750 [TaskExecutorLocalStateStoresManager shutdown hook] INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Shutting down TaskExecutorLocalStateStoresManager.
>>>>>>>>> 2020-03-13 12:06:19.750 [TransientBlobCache shutdown hook] INFO  org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down BLOB cache
>>>>>>>>> 2020-03-13 12:06:19.751 [FileChannelManagerImpl-io shutdown hook] INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl  - FileChannelManager removed spill file directory /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-io-67ad5c3a-aec6-42be-ab1f-0ce3841fc4bd
>>>>>>>>> 2020-03-13 12:06:19.752 [FileCache shutdown hook] INFO  org.apache.flink.runtime.filecache.FileCache  - removed file cache directory /data/emr/yarn/local/usercache/hadoop/appcache/application_1562207369540_0135/flink-dist-cache-65075ee3-e009-4978-a9d8-ec010e6f4b31
>>>>>>>>>
>>>>>>>>> As the tail log of jobmanager.log is kind of lengthy, I have attached
>>>>>>>>> it in this mail.
>>>>>>>>>
>>>>>>>>> From what I have seen, the TaskManager and JobManager shut down by
>>>>>>>>> themselves. However, I have noticed some Netty exceptions (from the
>>>>>>>>> stack trace, they are part of the REST handler) like:
>>>>>>>>>
>>>>>>>>> ERROR org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>>>>>>   - Failed to submit a listener notification task. Event loop shut down?
>>>>>>>>> java.util.concurrent.RejectedExecutionException: event executor terminated
>>>>>>>>>
>>>>>>>>> Thus I suppose that these exceptions might be the actual cause of the
>>>>>>>>> premature termination of the REST server, and I am still looking into
>>>>>>>>> the real cause of this.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Weike
>>>>>>>>>
>>>>>>>>> On Fri, Mar 13, 2020 at 1:45 PM Yangze Guo <karmagyz@gmail.com>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Would you mind sharing more information about why the task executor
>>>>>>>>>> is killed? If it is killed by YARN, you might find such info in the
>>>>>>>>>> YARN NM/RM logs.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Yangze Guo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 13, 2020 at 12:31 PM DONG, Weike <
>>>>> kyledong@connect.hku.hk>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Recently I have encountered a strange behavior of Flink on YARN: when
>>>>>>>>>>> I try to cancel a Flink job running in per-job mode on YARN using a
>>>>>>>>>>> command like
>>>>>>>>>>>
>>>>>>>>>>> "cancel -m yarn-cluster -yid application_1559388106022_9412 ed7e2e0ab0a7316c1b65df6047bc6aae"
>>>>>>>>>>>
>>>>>>>>>>> the client happily finds and connects to the ResourceManager and then
>>>>>>>>>>> gets stuck at
>>>>>>>>>>> Found Web Interface 172.28.28.3:50099 of application 'application_1559388106022_9412'.
>>>>>>>>>>>
>>>>>>>>>>> And after one minute, an exception is thrown at the client side:
>>>>>>>>>>> Caused by: org.apache.flink.util.FlinkException: Could not cancel job ed7e2e0ab0a7316c1b65df6047bc6aae.
>>>>>>>>>>>      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:545)
>>>>>>>>>>>      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>>>>>>>>>>>      at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:538)
>>>>>>>>>>>      at org.apache.flink.client.cli.CliFrontend.parseParametersWithException(CliFrontend.java:917)
>>>>>>>>>>>      at org.apache.flink.client.cli.CliFrontend.lambda$mainWithReturnCodeAndException$10(CliFrontend.java:988)
>>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>>>>>>>>>>>      ... 20 more
>>>>>>>>>>> Caused by: java.util.concurrent.TimeoutException
>>>>>>>>>>>      at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>>>>>>>>>>>      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>>>>>>>>      at org.apache.flink.client.cli.CliFrontend.lambda$cancel$7(CliFrontend.java:543)
>>>>>>>>>>>      ... 27 more
>>>>>>>>>>>
>>>>>>>>>>> Then I discovered that the YARN app had already terminated with
>>>>>>>>>>> FINISHED state and KILLED final status, like below.
>>>>>>>>>>>
>>>>>>>>>>> And after digging into the log of this finished YARN app, I found
>>>>>>>>>>> that the TaskManager had already received the SIGTERM signal and
>>>>>>>>>>> terminated gracefully.
>>>>>>>>>>> org.apache.flink.yarn.YarnTaskExecutorRunner  - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>>>>>>>>>>>
>>>>>>>>>>> Also, the log of the JobManager shows that it terminated with exit
>>>>>>>>>>> code 0.
>>>>>>>>>>> Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit code 0
>>>>>>>>>>>
>>>>>>>>>>> However, the JobManager did not return anything to the client before
>>>>>>>>>>> its shutdown, which is different from previous versions (like Flink
>>>>>>>>>>> 1.9).
>>>>>>>>>>>
>>>>>>>>>>> I wonder if this is a new bug in the flink-clients or flink-yarn
>>>>>>>>>>> module?
>>>>>>>>>>>
>>>>>>>>>>> Thank you : )
>>>>>>>>>>>
>>>>>>>>>>> Sincerely,
>>>>>>>>>>> Weike
>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>>
> 

