flink-user mailing list archives

From Till Rohrmann <till.rohrm...@gmail.com>
Subject Re: YARN JobManager HA using wrong network interface
Date Thu, 03 Mar 2016 11:29:25 GMT
No, I don't think this behaviour was introduced by HA. That is the
default behaviour we have used for a long time. If you think we should still
change it, then I can open an issue for it.

On Thu, Mar 3, 2016 at 12:20 PM, Stephan Ewen <sewen@apache.org> wrote:

> Okay, that is a change from the original behavior, introduced in HA.
> Originally, if the connection attempts failed, it always returned the InetAddress.getLocalHost()
> interface.
> I think we should change it back to that, because that interface is by far
> the best possible heuristic.
>
> On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> If I’m not mistaken, it’s not necessarily true that the heuristic
>> returns InetAddress.getLocalHost() in all cases. The heuristic will
>> select the first network interface meeting the aforementioned conditions,
>> but before returning it, it will try one last time to connect to the JM
>> via the interface bound to InetAddress.getLocalHost(). However, if this
>> fails, then the heuristically selected network interface will be returned.
>>
>> On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> If the TaskManager cannot connect to the JobManager, it will use the
>>> interface that is bound to the machine's host name
>>> ("InetAddress.getLocalHost()").
>>>
>>> So, the best way to fix this would be to make sure that all machines
>>> have a proper network configuration. Then Flink would either use an address
>>> that can connect (via trying various interfaces), or it would default back
>>> to the hostname/interface that is configured on the machine.
>>>
>>>
>>> On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <trohrmann@apache.org>
>>> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> the problem is that before starting the TM, we have to find the network
>>>> interface which is reachable by the other machines. So what we do is to
>>>> connect to the current JobManager. If it should happen, as in your case,
>>>> that the JobManager just died and the new JM address has not been written
>>>> to ZooKeeper, then the TMs don’t have much choice other than using the
>>>> heuristic.
>>>>
>>>> I can’t really tell why eth1 is chosen over eth0. The condition is that
>>>> the interface address is an Inet4Address, not a link-local address, and
>>>> not a loopback address.
>>>>
>>>> Thus, Ufuk’s suggestion to increase akka.lookup.timeout seems to be the
>>>> easiest way to solve your problem. I’ve checked: the default value is
>>>> 10 s, which might be a bit too low for starting a new JM and
>>>> publishing its address via ZooKeeper.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <uce@apache.org> wrote:
>>>>
>>>>> I had an offline chat with Till about this. He pointed out that the
>>>>> address is chosen once at start up time (while not being able to
>>>>> connect to the old job manager) and then it stays fixed at eth1.
>>>>>
>>>>> You can increase the lookup timeout by setting akka.lookup.timeout to
>>>>> a higher value (like 100 s). This is the only workaround I'm aware of
>>>>> at this point. Maybe Till can chime in here whether this has other
>>>>> implications as well?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <uce@apache.org> wrote:
>>>>> > Hey Max!
>>>>> >
>>>>> > for the first WARN in
>>>>> > org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
>>>>> > expected if the new leader has not updated ZooKeeper yet. The
>>>>> > important thing is that the new leading job manager is eventually
>>>>> > retrieved. This did happen, right?
>>>>> >
>>>>> > Regarding eth1 vs. eth0: After the new job manager becomes leader,
>>>>> > the task manager should re-try connecting to it with the same
>>>>> > strategy as in the initial connection establishment (e.g. try SLOW
>>>>> > first and only fall back to HEURISTIC). Can you see in the logs
>>>>> > whether this happens?
>>>>> >
>>>>> > The best thing would be to share the complete logs. Is this possible?
>>>>> > If not publicly, feel free to send it to me privately (uce at apache
>>>>> > org).
>>>>> >
>>>>> > – Ufuk
>>>>> >
>>>>> >
>>>>> > On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode
>>>>> > <maximilian.bode@tngtech.com> wrote:
>>>>> >> Hi everyone,
>>>>> >>
>>>>> >> we are trying to get JobManager HA to work in the context of a
>>>>> >> per-job YARN session using the 1.0.0-rc3 from a few days ago and
>>>>> >> are having a problem concerning task managers with several network
>>>>> >> interfaces.
>>>>> >>
>>>>> >> After manually killing the job manager process, the jobmanager.log
>>>>> >> on the newly allocated second job manager reads:
>>>>> >> ---
>>>>> >> 2016-03-02 18:01:09,635 WARN  Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
>>>>> >> 2016-03-02 18:01:09,644 WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
>>>>> >> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
>>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>>>>> >> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>>>>> >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>>>>> >> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>>> >> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>>>>> >> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>>>>> >> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>>>>> >> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>>> >> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>>> >> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>>>>> >> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>>>>> >> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>>>>> >> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>>>>> >> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>>>>> >> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>>>>> >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>>>> >> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>>>>> >> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>>>> >> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>>>> >> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>>>> >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>>>> >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>>>> >> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>> >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> >> ---
>>>>> >> where the IP that is not found belongs to the old job manager. So
>>>>> >> far, is this the expected behavior?
>>>>> >>
>>>>> >> The problem then arises on a new task manager, which also tries,
>>>>> >> unsuccessfully, to connect to the old job manager. The
>>>>> >> ZooKeeperLeaderRetrievalService starts cycling through the
>>>>> >> available network interfaces, as can be seen in the relevant
>>>>> >> taskmanager.log:
>>>>> >> ---
>>>>> >> 2016-03-02 18:01:13,636 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService.
>>>>> >> 2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to select the network interface and address to use by connecting to the leading JobManager.
>>>>> >> 2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
>>>>> >> 2016-03-02 18:01:13,712 INFO  org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address /10.127.68.136:34811.
>>>>> >> 2016-03-02 18:01:14,079 INFO  org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address /10.127.68.136:34811
>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>> >> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.120.193.110': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/10.127.68.136': Connection refused
>>>>> >> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Connection refused
>>>>> >> ---
>>>>> >> After five repetitions, the task manager stops trying to retrieve
>>>>> >> the leader and, using the HEURISTIC strategy, ends up on eth1
>>>>> >> (10.120.193.110) from now on:
>>>>> >> ---
>>>>> >> 2016-03-02 18:01:23,650 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
>>>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
>>>>> >> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x25229757cff035b closed
>>>>> >> 2016-03-02 18:01:23,664 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
>>>>> >> ---
>>>>> >> Following this, the new jobmanager is discovered and the
>>>>> >> taskmanager is able to register at the jobmanager using eth1. The
>>>>> >> problem is that connections TO eth1 are not possible, so Flink
>>>>> >> should always use eth0. The exception we later see is:
>>>>> >> ---
>>>>> >> java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>>>>> >> at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>>>>> >> at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>>>>> >> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>>>>> >> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>>>>> >> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>>>> >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>>>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>>>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> >> at java.lang.Thread.run(Thread.java:744)
>>>>> >> ---
>>>>> >> The root cause seems to be that network interface selection is
>>>>> >> still using the old jobmanager location and hence is not able to
>>>>> >> choose the right interface. In particular, it seems that iteration
>>>>> >> order over the network interfaces differs between the HEURISTIC
>>>>> >> and SLOW strategy, which then leads to the wrong interface being
>>>>> >> selected.
>>>>> >>
>>>>> >> Cheers,
>>>>> >>  Max
>>>>> >> —
>>>>> >> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>>>>> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> >> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>> >>
>>>>>
>>>>
>>>>
>>>
>>
>
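
For readers of the archive, here is a minimal Java sketch of the three HEURISTIC conditions Till lists in the thread (the address is an Inet4Address, not link-local, not loopback). It is not Flink's code: the class name InterfaceHeuristicSketch and the methods ip, isCandidate, firstCandidate and localAddresses are hypothetical, and falling back to a caller-supplied address when nothing qualifies is an assumption made for illustration. Only standard java.net calls are used.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical illustration, not Flink's implementation.
class InterfaceHeuristicSketch {

    /** Parses an IP literal (no DNS lookup for literals); wraps the checked exception. */
    static InetAddress ip(String literal) {
        try {
            return InetAddress.getByName(literal);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** The three conditions named in the thread: IPv4, not link-local, not loopback. */
    static boolean isCandidate(InetAddress addr) {
        return addr instanceof Inet4Address
                && !addr.isLinkLocalAddress()
                && !addr.isLoopbackAddress();
    }

    /** Returns the first qualifying address in iteration order, else the fallback (assumption). */
    static InetAddress firstCandidate(List<InetAddress> addresses, InetAddress fallback) {
        for (InetAddress addr : addresses) {
            if (isCandidate(addr)) {
                return addr;
            }
        }
        return fallback;
    }

    /** Collects the addresses of all local interfaces in the order the JVM reports them. */
    static List<InetAddress> localAddresses() throws SocketException {
        List<InetAddress> result = new ArrayList<>();
        Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
        if (nics == null) {
            return result; // some JDKs return null when no interface is found
        }
        for (NetworkInterface nic : Collections.list(nics)) {
            result.addAll(Collections.list(nic.getInetAddresses()));
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        InetAddress fallback = InetAddress.getLocalHost();
        System.out.println("heuristic pick: " + firstCandidate(localAddresses(), fallback));
    }
}
```

Because the first qualifying address wins, the result depends only on the order in which interfaces are enumerated, which is consistent with Max's observation that eth1 can be picked even though only eth0 is reachable from other machines.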

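
The connect-first step discussed in the thread (try to reach the current JobManager from each local address, and only fall back when every attempt fails) can be sketched roughly as follows. This is an illustration under stated assumptions, not Flink's ConnectionUtils: the class ConnectProbeSketch and the methods canConnectFrom, choose and loopbackDemo are hypothetical names, and the per-attempt timeout is a plain socket timeout rather than Flink's akka.lookup.timeout handling.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;

// Hypothetical illustration, not Flink's implementation.
class ConnectProbeSketch {

    /** True if a TCP connection from the given local address to the target succeeds in time. */
    static boolean canConnectFrom(InetAddress local, InetSocketAddress target, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.bind(new InetSocketAddress(local, 0)); // port 0: any free local port
            socket.connect(target, timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // for example "Connection refused", as in the taskmanager.log
        }
    }

    /** Returns the first local address that reaches the target, else the fallback. */
    static InetAddress choose(List<InetAddress> locals, InetSocketAddress target,
                              int timeoutMillis, InetAddress fallback) {
        for (InetAddress local : locals) {
            if (canConnectFrom(local, target, timeoutMillis)) {
                return local;
            }
        }
        return fallback;
    }

    /** Loopback self-check: probes a throwaway listener while it is open and after it closes. */
    static boolean[] loopbackDemo() {
        try {
            InetAddress loopback = InetAddress.getLoopbackAddress();
            ServerSocket server = new ServerSocket(0, 1, loopback);
            InetSocketAddress target = new InetSocketAddress(loopback, server.getLocalPort());
            boolean whileOpen = canConnectFrom(loopback, target, 1000);
            server.close();
            boolean afterClose = canConnectFrom(loopback, target, 1000);
            return new boolean[] {whileOpen, afterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = loopbackDemo();
        System.out.println("reachable while listening: " + result[0]);
        System.out.println("reachable after close:     " + result[1]);
    }
}
```

In the situation Max describes, the target is the address of the JobManager that has just died, so every probe is refused and the fallback is returned regardless of which interface would actually be reachable by the other machines.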