flink-user mailing list archives

From: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: Jobmanager HA with Rolling Sink in HDFS
Date: Tue, 08 Mar 2016 12:31:24 GMT
Hi,
a missing part file for one of the parallel sinks is not necessarily a problem. This can happen
if that parallel instance of the sink never received data after the job successfully restarted.


Missing data, however, is a problem. Maybe I need some more information about your setup:

 - When are you inspecting the part files?
 - Do you shut down the Flink job before checking? If so, how do you shut it down?
 - How do you know when all the data from Kafka has been consumed by Flink and has passed through
the pipeline into HDFS?

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
> 
> Hi Aljoscha,
> 
> oh I see. I was under the impression this file was used internally and that the output would be completed at the end. Ok, so I extracted the relevant lines using
> 	for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
> which seems to do the trick.
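> 
> (In case it is useful to anyone else: the same extraction can also be done directly in HDFS. Below is a rough, untested sketch using the Hadoop FileSystem API; it assumes the default "_" prefix and ".valid-length" suffix and extracts the decimal digits from the marker file, in the same spirit as the `strings` trick above.)
> 
> 	import org.apache.hadoop.conf.Configuration;
> 	import org.apache.hadoop.fs.*;
> 	import org.apache.hadoop.io.IOUtils;
> 	import java.io.ByteArrayOutputStream;
> 	
> 	public class TruncateToValidLength {
> 		public static void main(String[] args) throws Exception {
> 			FileSystem fs = FileSystem.get(new Configuration());
> 			Path bucket = new Path("hdfs://our.machine.com:8020/hdfs/dir/outbound");
> 			for (FileStatus status : fs.listStatus(bucket)) {
> 				String name = status.getPath().getName();
> 				// skip non-part files and files we already extracted
> 				if (!name.startsWith("part-") || name.endsWith(".final")) continue;
> 				Path marker = new Path(bucket, "_" + name + ".valid-length");
> 				if (!fs.exists(marker)) continue; // no marker: file is complete as-is
> 				// read the marker, keeping only the decimal digits
> 				ByteArrayOutputStream bos = new ByteArrayOutputStream();
> 				try (FSDataInputStream in = fs.open(marker)) {
> 					IOUtils.copyBytes(in, bos, 1024, false);
> 				}
> 				long validLength = Long.parseLong(bos.toString().replaceAll("[^0-9]", ""));
> 				// copy exactly validLength bytes into a ".final" file
> 				try (FSDataInputStream in = fs.open(status.getPath());
> 						FSDataOutputStream out = fs.create(new Path(bucket, name + ".final"))) {
> 					IOUtils.copyBytes(in, out, validLength, false);
> 				}
> 			}
> 		}
> 	}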
> 
> Unfortunately, now some records are missing again. In particular, there are the files
> 	part-0-0, part-1-0, ..., part-10-0, part-11-0, each with a corresponding .valid-length file, and
> 	part-0-1, part-1-1, ..., part-10-1
> in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.
> 
> Thanks again for your help.
> 
> Cheers,
>  Max
> — 
> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
>> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek <aljoscha@apache.org>:
>> 
>> Hi,
>> are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write the length up to which a file is valid whenever we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. Now, if there exists a file part-8-0.valid-length, this file tells you up to which position the file part-8-0 is valid. So you should only read up to this point.
>> 
>> The name of the “.valid-length” suffix can also be configured, by the way, as can all the other stuff.
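>> 
>> For instance, something along these lines (setter names from memory, so treat this as a sketch rather than the definitive API):
>> 
>> 	RollingSink<Tuple3<Integer, Integer, String>> sink =
>> 		new RollingSink<>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>> 	sink.setValidLengthPrefix("_");             // default
>> 	sink.setValidLengthSuffix(".valid-length"); // default
>> 	sink.setPendingPrefix("_");                 // default
>> 	sink.setPendingSuffix(".pending");          // default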
>> 
>> If this is not the problem then I definitely have to investigate further. I’ll also look into the Hadoop 2.4.1 build problem.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>> 
>>> Hi Aljoscha,
>>> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
>>> 
>>> Still, the exactly-once guarantee seems not to be fulfilled: there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly, as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue, but the final output files contain on the order of 2.1 million records, i.e. a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
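>>> 
>>> (Checkpointing is enabled roughly along these lines; as far as I know, the exactly-once mode spelled out here is also the default:)
>>> 
>>> 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> 	// 10 s interval, as described above
>>> 	env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);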
>>> 
>>> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now; is this one of the tests known to fail sometimes?
>>> 
>>> Cheers,
>>> Max
>>> <travis.log>
>>> — 
>>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> 
>>>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek <aljoscha@apache.org>:
>>>> 
>>>> Hi Maximilian,
>>>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem and I think I found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores from the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are not there anymore; this causes the exception.
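>>>> 
>>>> Illustratively (this is just the idea, not the exact patch; bucketPath and inProgressFileName are placeholders), the restore-time cleanup has to tolerate files that an earlier restore of the same snapshot already removed:
>>>> 
>>>> 	// sketch: make the cleanup idempotent instead of assuming the file exists
>>>> 	Path inProgress = new Path(bucketPath, inProgressFileName);
>>>> 	if (fs.exists(inProgress)) {
>>>> 		fs.delete(inProgress, false);
>>>> 	}
>>>> 	// else: a previous restore already cleaned it up; nothing to do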
>>>> 
>>>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>> 
>>>> Could you maybe try whether this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and put it onto a Maven snapshot repository for you to try it out.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>> 
>>>>> Hi,
>>>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>>>>> 
>>>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>>>> 
>>>>>>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode <maximilian.bode@tngtech.com>:
>>>>>>> 
>>>>>>> Hi everyone,
>>>>>>> 
>>>>>>> unfortunately, I am running into another problem trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>> 
>>>>>>> When using
>>>>>>> 
>>>>>>> 	RollingSink<Tuple3<Integer, Integer, String>> sink =
>>>>>>> 		new RollingSink<Tuple3<Integer, Integer, String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>> 	sink.setBucketer(new NonRollingBucketer());
>>>>>>> 	output.addSink(sink);
>>>>>>> 
>>>>>>> and then killing the job manager, the new job manager is unable to restore the old state, throwing
>>>>>>> ---
>>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> 	at java.lang.Thread.run(Thread.java:744)
>>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>> 	... 3 more
>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>> 	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>> 	... 4 more
>>>>>>> ---
>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?
>>>>>>> 
>>>>>>> Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds; everything else was left at its default value. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> 
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>> 
>>>>>>> — 
>>>>>>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 

