From: Ron Crocker
Subject: Re: Question about the behavior of TM when it lost the zookeeper client session in HA mode
Date: Wed, 18 Jul 2018 14:30:20 -0700
To: user@flink.apache.org, Tony Wei
Cc: Tony Wei
Message-Id: <2B0AF483-4A7B-4E55-80BC-E706C675714F@newrelic.com>

I just stumbled on this same problem without any associated ZK issues. We had a Kafka broker fail that caused this issue:

2018-07-18 02:48:13,497 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: Produce: <output_topic_name> (2/4) (7e7d61b286d90c51bbd20a15796633f2) switched from RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

This is the kind of error we should be robust to: the Kafka cluster will (reasonably quickly) recover and elect a new leader for the affected partition (in this case, partition #2).
Maybe retries should be the default configuration? I believe the client uses the Kafka defaults (acks=0, retries=0), but we typically run with acks=1 (or all) and retries=MAX_INT. Do I need to do anything more than that to get a more robust producer?

Ron

> On May 16, 2018, at 7:45 PM, Tony Wei wrote:
> 
> Hi Ufuk, Piotr
> 
> Thanks for all of your replies. I knew that jobs are cancelled if the JM loses the connection to ZK, but the JM didn't lose its connection in my case.
> My job failed because of the exception from the KafkaProducer. However, the TM lost its ZK connection both before and after that exception.
> So, as Piotr said, it looks like an error in the Kafka producer, and I will pay more attention to it to see if something unexpected happens again.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-15 19:56 GMT+08:00 Piotr Nowojski:
> Hi,
> 
> It looks like there was an error in the asynchronous job of sending the records to Kafka. Probably this is collateral damage from losing the connection to ZooKeeper.
> 
> Piotrek
> 
>> On 15 May 2018, at 13:33, Ufuk Celebi wrote:
>> 
>> Hey Tony,
>> 
>> thanks for the detailed report.
>> 
>> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK, and recovered when the connection is re-established (and one JM becomes leader again).
>> 
>> - Regarding the KafkaProducer: I'm not sure from the log message whether Flink closes the KafkaProducer because the job is cancelled or because there is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this thread, who has worked on the KafkaProducer in the past. If it is a connectivity issue, it might also explain why you lost the connection to ZK.
>> 
>> Glad to hear that everything is back to normal. Keep us updated if something unexpected happens again.
>> 
>> – Ufuk
>> 
>> 
>> On Tue, May 15, 2018 at 6:28 AM, Tony Wei wrote:
>> Hi all,
>> 
>> I restarted the cluster, changed the log level to DEBUG, and raised the parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will keep these settings for a while. If the error happens again, I will bring back more information for help. Thank you.
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 14:24 GMT+08:00 Tony Wei:
>> Hi all,
>> 
>> After I changed `high-availability.zookeeper.client.session-timeout` and `maxSessionTimeout` to 120000 ms, the exception still occurred.
>> 
>> Here is the log snippet. It seems this has nothing to do with the ZooKeeper client timeout, but I still don't know why the Kafka producer would be closed without any task state change.
>> 
>> ```
>> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
>> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server XXX.XXX.XXX.XXX:2181
>> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
>> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to XXX.XXX.XXX.XXX:2181, initiating session
>> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated timeout = 120000
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED
>> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task  - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>> 	at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> 	at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>> 	at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>> 	at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>> 	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>> 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> 	at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
>> ```
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-05-14 11:36 GMT+08:00 Tony Wei:
>> Hi all,
>> 
>> Recently, my Flink job met a problem that caused the job to fail and restart.
>> 
>> The log is in this screen snapshot
>> 
>> <exception.png>
>> 
>> or this
>> 
>> ```
>> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
>> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect
>> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED
>> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
>> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task  - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>> ```
>> 
>> Logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout value is Long.MAX_VALUE; it appears when someone calls `producer.close()`.
>> 
>> I also saw the log lines `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect`
>> and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.`
>> 
>> I have checked ZooKeeper and Kafka and there was no error during that period.
>> I was wondering whether the TM stops its tasks when it loses its ZooKeeper client session in HA mode. Since I didn't see any document or mailing thread discussing this, I'm not sure if this is the reason the Kafka producer was closed.
>> Could someone who knows HA well help? Or does someone know what happened in my job?
>> 
>> My Flink cluster version is 1.4.0, with 2 masters and 10 slaves. My ZooKeeper cluster version is 3.4.11, with 3 nodes.
>> `high-availability.zookeeper.client.session-timeout` is the default value: 60000 ms.
>> `maxSessionTimeout` in zoo.cfg is 40000 ms.
>> I already changed maxSessionTimeout to 120000 ms this morning.
>> 
>> This problem happened many times over the last weekend and made my Kafka log delay grow. Please help me. Thank you very much!
>> 
>> Best Regards,
>> Tony Wei
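[Editor's note] The acks/retries settings Ron asks about are passed to the Flink Kafka connector through ordinary producer `Properties`. A minimal sketch of the more robust settings he describes (acks=1 or all, retries=MAX_INT); the broker address, topic, and helper name here are illustrative, not from the thread:

```java
import java.util.Properties;

public class ProducerSettings {

    // Builds producer properties with the settings discussed above,
    // instead of relying on the Kafka defaults (acks and retries).
    static Properties robustProducerProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Wait for acknowledgement from all in-sync replicas before
        // treating a send as successful ("1" would wait for the leader only).
        props.setProperty("acks", "all");
        // Retry transient failures such as the NetworkException in the
        // stack traces above, rather than failing the task immediately.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Pause between retries so a recovering broker is not hammered.
        props.setProperty("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties props = robustProducerProperties("broker1:9092");
        System.out.println(props);
    }
}
```

These `Properties` would then be handed to the `FlinkKafkaProducer010` constructor along with the topic name and serialization schema.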
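[Editor's note] The two timeout knobs Tony changed live on different sides of the connection: one in Flink's configuration, one in the ZooKeeper server's. A sketch of the change described in the thread (values from the thread; file layout assumed typical):

```
# flink-conf.yaml — session timeout Flink's ZooKeeper client requests (ms)
high-availability.zookeeper.client.session-timeout: 120000

# zoo.cfg — the ZooKeeper server caps negotiable session timeouts.
# The default maxSessionTimeout is 20 x tickTime (40000 ms with the
# default tickTime of 2000 ms, matching the 40000 ms Tony saw), so the
# server cap must also be raised or the client's request is negotiated
# down; the "negotiated timeout = 120000" log line above shows both
# sides agreeing after the change.
maxSessionTimeout=120000
```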
