Subject: Re: Question about DataStream serialization
To: user@flink.apache.org
From: "Matthias J. Sax"
Date: Tue, 8 Dec 2015 08:41:40 +0100
Message-ID: <566689B4.4050104@apache.org>

Hi Radu,

you are right. The open() method is called for each parallel instance of
a rich function. Thus, if all instances use the same code, you might read
the same data multiple times. The easiest way to distinguish different
instances within open() is to use the RuntimeContext. It offers two
methods, "int getNumberOfParallelSubtasks()" and "int
getIndexOfThisSubtask()", that you can use to compute your own
partitioning within open().
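As a self-contained illustration of the byte-range arithmetic this partitioning uses (plain Java, no Flink dependency; the class and method names below are this example's own, not part of any Flink API):

```java
// Each of "dop" parallel subtasks takes a contiguous byte slice of the
// file: subtask idx starts at fileSize * idx / dop and reads up to the
// next subtask's start offset. Integer division makes the slices cover
// the whole file with no gaps or overlaps.
class FilePartition {

    // Start offset (inclusive) of subtask idx when splitting fileSize
    // bytes across dop subtasks.
    static long seekOffset(long fileSize, int idx, int dop) {
        return fileSize * idx / dop;
    }

    // Number of bytes subtask idx should read: the distance to the
    // next subtask's start offset.
    static long bytesToRead(long fileSize, int idx, int dop) {
        return seekOffset(fileSize, idx + 1, dop)
                - seekOffset(fileSize, idx, dop);
    }
}
```

Summing bytesToRead over all idx from 0 to dop-1 always yields fileSize, so every byte of the file is assigned to exactly one subtask.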
For example (just a sketch):

  @Override
  public void open(Configuration parameters) throws Exception {
    RuntimeContext context = super.getRuntimeContext();
    int dop = context.getNumberOfParallelSubtasks();
    int idx = context.getIndexOfThisSubtask();
    // open file
    // get size of file in bytes
    // seek to partition #idx:
    long seek = fileSize * idx / dop;
    // read "fileSize/dop" bytes
  }

Hope this helps.

-Matthias

On 12/08/2015 04:28 AM, Radu Tudoran wrote:
> Hi,
>
> Taking the example you mentioned of using RichFlatMapFunction and
> reading a file in the open() method:
>
> Would this open function be executed on each node where the
> RichFlatMapFunction gets executed? (I have done some tests and I get
> the feeling it does, but I wanted to double-check.)
>
> If so, would this mean that the same data will be loaded multiple times
> on each parallel instance? Is there any way this can be prevented, so
> that the data is hashed and partitioned somehow across nodes?
>
> Would using the operator state help?
>
>     OperatorState<MyList> dataset;
>
> I would be curious in this case how the open function could look in
> order to initialize the data for this operator state.
>
> I have tried to just read a file and write it into the dataset, but I
> encountered a strange behavior: it looks like the flatmap function
> gets executed before the open function, which leads to using an empty
> dataset in the flatmap function, while the dataset only gets loaded
> once this finishes executing. Is this an error or am I doing something
> wrong?
>
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: _radu.tudoran@huawei.com_
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address
> is listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure,
> reproduction, or dissemination) by persons other than the intended
> recipient(s) is prohibited. If you receive this e-mail in error, please
> notify the sender by phone or email immediately and delete it!
>
> *From:* Robert Metzger [mailto:rmetzger@apache.org]
> *Sent:* Tuesday, December 01, 2015 6:21 PM
> *To:* user@flink.apache.org
> *Cc:* Goetz Brasche
> *Subject:* Re: Question about DataStream serialization
>
> Hi Radu,
>
> both emails reached the mailing list :)
>
> You cannot reference DataSets or DataStreams from inside user-defined
> functions. Both are just abstractions for a data set or stream, so the
> elements are not really inside the set.
>
> We don't have any support for mixing the DataSet and DataStream API.
>
> For your use case, I would recommend you to use a RichFlatMapFunction
> and read the text file in the open() call.
>
> On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran wrote:
>
> Hello,
>
> I am not sure if this message was received on the user list; if so, I
> apologize for the duplicate message.
>
> I have the following scenario:
>
> · Reading a fixed set:
>
>     DataStream fixedset = env.readTextFile(…
>
> · Reading a continuous stream of data:
>
>     DataStream stream = ….
>
> I would need, for each event read from the continuous stream, to make
> some operations on it and on the fixed set together.
>
> I have tried something like:
>
>     final myObject.referenceStaticSet = fixedset;
>     stream.map(new MapFunction<String, String>() {
>         @Override
>         public String map(String arg0) throws Exception {
>             // for example: final String string2add = arg0;
>             // the goal of the function below would be to add
>             // string2add to the fixedset
>             myObject.referenceStaticSet =
>                 myObject.referenceStaticSet.flatMap(
>                     new FlatMapFunction<String, String>() {
>                         @Override
>                         public void flatMap(String arg0,
>                                 Collector<String> arg1) {
>                             // for example, adding the string2add
>                             // object to the fixed set:
>                             arg1.collect(string2add);
>                         }
>                     …
>     }
>
> However, I get an exception (Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: ) that the object
> is not serializable (Object MyClass$3@a71081 not serializable).
>
> Looking into this, I see that the issue is that the DataStream<> is not
> serializable. What would be the solution to this issue?
>
> As I mentioned before, I would like that, for each event from the
> continuous stream, the initial fixed set is used: the event is added to
> it and an operation is applied.
>
> Stephan was mentioning at some point a possibility to create a DataSet
> and launch a batch processing job while operating in stream mode. In
> case this is possible, can you give me a reference for it, because it
> might be the right solution to use here? I am thinking that I could
> keep the fixed set as a DataSet and, as each new event comes, transform
> it into a DataSet, join it with the reference set, and apply an
> operation.
>
> Regards,
>
> Dr. Radu Tudoran
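Robert's suggestion above, loading the fixed set once per parallel instance inside open() and then matching each stream element against it, can be sketched without any Flink dependency as follows. The class ReferenceSetMatcher and its method shapes are this sketch's own invention, only loosely mirroring a RichFlatMapFunction; Flink's actual open() takes a Configuration and flatMap() takes a Collector.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Minimal stand-in for the RichFlatMapFunction pattern: "open" loads
// the reference file into memory once per parallel instance, and
// "flatMap" then checks each stream element against that set.
class ReferenceSetMatcher {

    private Set<String> fixedSet;

    // In Flink this would be open(Configuration parameters); every
    // parallel instance loads the full reference file into memory.
    public void open(Path referenceFile) throws IOException {
        fixedSet = new HashSet<>(Files.readAllLines(referenceFile));
    }

    // In Flink this would be flatMap(String value, Collector<String>
    // out); a Consumer plays the Collector's role in this sketch.
    public void flatMap(String value, Consumer<String> out) {
        if (fixedSet.contains(value)) {
            out.accept(value);
        }
    }
}
```

Note that each parallel instance holds its own full copy of the set; combining this with the subtask-index partitioning Matthias describes would let each instance load only its share.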
>
> *From:* Vieru, Mihail [mailto:mihail.vieru@zalando.de]
> *Sent:* Tuesday, December 01, 2015 4:55 PM
> *To:* user@flink.apache.org
> *Subject:* NPE with Flink Streaming from Kafka
>
> Hi,
>
> we get the following NullPointerException after ~50 minutes when
> running a streaming job with windowing and state that reads data from
> Kafka and writes the result to the local FS.
>
> There are around 170 million messages to be processed; Flink 0.10.1
> stops at ~8 million.
>
> Flink runs locally, started with the "start-cluster-streaming.sh"
> script.
>
> 12/01/2015 15:06:24 Job execution switched to status RUNNING.
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1)
> switched to SCHEDULED
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1)
> switched to DEPLOYING
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> SCHEDULED
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> DEPLOYING
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1)
> switched to RUNNING
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> RUNNING
> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> CANCELED
> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1)
> switched to FAILED
> java.lang.Exception
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>
> Any ideas on what could cause this behaviour?
>
> Best,
>
> Mihail