flink-user mailing list archives

From Ron Crocker <rcroc...@newrelic.com>
Subject Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode
Date Wed, 18 Jul 2018 21:30:20 GMT
I just stumbled on this same problem without any associated ZK issues. We had a Kafka broker
fail that caused this issue:

2018-07-18 02:48:13,497 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Produce: <output_topic_name> (2/4) (7e7d61b286d90c51bbd20a15796633f2) switched from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

This is the kind of error we should be robust to - the Kafka cluster will (reasonably quickly)
recover and give a new broker for a particular partition (in this case, partition #2). Maybe
retries should be the default configuration? I believe the client uses the Kafka defaults
(acks=0, retries=0), but we typically run with acks=1 (or all) and retries=MAX_INT. Do I need
to do anything more than that to get a more robust producer?
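
A minimal sketch of the producer settings described above, assuming the `Properties` object is the one handed to the `FlinkKafkaProducer010` constructor. The class name `RobustProducerProps` and the broker address are hypothetical; `acks`, `retries`, and `max.in.flight.requests.per.connection` are standard Kafka producer config keys. Limiting in-flight requests to 1 is an extra assumption, added so that retried batches cannot be reordered.

```java
import java.util.Properties;

class RobustProducerProps {
    // Builds Kafka producer settings that retry transient broker failures
    // instead of failing on the first NetworkException.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write.
        props.setProperty("acks", "all");
        // Keep retrying until the partition has a leader again.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Assumption: one in-flight request so retries preserve ordering.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // "broker-1:9092" is a placeholder address.
        Properties props = build("broker-1:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

On the Flink side, as far as I know the 0.10 producer also exposes `setFlushOnCheckpoint(true)` and `setLogFailuresOnly(false)`, which are worth checking if the goal is at-least-once delivery rather than just fewer restarts.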

Ron

> On May 16, 2018, at 7:45 PM, Tony Wei <tony19920430@gmail.com> wrote:
> 
> Hi Ufuk, Piotr
> 
> Thanks for all of your replies. I knew that jobs are cancelled if the JM loses the connection to ZK, but the JM didn't lose its connection in my case.
> My job failed because of the exception from KafkaProducer. However, the TM lost its ZK connection both before and after that exception.
> So, as Piotr said, it looks like an error in the Kafka producer, and I will pay closer attention to it to see whether anything unexpected happens again.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-15 19:56 GMT+08:00 Piotr Nowojski <piotr@data-artisans.com>:
> Hi,
> 
> It looks like there was an error in the asynchronous job that sends the records to Kafka. This is probably collateral damage from losing the connection to ZooKeeper.
> 
> Piotrek
> 
>> On 15 May 2018, at 13:33, Ufuk Celebi <uce@apache.org> wrote:
>> 
>> Hey Tony,
>> 
>> thanks for the detailed report.
>> 
>> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK and recovered when the connection is re-established (and one JM becomes leader again).
>> 
>> - Regarding the KafkaProducer: I'm not sure from the log message whether Flink closes the KafkaProducer because the job is cancelled or because there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this thread who has worked on the KafkaProducer in the past. If it is a connectivity issue, it might also explain why you lost the connection to ZK.
>> 
>> Glad to hear that everything is back to normal. Keep us updated if something unexpected happens again.
>> 
>> – Ufuk
>> 
>> 
>> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920430@gmail.com> wrote:
>> Hi all,
>> 
>> I restarted the cluster, changed the log level to DEBUG, and raised the parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will keep these settings for a while. If the error happens again, I will come back with more information. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
>> Hi all,
>> 
>> After I changed `high-availability.zookeeper.client.session-timeout` and `maxSessionTimeout` to 120000ms, the exception still occurred.
>> 
>> Here is the log snippet. It seems this has nothing to do with the ZooKeeper client timeout, but I still don't know why the Kafka producer would be closed without any task state change.
>> 
>> ```
>> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
>> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server XXX.XXX.XXX.XXX:2181
>> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
>> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to XXX.XXX.XXX.XXX:2181, initiating session
>> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated timeout = 120000
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> 	at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>> 	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> 	at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
>> ```
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920430@gmail.com>:
>> Hi all,
>> 
>> Recently, my Flink job hit a problem that caused the job to fail and restart.
>> 
>> The log is shown in this screenshot
>> 
>> <exception.png>
>> 
>> or this
>> 
>> ```
>> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
>> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect
>> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
>> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>> ```
>> 
>> The logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout value is Long.MAX_VALUE. It is logged when someone calls `producer.close()`.
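
A quick way to confirm the reading of that log line: the value printed as `timeoutMillis` can be compared against `Long.MAX_VALUE` directly. This is only a sketch with a hypothetical class name, `CloseTimeoutCheck`; it does not touch the Kafka client, it just checks that the logged number is the "wait indefinitely" sentinel that a no-argument `close()` would pass along.

```java
class CloseTimeoutCheck {
    // True when the logged close timeout is the "no deadline" sentinel.
    static boolean isUnbounded(long timeoutMillis) {
        return timeoutMillis == Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long logged = 9223372036854775807L; // value copied from the log line
        System.out.println(isUnbounded(logged)); // prints "true"
    }
}
```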
>> 
>> I also saw the log say `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect`
>> and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.`
>> 
>> I have checked ZooKeeper and Kafka, and there was no error during that period.
>> I was wondering whether the TM stops its tasks when it loses the ZooKeeper client session in HA mode. Since I didn't see any documentation or mailing list thread discussing this, I'm not sure whether this is the reason the Kafka producer was closed.
>> Could someone who knows HA well, or who knows what happened in my job, help?
>> 
>> My Flink cluster version is 1.4.0 with 2 masters and 10 slaves. My ZooKeeper cluster version is 3.4.11 with 3 nodes.
>> The `high-availability.zookeeper.client.session-timeout` is the default value: 60000 ms.
>> The `maxSessionTimeout` in zoo.cfg is 40000ms.
>> I have already changed the maxSessionTimeout to 120000ms this morning.
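
For reference, a small sketch of how these two settings interact, assuming the usual ZooKeeper behavior where the server clamps the timeout requested by the client into the range [`minSessionTimeout`, `maxSessionTimeout`]. The class name `ZkSessionTimeout` is hypothetical, and the `tickTime` of 2000 ms (with `minSessionTimeout` at twice that) is an assumed default, not something stated in this thread.

```java
class ZkSessionTimeout {
    // Clamps the client's requested session timeout into the server's
    // allowed range, which is the value the session actually ends up with.
    static int negotiate(int requestedMs, int minMs, int maxMs) {
        return Math.min(Math.max(requestedMs, minMs), maxMs);
    }

    public static void main(String[] args) {
        int tickTime = 2000;              // assumed zoo.cfg default
        int minSession = 2 * tickTime;    // assumed default minSessionTimeout

        // Original setup: client asks for 60000 ms, server caps at 40000 ms.
        System.out.println(negotiate(60000, minSession, 40000));   // prints 40000

        // After raising both settings to 120000 ms.
        System.out.println(negotiate(120000, minSession, 120000)); // prints 120000
    }
}
```

Under these assumptions the original setup would have run with an effective 40000 ms session timeout despite the 60000 ms client setting, and the second case matches the `negotiated timeout = 120000` line in the later log.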
>> 
>> This problem happened many times over the last weekend and made my Kafka log delay grow. Please help me. Thank you very much!
>> 
>> Best Regards,
>> Tony Wei