flink-user mailing list archives

From Yun Tang <myas...@live.com>
Subject Re: Job continuously failing after Checkpoint Restore
Date Wed, 06 Mar 2019 17:14:07 GMT
Hi Laura

From the exception stack, there are two possible causes of this NPE: either a KafkaTopicPartition
restored from the union state is null, or the topic field of that KafkaTopicPartition is null.
Either way, the problem likely lies in the KryoSerializer used to de/serialize the
KafkaTopicPartition class. Gordon (in CC), who is an expert on serialization, might offer more
insights.
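
To illustrate where the NPE comes from, here is a minimal, self-contained sketch (not the actual
Flink source; class and field names are illustrative) of a comparator in the spirit of
KafkaTopicPartition$Comparator. If an entry restored from the union state is null, or its topic
field is null, TreeMap.put fails with exactly this kind of NullPointerException during
initializeState():

import java.util.Comparator;
import java.util.TreeMap;

// Illustrative stand-in for KafkaTopicPartition; not the real Flink class.
class TopicPartitionSketch {
    final String topic;   // a null topic here reproduces the NPE below
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

public class ComparatorNpeSketch {
    public static void main(String[] args) {
        Comparator<TopicPartitionSketch> cmp = (p1, p2) -> {
            // If either entry or its topic is null (e.g. a corrupted union-state
            // entry), the compareTo call below throws NullPointerException,
            // which then surfaces from TreeMap.put during state restore.
            int byTopic = p1.topic.compareTo(p2.topic);
            return byTopic != 0 ? byTopic : Integer.compare(p1.partition, p2.partition);
        };

        TreeMap<TopicPartitionSketch, Long> restoredOffsets = new TreeMap<>(cmp);
        restoredOffsets.put(new TopicPartitionSketch("events", 0), 42L);
        restoredOffsets.put(new TopicPartitionSketch(null, 1), 43L); // -> NullPointerException
    }
}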

Before further discussion, would you please offer more information:

  1.  What version of Kafka are you using?
  2.  Have you run into this problem before?
  3.  Did you change anything before resuming the job?
  4.  If you restore checkpoint-60 again by submitting another job (e.g. with flink run -s, as
sketched below), do you still hit this NPE continuously?
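
For point 4, resubmitting a job from a specific (externalized) checkpoint is typically done with
the -s flag of the Flink CLI; the metadata path, parallelism, and jar name below are placeholders,
not taken from your setup:

# Placeholder paths/names; point -s at the metadata of the externalized checkpoint (chk-60).
flink run -s hdfs:///path/to/externalized/checkpoint-60-metadata -p 8 my-ingestion-job.jar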

Best
Yun Tang
________________________________
From: Laura Uzcátegui <laura.uzcateguij@gmail.com>
Sent: Wednesday, March 6, 2019 21:35
To: user
Subject: Job continuously failing after Checkpoint Restore

Hi,

We are currently running a Flink Job that has 3 operators.

Source ---> Filter ---> Sink

As soon as the job starts, it tries to recover from the latest checkpoint:


[05-Mar-2019 13:09:55.365 UTC] INFO <CheckpointCoordinator> Restoring from latest valid
checkpoint: Checkpoint 60 @ 1551788864502 for fd697c91437216e773bb862cbae56e0f.


Then, during operator initialization, specifically in the Source operator, which reads from Kafka
topics using a regex pattern, the job starts to fail with the following exception:


[05-Mar-2019 13:10:11.756 UTC] INFO <ExecutionGraph> Job Data Lake Ingestion (fd697c91437216e773bb862cbae56e0f)
switched from state RUNNING to FAILING.
java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:126)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition$Comparator.compare(KafkaTopicPartition.java:123)
    at java.util.TreeMap.put(Unknown Source)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:724)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Unknown Source)
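
For reference, the pipeline is set up roughly like the sketch below (assuming the Kafka 0.11
connector; the topic pattern, properties, and filter/sink bodies are placeholders, not our exact
production code):

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class IngestionJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint interval is illustrative

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");        // placeholder
        props.setProperty("group.id", "data-lake-ingestion");        // placeholder
        // Enables periodic discovery of new topics/partitions matching the pattern
        // (interval value is illustrative).
        props.setProperty("flink.partition-discovery.interval-millis", "60000");

        // Subscribe to all topics matching a regex pattern (pattern is illustrative).
        FlinkKafkaConsumer011<String> source = new FlinkKafkaConsumer011<>(
                Pattern.compile("ingest-.*"),
                new SimpleStringSchema(),
                props);

        env.addSource(source)
           .filter(value -> value != null && !value.isEmpty()) // stand-in for the real filter
           .print();                                           // stand-in for the real sink

        env.execute("Data Lake Ingestion");
    }
}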


I was wondering if anyone has seen this before?


My assumption would be


We are currently running with the following settings:


  *   Flink version: 1.4.2
  *   Docker image with the job embedded
  *   Job Parallelism: 8

Cheers,

Laura
