Subject: Re: Jobmanager HA with Rolling Sink in HDFS
From: Maximilian Bode
Date: Mon, 7 Mar 2016 17:36:53 +0100
To: user@flink.apache.org

Hi Aljoscha,

thank you very much, I will check whether this fixes the problem and get back to you. I am using 1.0.0 as of today :)

Cheers,
Max
--
Maximilian Bode * Junior Consultant * maximilian.bode@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082

On 07 Mar 2016, at 17:20, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi Maximilian,
sorry for the delay, we were very busy with the release last week. I had a hunch about the problem, and I think I have found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are no longer there the second time; this causes the exception.
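
To illustrate the failure mode described above, here is a minimal, self-contained sketch in plain Java, with no Flink or HDFS dependencies. Everything in it is an assumption made for illustration: the class name `RestoreCleanupSketch`, the `restore` method, the `tolerant` flag, and the `_<part>.in-progress` / `_<part>.pending` file names. It is not the RollingSink code, nor the fix on the branch mentioned in this thread; it only shows why a restore-time cleanup that is not idempotent fails when the same snapshot is restored twice.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RestoreCleanupSketch {

    // Hypothetical naming scheme, used only in this sketch.
    static Path inProgress(Path dir, String part) {
        return dir.resolve("_" + part + ".in-progress");
    }

    static Path pending(Path dir, String part) {
        return dir.resolve("_" + part + ".pending");
    }

    /**
     * Cleans up the part file recorded in a snapshot: in-progress -> pending -> final.
     * Returns true if any file was moved, false if there was nothing left to do.
     * In strict mode (tolerant == false) a missing in-progress/pending file is an error.
     * In tolerant mode it is accepted as long as the final file already exists,
     * which means an earlier restore of the same snapshot already did the work.
     */
    public static boolean restore(Path dir, String part, boolean tolerant) throws IOException {
        Path inProgress = inProgress(dir, part);
        Path pending = pending(dir, part);
        Path finished = dir.resolve(part);

        if (Files.exists(inProgress)) {
            Files.move(inProgress, pending);
        } else if (!Files.exists(pending)) {
            if (tolerant && Files.exists(finished)) {
                return false; // already cleaned up by a previous restore
            }
            throw new IllegalStateException("In-Progress file " + finished
                    + " was neither moved to pending nor is still in progress.");
        }
        Files.move(pending, finished);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("rolling-sink-sketch");
        Files.createFile(inProgress(dir, "part-5-0"));

        // First restore from the snapshot: the leftover file is cleaned up.
        System.out.println("first restore moved files: " + restore(dir, "part-5-0", false));

        // Second restore from the same snapshot, strict mode: the files are gone.
        try {
            restore(dir, "part-5-0", false);
        } catch (IllegalStateException e) {
            System.out.println("second strict restore failed: " + e.getMessage());
        }

        // Second restore from the same snapshot, tolerant mode: nothing left to do.
        System.out.println("second tolerant restore moved files: " + restore(dir, "part-5-0", true));
    }
}
```

The tolerant branch only accepts the missing file when the final file is present, so a file that has vanished entirely still raises an error. Whether a real sink should be this strict is a design choice this sketch does not settle.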

I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix

Could you try whether this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and publish it to a Maven snapshot repository for you to try.

Cheers,
Aljoscha
On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi,
did you check whether there are any files at your specified HDFS output location? If yes, which files are there?

Cheers,
Aljoscha
On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.bode@tngtech.com> wrote:

Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.

On 03 Mar 2016, at 14:17, Maximilian Bode <maximilian.bode@tngtech.com> wrote:

Hi everyone,

unfortunately, I am running into another problem trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).

When using

RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
sink.setBucketer(new NonRollingBucketer());
output.addSink(sink);

and then killing the job manager, the new job manager is unable to restore the old state, throwing
---
java.lang.Exception: Could not restore checkpointed state to operators and functions
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
	... 3 more
Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
	at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
	... 4 more
---
I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0. Might this be the same issue?

Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds; everything else was left at its default value. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?

Cheers,
Max

[1] https://issues.apache.org/jira/browse/FLINK-2979





