flink-user mailing list archives

From Maximilian Bode <maximilian.b...@tngtech.com>
Subject Jobmanager HA with Rolling Sink in HDFS
Date Thu, 03 Mar 2016 13:17:26 GMT
Hi everyone,

unfortunately, I am running into another problem while trying to establish exactly-once guarantees
(Kafka -> Flink 1.0.0-rc3 -> HDFS).

When using

RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
sink.setBucketer(new NonRollingBucketer());
output.addSink(sink);

and then killing the job manager, the new job manager is unable to restore the old state and throws
---
java.lang.Exception: Could not restore checkpointed state to operators and functions
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
	... 3 more
Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
	... 4 more
---
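Judging only from the wording of the message, the restore step seems to look for a pending or an in-progress variant of the part file and to give up when it finds neither. The following is merely a sketch of such a check, not the actual RollingSink code: it uses local files via java.nio in place of HDFS, and the class name RestoreCheckSketch, the restore method, the "_" prefix and the ".pending" / ".in-progress" suffixes are all assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RestoreCheckSketch {

    // Assumed naming scheme for this sketch only.
    static final String PREFIX = "_";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    /**
     * Moves a leftover pending or in-progress variant of partPath to its
     * final name and reports which variant was found.
     */
    static String restore(Path partPath) throws IOException {
        String name = partPath.getFileName().toString();
        Path pending = partPath.resolveSibling(PREFIX + name + PENDING_SUFFIX);
        Path inProgress = partPath.resolveSibling(PREFIX + name + IN_PROGRESS_SUFFIX);

        if (Files.exists(pending)) {
            Files.move(pending, partPath);
            return "pending";
        } else if (Files.exists(inProgress)) {
            Files.move(inProgress, partPath);
            return "in-progress";
        }
        // Neither variant exists: the situation described by the exception message.
        throw new RuntimeException("In-Progress file " + partPath
                + " was neither moved to pending nor is still in progress.");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("outbound");
        Path part = dir.resolve("part-5-0");

        // First restore finds the in-progress variant and finalizes it.
        Files.createFile(dir.resolve("_part-5-0.in-progress"));
        System.out.println(restore(part));

        // A second restore from the same state finds neither variant and fails.
        try {
            restore(part);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch a second restore from the same checkpointed state fails because the file has already been moved to its final name. Whether that is what happens in our setup is an open question.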
I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using Hadoop 2.4.0; might
this be the same issue?

Another thing I could think of is that the job is not configured correctly and there is some
sort of timing issue. The checkpoint interval is 10 seconds; everything else was left at its
default value. Then again, as the NonRollingBucketer is used, there should not be any timing
issues, right?

Cheers,
 Max

[1] https://issues.apache.org/jira/browse/FLINK-2979

—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

