flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject Re: Jira issue Flink-11127
Date Fri, 22 Feb 2019 17:00:17 GMT
It works now.
The problem was how I was setting jobmanager.rest.address / jobmanager.rpc.address:
that was creating the actor system on the local host.
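The following is a minimal Python model of why the task managers' messages were being dropped. It assumes, as the "dropping message ... for non-local recipient" errors quoted further down suggest, that Akka remoting only accepts a message whose recipient address string exactly matches one of the local actor system's inbound addresses. This is a simplified illustration, not Akka's actual code; the host name and port are taken from the logs in this thread.

```python
from typing import Iterable
from urllib.parse import urlsplit


def akka_authority(akka_url: str) -> str:
    """Return the 'system@host:port' part of an akka.tcp:// actor URL."""
    return urlsplit(akka_url).netloc


def accepted(recipient: str, inbound_addresses: Iterable[str]) -> bool:
    """Simplified model (an assumption): a remote message is delivered only if
    the recipient's system@host:port string exactly matches one of the
    addresses the local actor system is bound to. The model does no DNS
    resolution, so 127.0.0.1 and a Service name never match each other."""
    return akka_authority(recipient) in {akka_authority(a) for a in inbound_addresses}


recipient = "akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager"
# Actor system bound to the local host, as in the "inbound addresses" part of the log.
print(accepted(recipient, ["akka.tcp://flink@127.0.0.1:6123"]))
# Actor system bound to the same name the task managers use.
print(accepted(recipient, ["akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123"]))
```

Under this model, the job manager's actor system has to be bound to the same host name that the task managers use to reach it (here, the Kubernetes Service name) for the ResourceManager lookups to succeed.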

I am still getting the messages below in the job manager periodically, but they seem
to be harmless:

ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler  - Caught exception
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:748)
2019-02-22 16:49:22,016 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint  - Unhandled exception
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:748)





Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

> On Feb 22, 2019, at 10:35 AM, Andrey Zagrebin <andrey@ververica.com> wrote:
> 
> cc aleksey@ververica.com
> On Fri, Feb 22, 2019 at 1:28 AM Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
> Adding the metric query port makes it a bit better, but there is still an error:
> 
> 
> 2019-02-22 00:03:56,173 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:16,213 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:36,253 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 2019-02-22 00:04:56,293 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/user/resourcemanager, retrying in 10000 ms: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/), Path(/user/resourcemanager)]] after [10000 ms]. Sender[null] sent message of type "akka.actor.Identify"..
> 
> in the task manager, and
> 
> 2019-02-21 23:59:46,479 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-21 23:59:57,808 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:06,519 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:17,849 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:26,558 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 2019-02-22 00:00:37,888 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123/]] arriving at [akka.tcp://flink@maudlin-ibis-fdp-flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@127.0.0.1:6123]
> 
> in the job manager.
> 
> Port 6123 is open in both the job manager Deployment
> 
> apiVersion: extensions/v1beta1
> kind: Deployment
> metadata:
>   name: {{ template "fullname" . }}-jobmanager
> spec:
>   replicas: 1
>   template:
>     metadata:
>       annotations:
>         prometheus.io/scrape: 'true'
>         prometheus.io/port: '9249'
>       labels:
>         server: flink
>         app: {{ template "fullname" . }}
>         component: jobmanager
>     spec:
>       containers:
>       - name: jobmanager
>         image: {{ .Values.image }}:{{ .Values.imageTag }}
>         imagePullPolicy: {{ .Values.imagePullPolicy }}
>         args:
>         - jobmanager
>         ports:
>         - containerPort: 6123
>           name: rpc
>         - containerPort: 6124
>           name: blob
>         - containerPort: 8081
>           name: ui
>         env:
>         - name: CONTAINER_METRIC_PORT
>           value: '{{ .Values.flink.metric_query_port }}'
>         - name: JOB_MANAGER_RPC_ADDRESS
>           value: {{ template "fullname" . }}-jobmanager
>         livenessProbe:
>           httpGet:
>             path: /overview
>             port: 8081
>           initialDelaySeconds: 30
>           periodSeconds: 10
>         resources:
>           limits:
>             cpu: {{ .Values.resources.jobmanager.limits.cpu }}
>             memory: {{ .Values.resources.jobmanager.limits.memory }}
>           requests:
>             cpu: {{ .Values.resources.jobmanager.requests.cpu }}
>             memory: {{ .Values.resources.jobmanager.requests.memory }}
> 
> and the job manager Service:
> 
> apiVersion: v1
> kind: Service
> metadata:
>   name: {{ template "fullname" . }}-jobmanager
> spec:
>   ports:
>   - name: rpc
>     port: 6123
>   - name: blob
>     port: 6124
>   - name: ui
>     port: 8081
>   selector:
>     app: {{ template "fullname" . }}
>     component: jobmanager
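To double-check that every port declared on the job manager container (6123 included) is actually forwarded by the Service, one can compare the two manifests. The sketch below is a hypothetical Python helper, not part of Helm or Kubernetes tooling; it assumes both manifests have already been rendered and loaded into plain dicts (for example with a YAML parser) and that `targetPort`, when present, is numeric.

```python
from typing import Dict, Set


def container_ports(deployment: dict) -> Dict[str, int]:
    """Map port name -> containerPort for every container in a Deployment."""
    ports: Dict[str, int] = {}
    for container in deployment["spec"]["template"]["spec"]["containers"]:
        for port in container.get("ports", []):
            ports[port["name"]] = port["containerPort"]
    return ports


def service_ports(service: dict) -> Dict[str, int]:
    """Map port name -> target port of a Service. Kubernetes defaults
    targetPort to port; named (string) targetPorts are not handled here."""
    return {p["name"]: p.get("targetPort", p["port"]) for p in service["spec"]["ports"]}


def unexposed_ports(deployment: dict, service: dict) -> Set[str]:
    """Names of container ports that the Service does not forward to."""
    forwarded = service_ports(service)
    return {name for name, number in container_ports(deployment).items()
            if forwarded.get(name) != number}
```

With the rpc, blob and ui ports from the two manifests above, the result is an empty set, which is consistent with port 6123 being reachable through the Service.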
> 
> 
> 
> 
>> On Feb 21, 2019, at 6:13 PM, Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
>> 
>> 
>>> On Feb 21, 2019, at 2:05 AM, Konstantin Knauf <konstantin@ververica.com> wrote:
>>> 
>>> Hi Boris, 
>>> 
>>> the exact command depends on the docker-entrypoint.sh script and the image you are using.
>>> For the example contained in the Flink repository it is "task-manager", I think.
>>> The important thing is to pass "taskmanager.host" to the Taskmanager process. You can verify
>>> this by checking the Taskmanager logs, which should contain lines like the ones below:
>>> 
>>> 2019-02-21 08:03:00,004 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -  Program Arguments:
>>> 2019-02-21 08:03:00,008 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173
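The log check described above can be scripted. This is a minimal Python sketch, assuming the Taskmanager log is available as lines of text (for example, read from a file or from `kubectl logs` output) and that the "Program Arguments" section prints the dynamic property exactly as in the example lines; the function name is hypothetical.

```python
import re
from typing import Iterable, Optional

# Matches the dynamic property in a "Program Arguments" log line, e.g.
# "...TaskManagerRunner      [] -     -Dtaskmanager.host=10.12.10.173"
TASKMANAGER_HOST_ARG = re.compile(r"-Dtaskmanager\.host=(\S+)")


def configured_taskmanager_host(log_lines: Iterable[str]) -> Optional[str]:
    """Return the first value passed via -Dtaskmanager.host, or None if the
    argument never appears (i.e. the setting was not passed to the process)."""
    for line in log_lines:
        match = TASKMANAGER_HOST_ARG.search(line)
        if match:
            return match.group(1)
    return None
```

Because it accepts any iterable of lines, an open log file can be passed directly; a `None` result means the argument did not reach the Taskmanager process.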
>>> 
>>> In the Jobmanager logs you should see that the Taskmanager is registered under the IP above,
>>> in a line similar to:
>>> 
>>> 2019-02-21 08:03:26,874 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering TaskManager with ResourceID a0513ba2c472d2d1efc07626da9c1bda (akka.tcp://flink@10.12.10.173:46531/user/taskmanager_0) at ResourceManager
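On the Jobmanager side, the same verification amounts to checking that the registered akka address uses a literal IP rather than a host name. A minimal Python sketch, assuming registration lines look like the example above and that addresses are IPv4 (bracketed IPv6 literals are not handled); the helper names are hypothetical.

```python
import ipaddress
import re
from typing import Optional, Tuple

# Captures HOST and PORT from "(akka.tcp://flink@HOST:PORT/user/taskmanager_0)"
# in a "Registering TaskManager with ResourceID ..." line (IPv4 or host names only).
REGISTRATION = re.compile(
    r"Registering TaskManager with ResourceID \S+ \(akka\.tcp://[^@]+@([^:/]+):(\d+)/"
)


def registered_address(line: str) -> Optional[Tuple[str, int]]:
    """Return (host, port) from a TaskManager registration line, or None."""
    match = REGISTRATION.search(line)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


def registered_by_ip(line: str) -> bool:
    """True if the TaskManager registered under a literal IP address."""
    address = registered_address(line)
    if address is None:
        return False
    try:
        ipaddress.ip_address(address[0])
    except ValueError:
        return False  # a host name such as flink-taskmanager-1
    return True
```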
>>> 
>>> A service per Taskmanager is not required. The purpose of the config parameter is to make
>>> the Jobmanager address the Taskmanagers by IP instead of by hostname.
>>> 
>>> Hope this helps!
>>> 
>>> Cheers, 
>>> 
>>> Konstantin
>>> 
>>> 
>>> 
>>> On Wed, Feb 20, 2019 at 4:37 PM Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
>>> Also, the suggested workaround does not quite work:
>>> 2019-02-20 15:27:43,928 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@flink-taskmanager-1:6170] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@flink-taskmanager-1:6170]] Caused by: [flink-taskmanager-1: No address associated with hostname]
>>> 2019-02-20 15:27:48,750 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler - Caught exception
>>> 
>>> I think the problem is that it's trying to connect to flink-taskmanager-1.
>>> 
>>> Using busybox to experiment with nslookup, I can see:
>>> / # nslookup flink-taskmanager-1.flink-taskmanager
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> Name:      flink-taskmanager-1.flink-taskmanager
>>> Address 1: 10.131.2.136 flink-taskmanager-1.flink-taskmanager.flink.svc.cluster.local
>>> / # nslookup flink-taskmanager-1
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> nslookup: can't resolve 'flink-taskmanager-1'
>>> / # nslookup flink-taskmanager-0.flink-taskmanager
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> Name:      flink-taskmanager-0.flink-taskmanager
>>> Address 1: 10.131.0.111 flink-taskmanager-0.flink-taskmanager.flink.svc.cluster.local
>>> / # nslookup flink-taskmanager-0
>>> Server:    10.0.11.151
>>> Address 1: 10.0.11.151 ip-10-0-11-151.us-west-2.compute.internal
>>> 
>>> nslookup: can't resolve 'flink-taskmanager-0'
>>> / # 
>>> 
>>> So the name should be suffixed with the service name. How do I force that? I suspect
>>> I am missing a config parameter.
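The nslookup results match the Kubernetes DNS convention for StatefulSet pods behind a headless Service: each pod gets the stable name `<pod>.<service>.<namespace>.svc.<cluster-domain>`, and the short form `<pod>.<service>` only resolves because the pod's DNS search path appends the rest. A minimal Python sketch of that naming rule, assuming (from the output above) that the task managers are a StatefulSet governed by a headless Service named `flink-taskmanager` in namespace `flink`, with the default `cluster.local` domain; the helper names are hypothetical.

```python
import socket


def statefulset_pod_fqdn(pod: str, service: str, namespace: str,
                         cluster_domain: str = "cluster.local") -> str:
    """Stable DNS name of a StatefulSet pod whose governing Service is
    headless: <pod>.<service>.<namespace>.svc.<cluster-domain>."""
    return f"{pod}.{service}.{namespace}.svc.{cluster_domain}"


def resolves(name: str) -> bool:
    """True if the name resolves from where this runs (e.g. inside a pod)."""
    try:
        socket.getaddrinfo(name, None)
    except socket.gaierror:
        return False
    return True
```

A bare pod name such as `flink-taskmanager-1` has no DNS record of its own, which is why the metrics association in the log above fails; the reply above sidesteps DNS entirely by advertising the pod IP via `taskmanager.host`.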
>>> 
>>>  
>>>> On Feb 19, 2019, at 4:33 AM, Konstantin Knauf <konstantin@ververica.com> wrote:
>>>> 
>>>> Hi Boris, 
>>>> 
>>>> the solution is actually simpler than it sounds from the ticket. The only thing you need
>>>> to do is to set "taskmanager.host" to the Pod's IP address in the Flink configuration.
>>>> The easiest way to do this is to pass this config dynamically via a command-line parameter.
>>>> 
>>>> The Deployment spec could look something like this:
>>>> containers:
>>>> - name: taskmanager
>>>>   [...]
>>>>   args:
>>>>   - "taskmanager.sh"
>>>>   - "start-foreground"
>>>>   - "-Dtaskmanager.host=$(K8S_POD_IP)"
>>>>   [...]
>>>>   env:
>>>>   - name: K8S_POD_IP
>>>>     valueFrom:
>>>>       fieldRef:
>>>>         fieldPath: status.podIP
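The `$(K8S_POD_IP)` in `args` works without a shell because Kubernetes itself expands `$(VAR_NAME)` references in a container's `command`/`args` using the container's environment: defined variables are substituted, unresolvable references are left unchanged, and `$$` escapes to a literal `$`. The Python function below is a simplified model of that rule, written for illustration (it is not Kubernetes code); the variable-name pattern is a simplifying assumption, and the IP value is only the example address from this thread.

```python
import re
from typing import Mapping

# "$$" (escape) or "$(NAME)" (reference). NAME is limited to C-identifier
# characters here for simplicity; Kubernetes may accept a wider set.
_VAR_REF = re.compile(r"\$\$|\$\(([A-Za-z_][A-Za-z0-9_]*)\)")


def expand_k8s_arg(arg: str, env: Mapping[str, str]) -> str:
    """Simplified model of Kubernetes $(VAR) expansion in command/args."""
    def substitute(match):
        if match.group(0) == "$$":
            return "$"  # "$$(X)" becomes the literal text "$(X)"
        # Unknown variables leave the reference untouched.
        return env.get(match.group(1), match.group(0))
    return _VAR_REF.sub(substitute, arg)


env = {"K8S_POD_IP": "10.12.10.173"}  # example value, normally from status.podIP
print(expand_k8s_arg("-Dtaskmanager.host=$(K8S_POD_IP)", env))
```

One consequence of this rule: if the `K8S_POD_IP` env entry were missing, the process would receive the literal string `$(K8S_POD_IP)` as its host, which is worth ruling out when the Taskmanager log shows an unexpected `taskmanager.host` value.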
>>>> 
>>>> Hope this helps and let me know if this works. 
>>>> 
>>>> Best, 
>>>> 
>>>> Konstantin
>>>> 
>>>> On Sun, Feb 17, 2019 at 9:51 PM Boris Lublinsky <boris.lublinsky@lightbend.com> wrote:
>>>> I was looking at this issue: https://issues.apache.org/jira/browse/FLINK-11127
>>>> Apparently there is a workaround for it.
>>>> Is it possible to provide the complete helm chart for it?
>>>> Bits and pieces are in the ticket, but it would be nice to see the full chart.
>>>> 
>>>> 
>>>> 
>>>> -- 
>>>> Konstantin Knauf | Solutions Architect
>>>> +49 160 91394525
>>>> 
>>>>  <https://www.ververica.com/>
>>>> Follow us @VervericaData
>>>> --
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>> Stream Processing | Event Driven | Real Time
>>>> --
>>>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>> --
>>>> Data Artisans GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen    
>>> 
>>> 
>>> 
>> 
> 

