Subject: Re: Jobmanager HA with Rolling Sink in HDFS
From: Maximilian Bode <maximilian.bode@tngtech.com>
To: user@flink.apache.org
Date: Tue, 8 Mar 2016 13:19:46 +0100
In-Reply-To: <758CA569-6136-4E75-966F-A6AFCD21759E@apache.org>

Hi Aljoscha,

oh I see. I was under the impression this file was used internally and that the output would be completed at the end. OK, so I extracted the relevant lines using

for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done

which seems to do the trick.

Unfortunately, now some records are missing again. In particular, there are the files
part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding .valid-length files
part-0-1, part-1-1, ..., part-10-0
in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.

Thanks again for your help.

Cheers,
Max

--
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> On 08.03.2016 at 11:05, Aljoscha Krettek <aljoscha@apache.org> wrote:
>
> Hi,
> are you taking the ".valid-length" files into account? The problem with doing "exactly-once" with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we're using is to write the length up to which a file is valid if we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. Now, if there exists a file part-8-0.valid-length, this file tells you up to which position the file part-8-0 is valid. So you should only read up to this point.
>
> The name of the ".valid-length" suffix can also be configured, by the way, as can all the other stuff.
>
> If this is not the problem then I definitely have to investigate further. I'll also look into the Hadoop 2.4.1 build problem.
>
> Cheers,
> Aljoscha
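[As a rough, hypothetical illustration of the ".valid-length" mechanism described above: the Java sketch below reads a part file from HDFS only up to the recorded valid position. This is not code from the thread; the "_" prefix and ".valid-length" suffix simply mirror the shell loop at the top of this mail, the digit-stripping mirrors the 'strings' call, and all class, method and path names are made up. Assuming the standard Hadoop FileSystem API:]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ValidLengthReader {

    /** Copies at most the valid prefix of a part file into 'out'. */
    public static void copyValidPart(FileSystem fs, Path partFile, OutputStream out)
            throws IOException {
        Path validLengthFile = new Path(partFile.getParent(),
                "_" + partFile.getName() + ".valid-length");

        // If no valid-length file exists, the whole part file is assumed valid.
        long validLength = fs.getFileStatus(partFile).getLen();
        if (fs.exists(validLengthFile)) {
            validLength = readLength(fs, validLengthFile);
        }

        byte[] buffer = new byte[8192];
        long remaining = validLength;
        try (FSDataInputStream in = fs.open(partFile)) {
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) break;
                out.write(buffer, 0, read);
                remaining -= read;
            }
        }
    }

    /** The length file apparently holds the position as text plus a small binary
     *  prefix, so strip everything that is not a digit (the 'strings' trick). */
    private static long readLength(FileSystem fs, Path lengthFile) throws IOException {
        byte[] raw = new byte[(int) fs.getFileStatus(lengthFile).getLen()];
        try (FSDataInputStream in = fs.open(lengthFile)) {
            in.readFully(raw);
        }
        String digits = new String(raw, StandardCharsets.UTF_8).replaceAll("[^0-9]", "");
        return Long.parseLong(digits);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: write the valid prefix of one bucket file to "<part>.final".
        FileSystem fs = FileSystem.get(new Configuration());
        Path part = new Path("hdfs://our.machine.com:8020/hdfs/dir/outbound/part-8-0");
        try (OutputStream out = fs.create(new Path(part + ".final"))) {
            copyValidPart(fs, part, out);
        }
    }
}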
>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>
>> Hi Aljoscha,
>> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
>>
>> Still, the exactly-once guarantee seems not to be fulfilled; there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly, as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue, but in the final output files there are on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
>>
>> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now; is this one of the tests known to fail sometimes?
>>
>> Cheers,
>> Max
>> <travis.log>
>>
>> --
>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>> On 07.03.2016 at 17:20, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>
>>> Hi Maximilian,
>>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem, but I think I found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores from the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are not there anymore; this causes the exception.
>>>
>>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>
>>> Could you maybe try whether this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and put it onto a Maven snapshot repository for you to try it out.
>>>
>>> Cheers,
>>> Aljoscha
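[The actual fix is in the branch linked above. Purely as a hypothetical sketch of the defensive cleanup described in the previous message, assuming the Hadoop FileSystem API and made-up method names, the idea is to tolerate an in-progress file that an earlier restore already cleaned up:]

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class RestoreCleanupSketch {

    /**
     * Deletes a leftover in-progress file if it is still there. When the same
     * snapshot is restored a second time the file is already gone, and that is
     * fine; the old behavior surfaced this as "was neither moved to pending nor
     * is still in progress".
     */
    static void cleanUpLeftoverFile(FileSystem fs, Path inProgressFile) throws IOException {
        if (fs.exists(inProgressFile)) {
            fs.delete(inProgressFile, false);
        }
        // A missing file just means an earlier restore already cleaned it up,
        // so no exception is thrown here.
    }
}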
>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>
>>>> Hi,
>>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>>>>
>>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>>>
>>>>>> On 03.03.2016 at 14:17, Maximilian Bode <maximilian.bode@tngtech.com> wrote:
>>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> unfortunately, I am running into another problem trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>
>>>>>> When using
>>>>>>
>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>> output.addSink(sink);
>>>>>>
>>>>>> and then killing the job manager, the new job manager is unable to restore the old state, throwing
>>>>>> ---
>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>     ... 3 more
>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>     ... 4 more
>>>>>> ---
>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?
>>>>>>
>>>>>> Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds; everything else was left at the default values. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>
>>>>>> --
>>>>>> Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
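[For readers reconstructing the setup discussed in this thread, here is a hypothetical end-to-end sketch: a Kafka source feeding a Flink 1.0 job with a 10 s checkpoint interval and a RollingSink on HDFS using the NonRollingBucketer. Only the sink construction is taken from the original message above; the topic name, Kafka properties, record parsing and class name are placeholders, not details from the thread.]

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.NonRollingBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaToHdfsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 10 s checkpoint interval as mentioned in the thread; other settings stay at defaults.
        env.enableCheckpointing(10000);

        // Placeholder Kafka 0.8 consumer configuration, not taken from the thread.
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", "zk:2181");
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProps.setProperty("group.id", "rolling-sink-test");

        DataStream<Tuple3<Integer, Integer, String>> output = env
                .addSource(new FlinkKafkaConsumer08<String>("records", new SimpleStringSchema(), kafkaProps))
                .map(new MapFunction<String, Tuple3<Integer, Integer, String>>() {
                    @Override
                    public Tuple3<Integer, Integer, String> map(String line) {
                        // Placeholder parsing; the real record format is not part of the thread.
                        String[] parts = line.split(",", 3);
                        return new Tuple3<Integer, Integer, String>(
                                Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), parts[2]);
                    }
                });

        // Sink construction as in the original message.
        RollingSink<Tuple3<Integer, Integer, String>> sink =
                new RollingSink<Tuple3<Integer, Integer, String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
        sink.setBucketer(new NonRollingBucketer());
        output.addSink(sink);

        env.execute("Kafka to HDFS exactly-once test");
    }
}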
