flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shailesh Jain <shailesh.j...@stellapps.com>
Subject Re: Standalone cluster instability
Date Fri, 10 Aug 2018 04:59:15 GMT
Hi,

I hit a similar issue yesterday, the task manager died suspiciously, no
error logs in the task manager logs, but I see the following exceptions in
the job manager logs:

2018-08-05 18:03:28,322 ERROR
akka.remote.Remoting                                          - Association
to [akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably
failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for
too long. (more than 48.0 hours)
        at
akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at
akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

but almost 3 days later it hit this:

2018-08-08 13:22:00,061 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from
state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
        at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
        at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
        at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at
akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
        at akka.actor.ActorCell.invoke(ActorCell.scala:494)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

followed by:

2018-08-08 13:22:20,090 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from
state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: < Attempt #2 (Source: Custom Source ->
Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID <
fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup
[fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7,
8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler:
Number of instances=0, total number of slots=0, available slots=0
        at
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
        at
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
        at
org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
        at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
        at
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
        at
org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
        at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
        at
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
        at
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
        at
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

There are no error logs in task manager, and following is the last memory
consumption log by task manager:

2018-08-08 13:19:23,341 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Memory
usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB
(used/committed/max)]
2018-08-08 13:19:23,341 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
2018-08-08 13:19:23,341 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap
pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace:
90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB
(used/committed/max)]
2018-08-08 13:19:23,341 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Garbage
collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT:
6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]

So I think it rules out OOM as a cause for this crash.

Any ideas/leads to debug this would be really helpful. The cluster is
running on version 1.4.2.

Thanks,
Shailesh

On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <
alexander.smirnoff@gmail.com> wrote:

> Hi Piotr,
>
> I didn't find anything special in the logs before the failure.
> Here are the logs, please take a look:
>
> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_
> x7QV59?usp=sharing
>
> The configuration is:
>
> 3 task managers:
> qafdsflinkw011.scl
> qafdsflinkw012.scl
> qafdsflinkw013.scl - lost connection
>
> 3 job  managers:
> qafdsflinkm011.scl - the leader
> qafdsflinkm012.scl
> qafdsflinkm013.scl
>
> 3 zookeepers:
> qafdsflinkzk011.scl
> qafdsflinkzk012.scl
> qafdsflinkzk013.scl
>
> Thank you,
> Alex
>
>
>
> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <piotr@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Does the issue really happen after 48 hours?
>> Is there some indication of a failure in TaskManager log?
>>
>> If you will be still unable to solve the problem, please provide full
>> TaskManager and JobManager logs.
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 16:00, Alexander Smirnov <alexander.smirnoff@gmail.com>
>> wrote:
>>
>> One more question - I see a lot of line like the following in the logs
>>
>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>> [2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@
>> qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is irrecoverably
>> failed. Address cannot be quarantined without knowing the UID, gating
>> instead for 5000 ms. (akka.remote.Remoting)
>>
>>
>> The host is available, but I don't understand where port number comes
>> from. Task Manager uses another port (which is printed in logs on startup)
>> Could you please help to understand why it happens?
>>
>> Thank you,
>> Alex
>>
>>
>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <
>> alexander.smirnoff@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I've assembled a standalone cluster of 3 task managers and 3 job
>>> managers(and 3 ZK) following the instructions at
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-
>>> release-1.4/ops/deployment/cluster_setup.html and https://ci.apache.org/
>>> projects/flink/flink-docs-release-1.4/ops/jobmanager_
>>> high_availability.html
>>>
>>> It works ok, but randomly, task managers becomes unavailable. JobManager
>>> has exception like below in logs:
>>>
>>>
>>> [2018-03-19 00:33:10,211] WARN Association with remote system
>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed,
>>> address is now gated for [5000] ms. Reason: [Association failed with
>>> [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by:
>>> [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413]
>>> (akka.remote.ReliableDeliverySupervisor)
>>> [2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@
>>> qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560]
>>> irrecoverably failed. Quarantining address. (akka.remote.Remoting)
>>> java.util.concurrent.TimeoutException: Remote system has been silent
>>> for too long. (more than 48.0 hours)
>>>         at akka.remote.ReliableDeliverySupervisor$$
>>> anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at akka.remote.ReliableDeliverySupervisor.
>>> aroundReceive(Endpoint.scala:203)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>>> ForkJoinTask.java:260)
>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>>> runTask(ForkJoinPool.java:1339)
>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>>> ForkJoinPool.java:1979)
>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>>> ForkJoinWorkerThread.java:107)
>>>
>>> I can't find a reason for this exception, any ideas?
>>>
>>> Thank you,
>>> Alex
>>>
>>
>>

Mime
View raw message