flink-user mailing list archives

From: Maximilian Bode <maximilian.bode@tngtech.com>
Subject: Re: Jobmanager HA with Rolling Sink in HDFS
Date: Tue, 08 Mar 2016 09:26:01 GMT
Hi Aljoscha,
thanks again for getting back to me. I built from your branch and the exception is not occurring
anymore. The RollingSink state can be restored.

Still, the exactly-once guarantee does not seem to hold: there are always some extra records
after killing either a task manager or the job manager. Do you have an idea where this behavior
might be coming from? (I guess concrete numbers will not help much, as there are so many
parameters influencing them. Still, in our test scenario, we produce 2 million records in
a Kafka queue, but the final output files contain on the order of 2.1 million records,
i.e. a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing
interval of 10s.)
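
(For reference, the test job boils down to roughly the following sketch; imports are omitted,
the Kafka connection details and topic name are placeholders, and the records are simplified
to Strings here, so this is an illustration rather than our actual code:)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // checkpointing interval of 10s, as described above

Properties props = new Properties();
props.setProperty("zookeeper.connect", "zk:2181");     // placeholder
props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
props.setProperty("group.id", "exactly-once-test");    // placeholder

DataStream<String> records = env.addSource(
		new FlinkKafkaConsumer08<String>("test-topic", new SimpleStringSchema(), props));

RollingSink<String> sink = new RollingSink<String>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
sink.setBucketer(new NonRollingBucketer());
records.addSink(sink);

env.execute("kafka-to-hdfs-test");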

On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go
through for -Dhadoop.version=2.4.1. I have not looked into this further yet; is this
one of the tests that is known to fail occasionally?

Cheers,
 Max

—
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082

> On 07.03.2016, at 17:20, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Hi Maximilian,
> sorry for the delay, we were very busy with the release last week. I had a hunch about
> the problem, and I think I found a fix now. The problem is in snapshot restore: when restoring,
> the sink tries to clean up any files that were previously in progress. If Flink restores
> from the same snapshot twice in a row, it tries to clean up the leftover files twice, but they
> are no longer there, and this causes the exception.
> 
> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
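> 
> Roughly, the idea is to make that cleanup tolerant of files that are already gone instead
> of failing on the second restore. As a sketch of the idea only (not the actual patch; fs
> is the Hadoop FileSystem, inProgressPath the leftover file):
> 
> Path inProgress = new Path(inProgressPath);
> if (fs.exists(inProgress)) {
> 	// first restore from this snapshot: the leftover file is still there, clean it up
> 	fs.delete(inProgress, false);
> }
> // if the file is already gone, a previous restore already cleaned it up,
> // so this should not be treated as an error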
> 
> Could you check whether this solves your problem? Which version of Flink are you using?
> You would have to build from source to try it out. Alternatively, I could build it and
> put it into a Maven snapshot repository for you to try.
> 
> Cheers,
> Aljoscha
>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> 
>> Hi,
>> did you check whether there are any files at your specified HDFS output location?
>> If yes, which files are there?
>> 
>> Cheers,
>> Aljoscha
>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>> 
>>> Just for the sake of completeness: this also happens when killing a task manager
>>> and is therefore probably unrelated to job manager HA.
>>> 
>>>> On 03.03.2016, at 14:17, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>>> 
>>>> Hi everyone,
>>>> 
>>>> unfortunately, I am running into another problem trying to establish exactly-once
>>>> guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>> 
>>>> When using
>>>> 
>>>> RollingSink<Tuple3<Integer, Integer, String>> sink =
>>>> 		new RollingSink<>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>> sink.setBucketer(new NonRollingBucketer());
>>>> output.addSink(sink);
>>>> 
>>>> and then killing the job manager, the new job manager is unable to restore
>>>> the old state, throwing:
>>>> ---
>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> 	at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>> 	... 3 more
>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>> 	... 4 more
>>>> ---
>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using
>>>> 2.4.0 – might this be the same issue?
>>>> 
>>>> Another thing I could think of is that the job is not configured correctly
>>>> and there is some sort of timing issue. The checkpoint interval is 10 seconds;
>>>> everything else was left at its default value. Then again, as the NonRollingBucketer
>>>> is used, there should not be any timing issues, right?
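>>>> 
>>>> (As far as I understand it, the NonRollingBucketer boils down to roughly the following,
>>>> sketched from memory, so no rolling to new buckets should be involved at all:)
>>>> 
>>>> public class NonRollingBucketer implements Bucketer {
>>>> 	@Override
>>>> 	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
>>>> 		return false; // never start a new bucket
>>>> 	}
>>>> 
>>>> 	@Override
>>>> 	public Path getNextBucketPath(Path basePath) {
>>>> 		return basePath; // always write to the base directory
>>>> 	}
>>>> }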
>>>> 
>>>> Cheers,
>>>> Max
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>> 
>>>> —
>>>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Registered office: Unterföhring * Amtsgericht München * HRB 135082
>>>> 
>>> 
>> 
> 

