From: "Tzu-Li (Gordon) Tai"
To: user@flink.apache.org
Date: Fri, 2 Jun 2017 11:04:44 +0200
Subject: Re: Flink: KafkaProducer Data Loss
Hi Ninad,

Unfortunately, I don't think the provided logs shed any light here. They do complain about:

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

but I'm not sure whether this is related to Flink not being built against the Cloudera binaries.

Could you provide details on exactly how you're verifying the lost messages?

Cheers,
Gordon

On 1 June 2017 at 11:14:17 PM, ninad (nninad@gmail.com) wrote:

Thanks Gordon and Kostas.

Gordon,

"When a failure occurs in the job, Flink uses the last completed checkpoint to restart the job. In the case of the Flink Kafka producer, this essentially makes sure that records which did not make it into Kafka and caused the last run to fail are reprocessed and sent to Kafka again."

This is exactly what we were expecting. Thanks for confirming. However, we still do not see the messages in Kafka.

All the Kafka properties are as expected:

Replication: 3
Min ISR: 2
acks: all

We also tried this with Flink 1.2.1. We haven't tested this with the standalone configuration yet; we will test it to see if the result is different.

That being said, we're running this on a Cloudera YARN/Hadoop cluster, but we haven't built Flink against the Cloudera binaries. The logs certainly don't indicate that this is the problem.

Please let us know how we can troubleshoot this.

I have attached the JobManager and TaskManager log files for reference.
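For context on the NOT_ENOUGH_REPLICAS warning: with the settings listed above (replication factor 3, min.insync.replicas 2, acks=all), a partition leader rejects an acks=all produce request with that error whenever fewer than two replicas are in the in-sync replica set; min.insync.replicas is not enforced for acks=0 or acks=1. The Java sketch below models only that acceptance rule. It is a simplification, not Kafka's broker code, and the class, enum, and method names are hypothetical. Note that the quoted Sender log line shows the producer treating the error as retriable ("retrying (99999 attempts left)").

```java
// Simplified model (not Kafka's actual broker code) of how a partition leader
// decides whether to accept a produce request. min.insync.replicas is only
// enforced when the producer uses acks=all.
public class IsrCheck {

    // Hypothetical stand-ins for the producer's acks=0, acks=1, acks=all.
    enum Acks { NONE, LEADER, ALL }

    // Returns null if the write would be accepted, otherwise the error name.
    static String checkProduce(Acks acks, int inSyncReplicas, int minInSyncReplicas) {
        if (acks == Acks.ALL && inSyncReplicas < minInSyncReplicas) {
            return "NOT_ENOUGH_REPLICAS";
        }
        return null;
    }

    public static void main(String[] args) {
        int replicationFactor = 3; // "Replication: 3" from the thread
        int minIsr = 2;            // "Min ISR: 2" from the thread
        // Walk the ISR size down from full replication to a single replica.
        for (int isr = replicationFactor; isr >= 1; isr--) {
            String result = checkProduce(Acks.ALL, isr, minIsr);
            System.out.println("ISR size " + isr + ": "
                    + (result == null ? "accepted" : result));
        }
    }
}
```

Running `main` prints "accepted" for ISR sizes 3 and 2 and "NOT_ENOUGH_REPLICAS" for ISR size 1, i.e. with these settings the error means at least two of the three replicas had dropped out of the ISR at that moment.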
Relevant logs from the log files:

*Job Manager*

2017-06-01 20:22:44,499 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
2017-06-01 20:22:44,530 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).
2017-06-01 20:22:44,534 INFO org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter - Delaying retry of job execution for 10000 ms ...
2017-06-01 20:22:48,233 DEBUG org.apache.flink.runtime.metrics.dump.MetricDumpSerialization - Failed to serialize gauge.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex Source: Custom Source (1/1) for new execution.
2017-06-01 20:22:54,535 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Resetting execution vertex TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) for new execution.
2017-06-01 20:22:54,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RESTARTING to CREATED.
2017-06-01 20:22:54,536 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.
2017-06-01 20:22:54,591 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state CREATED to RUNNING.

*Task Manager 1*

2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS
2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).
2017-06-01 20:22:44,427 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}

*Task Manager 2*

2017-06-01 20:22:54,741 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1): Initialized ResultPartition 8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1 [PIPELINED, 1 subpartitions, 1 pending references]
2017-06-01 20:22:54,760 INFO org.apache.flink.yarn.YarnTaskManager - Received task Source: Custom Source (1/1)
2017-06-01 20:27:30,388 WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {topic.event.filter=LEADER_NOT_AVAILABLE}

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13443.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
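Gordon's question about how the lost messages are being verified matters because the recovery described in the thread reprocesses records after restoring the last completed checkpoint, so duplicates in the output topic are expected while gaps are not. A minimal sketch, assuming each event carries a unique numeric ID and that two lists can be collected: the IDs the job was expected to emit, and the IDs read back from the output topic. How those lists are gathered is left open here, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Compares the IDs a job was expected to write with the IDs actually read back
// from the output topic. Assumes every record carries a unique ID (hypothetical).
public class LossCheck {

    // IDs that were expected but never observed: actual data loss.
    static Set<Long> missing(List<Long> expected, List<Long> observed) {
        Set<Long> result = new TreeSet<>(expected);
        result.removeAll(observed);
        return result;
    }

    // IDs observed more than once: expected under at-least-once replay.
    static Map<Long, Integer> duplicates(List<Long> observed) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Long id : observed) {
            counts.merge(id, 1, Integer::sum);
        }
        // Keep only IDs seen at least twice.
        counts.values().removeIf(c -> c < 2);
        return counts;
    }

    public static void main(String[] args) {
        List<Long> expected = List.of(1L, 2L, 3L, 4L, 5L);
        // Hypothetical read-back: 3 and 4 were replayed after a restart, 5 is absent.
        List<Long> observed = List.of(1L, 2L, 3L, 4L, 3L, 4L);
        System.out.println("missing: " + missing(expected, observed));
        System.out.println("duplicates: " + duplicates(observed));
    }
}
```

On the sample data this prints `missing: [5]` and `duplicates: {3=2, 4=2}`. Separating the two cases keeps replayed records from masking, or being mistaken for, records that never reached Kafka.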