flink-dev mailing list archives

From Shivam Sharma <28shivamsha...@gmail.com>
Subject Re: Facing issue of long running Flink job on Yarn
Date Wed, 20 Dec 2017 11:06:11 GMT
Hi Stefan,

One of the Kafka nodes was down, but I want the job to restart automatically.
How can I solve this?

Thanks
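A minimal sketch, assuming Flink 1.3.2 with flink-connector-kafka-0.10 as in the build.sbt quoted below. Two settings are relevant. First, Flink restarts a failed job only when a restart strategy applies. The Flink documentation states that when checkpointing is enabled and no strategy is configured, a fixed-delay strategy with Integer.MAX_VALUE restart attempts is used. Without checkpointing, and without a strategy in flink-conf.yaml, the job is not restarted. A strategy can also be set explicitly on the environment, as noted in the comments of the code. Second, NotLeaderForPartitionException is a retriable Kafka error, and the Kafka 0.10 producer defaults to retries = 0. Allowing a few retries may therefore let the sink survive a partition leader change without failing the job at all. The object and method names (ResilientKafkaSinkConfig, producerProperties), the broker addresses, the topic name and the retry values are hypothetical placeholders, not tested recommendations.

```scala
import java.util.Properties

// Hypothetical helper for illustration only: names and values are placeholders.
object ResilientKafkaSinkConfig {

  // Builds producer settings intended for FlinkKafkaProducer010.
  // "retries" lets the Kafka client retry retriable errors such as
  // NotLeaderForPartitionException (e.g. after a broker goes down and
  // partition leadership moves) instead of failing on the first attempt.
  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("retries", "5")             // retry transient send failures
    props.setProperty("retry.backoff.ms", "1000") // pause between retries so metadata can refresh
    // With retries enabled, more than one in-flight request may reorder records;
    // limiting it to 1 preserves ordering at some throughput cost.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = producerProperties("broker1:9092,broker2:9092")
    // In the actual job these properties would be passed to the sink, e.g.
    //   new FlinkKafkaProducer010[String]("output-topic", new SimpleStringSchema(), props)
    // and restarts would be enabled on the StreamExecutionEnvironment, e.g.
    //   env.enableCheckpointing(60000)
    //   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
    println(props.getProperty("retries"))
  }
}
```

If strict ordering of output records does not matter, max.in.flight.requests.per.connection can be left at its default for better throughput.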

On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <s.richter@data-artisans.com> wrote:

> Hi,
>
> did you notice that the problem starts with a Kafka exception, "Failed to send
> data to Kafka: This server is not the leader for that topic-partition."? Is
> it possible that you had a network issue and the producer could not find
> the leader broker?
>
> Best,
> Stefan
>
> > On 20.12.2017 at 10:57, Shivam Sharma <28shivamsharma@gmail.com> wrote:
> >
> > Hi,
> >
> > I keep facing this issue with a Flink job on YARN.
> > Basically, I am reading data from Kafka, transforming it, and writing it
> > back to Kafka.
> >
> > My build.sbt is:
> >
> > val flinkVersion = "1.3.2"
> > val flinkKafkaConnect = "0.10.2"
> >
> > libraryDependencies ++= Seq(
> >    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
> >    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
> >    "org.apache.flink" %% "flink-table" % flinkVersion,
> >    "org.json4s" %% "json4s-native" % "3.5.3",
> >    "org.json4s" %% "json4s-jackson" % "3.5.3"
> >
> > )
> >
> > *Note: One of the nodes in our Kafka cluster went down.*
> >
> >
> > java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
> >     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
> >     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
> >     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
> >     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
> >     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
> >     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
> >     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
> >     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
> >     ... 7 more
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
> >     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
> >     at DataStreamCalcRule$127.processElement(Unknown Source)
> >     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
> >     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
> >     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 23 more
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 35 more
> > Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> >     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
> >     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
> >     at scala.collection.immutable.List.foreach(List.scala:381)
> >     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
> >     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 41 more
> > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
> >     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> >     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >     ... 52 more
> > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> > 2017-12-20 05:42:16,008 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
> >
> >
> > Thanks
> >
> > --
> > Shivam Sharma
> > Data Engineer @ Goibibo
> > Indian Institute Of Information Technology, Design and Manufacturing
> > Jabalpur
> > Mobile No- (+91) 8882114744
> > Email:- 28shivamsharma@gmail.com
> > LinkedIn:- https://www.linkedin.com/in/28shivamsharma
>
>


