flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Vergilio, Thalita" <t.vergilio4...@student.leedsbeckett.ac.uk>
Subject Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes
Date Thu, 09 Nov 2017 22:04:24 GMT
Hi Till,

I have made some progress with the name resolution for machines that are not in the same subnet.
The problem I am facing now is Flink-specific, so I wonder if you could help me.

It is all running fine in a multi-cloud setup with the jobmanager in Azure and the taskmanager
in the Google cloud. However, when I scale the taskmanager up and it start running on Azure
nodes as well, I get an Akka error which I presume means the taskmanagers can't talk to each
other when parallelising the task.

Do you know what the IP address and port below are? Are they assigned by Flink?

Thank you very much.


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> Flat Map
-> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> ParMultiDo(Anonymous)
-> ToKeyedWorkItem (2/3) (b9f31626fb7d83d39e24e570e034f03e) - TaskManager (3a9c37463c88510a44097df0c99b5f90
@ (dataPort=38963)) not responding after a timeout of 10000 ms
        at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
        at org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
        at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
        at akka.dispatch.Recover.internal(Future.scala:268)
        at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
        at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
        at scala.util.Try$.apply(Try.scala:161)
        at scala.util.Failure.recover(Try.scala:185)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@]]
after [10000 ms]
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
        at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
        ... 1 more

From: Till Rohrmann <trohrmann@apache.org>
Sent: 06 November 2017 13:48:59
To: Vergilio, Thalita
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different

I'm not entirely sure how docker swarm works but from the Flink perspective there mustn't
be two TaskManagers running on the same host (meaning an entity where you share the same address)
if you set the TaskManager data port to a fixed value (otherwise only one of them can be started
due to port conflicts). If you can ensure that this is the case, then it should be save to
specify a port for the data transmission.


On Mon, Nov 6, 2017 at 2:37 PM, Vergilio, Thalita <t.vergilio4822@student.leedsbeckett.ac.uk<mailto:t.vergilio4822@student.leedsbeckett.ac.uk>>

Hi Till,

Thanks a lot for your answer.

Is the taskmanager.data.port unique per TaskManager? The documentation says it is assigned
at runtime by the OS. My thinking here is that you would need to know what that is at service
creation time, which would go against the whole idea of how services are scaled in Docker

When you create a Swarm service using 'docker stack deploy' or 'docker service create', the
configuration that is used at that point is the same that will be used by all instances of
the service. If you then scale TaskManager to 8 or 10 containers, each of them gets the same
service configuration(the one used to create the service).

I have in fact tried to map specific ports in the TaskManager service configuration, but then
I got "port already in use" when I tried to scale up the service.

I wonder if there is a way around it.

Maybe the people who developed the create-docker-swarm-service.sh script in the docker-flink
project would be able to shed some light?

From: Till Rohrmann <trohrmann@apache.org<mailto:trohrmann@apache.org>>
Sent: 06 November 2017 12:40:33
To: Piotr Nowojski
Cc: Vergilio, Thalita; user@flink.apache.org<mailto:user@flink.apache.org>; Patrick

Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different

Hi Thalita,

in order to make Flink work, I think you have to expose the JobManager RPC port, the Blob
server port and make sure that the TaskManager can talk to each other by exposing the `taskmanager.data.port`.
The query server port is only necessary if you want to use queryable state.

I've pulled in Patrick who has more experience with running Flink on top of Docker. He'll
definitely be able to provide more detailed recommendations.


On Mon, Nov 6, 2017 at 9:22 AM, Piotr Nowojski <piotr@data-artisans.com<mailto:piotr@data-artisans.com>>
Till, is there somewhere a list of ports that need to exposed that’s more up to date compared
to docker-flunk README?


On 3 Nov 2017, at 10:23, Vergilio, Thalita <t.vergilio4822@student.leedsbeckett.ac.uk<mailto:t.vergilio4822@student.leedsbeckett.ac.uk>>

Just an update: by changing the JOB_MANAGER_RPC_ADDRESS to the public IP of the JobManager
and exposing port 6123 as {{PUBLIC_IP}}:6123:6123, I manged to get the TaskManagers from different
nodes and even different subnets to talk to the JobManager.

This is how I created the services:

docker network create -d overlay overlay

docker service create --name jobmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 8081:8081
-p{{PUBLIC_IP}}:6123:6123 -p 48081:48081 -p 6124:6124 -p 6125:6125 --network overlay --constraint
'node.hostname == ubuntu-swarm-manager' flink jobmanager

docker service create --name taskmanager --env JOB_MANAGER_RPC_ADDRESS={{PUBLIC_IP}}  -p 6121:6121
-p 6122:6122  --network overlay --constraint 'node.hostname != ubuntu-swarm-manager' flink

However, I am still encountering errors further down the line. When I submit a job using the
Web UI, it fails because the JobManager can't talk to the TaskManager on port 35033. I presume
this is the taskmanager.data.port, which needs to be set to a range and this range exposed
when I create the service?

Are there any other ports that I need to open at service creation time?

Connecting the channel failed: Connecting to remote task manager + '/{{IP_ADDRESS_OF_MANAGER}}:35033'
has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:112)
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:433)
        at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:455)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:91)
        at org.apache.flink.streaming.runtime.io<http://runtime.io>.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)

From: Piotr Nowojski <piotr@data-artisans.com<mailto:piotr@data-artisans.com>>
Sent: 02 November 2017 14:26:32
To: Vergilio, Thalita
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different

Did you try to expose required ports that are listed in the README when starting the containers?


• The Web Client is on port 48081
• JobManager RPC port 6123 (default, not exposed to host)
• TaskManagers RPC port 6122 (default, not exposed to host)
• TaskManagers Data port 6121 (default, not exposed to host)


On 2 Nov 2017, at 14:44, javalass <t.vergilio4822@student.leedsbeckett.ac.uk<mailto:t.vergilio4822@student.leedsbeckett.ac.uk>>

I am using the Docker-Flink project in:

I am creating the services with the following commands:
docker network create -d overlay overlay
docker service create --name jobmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager -p 8081:8081 --network overlay
--constraint 'node.hostname == ubuntu-swarm-manager' flink jobmanager
docker service create --name taskmanager --env
JOB_MANAGER_RPC_ADDRESS=jobmanager --network overlay --constraint
'node.hostname != ubuntu-swarm-manager' flink taskmanager

I wonder if there's any configuration I'm missing. This is the error I get:
- Trying to register at JobManager akka.tcp://flink@jobmanager:6123/
user/jobmanager (attempt 4, timeout: 4000 milliseconds)

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

To view the terms under which this email is distributed, please go to:-

To view the terms under which this email is distributed, please go to:-

To view the terms under which this email is distributed, please go to:-

View raw message