flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: Question about DataStream serialization
Date Tue, 01 Dec 2015 17:21:27 GMT
Hi Radu,

both emails reached the mailing list :)

You can not reference to DataSets or DataStreams from inside user defined
functions. Both are just abstractions for a data set or stream, so the
elements are not really inside the set.

We don't have any support for mixing the DataSet and DataStream API.

For your use case, I would recommend you to use a RichFlatMapFunction and
in the open() call read the text file.



On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <radu.tudoran@huawei.com>
wrote:

>
>
> Hello,
>
>
>
> I am not sure if this message was received on the user list, if so I
> apologies for duplicate messages
>
>
>
> I have the following scenario
>
>
>
> ·         Reading a fixed set
>
> DataStream<String> *fixedset* = env.readtextFile(…
>
> ·         Reading a continuous stream of data
>
> DataStream<String> *stream* = ….
>
>
>
> I would need that for each event read from the continuous stream to make
> some operations onit and on the *fixedsettoghether*
>
>
>
>
>
> I have tried something like
>
>
>
> final myObject.referenceStaticSet = fixedset;
>
> stream.map(new MapFunction<String, String>() {
>
>                      @Override
>
>                      public String map(String arg0) throws Exception {
>
>
>
>                            //for example:   final string2add = arg0;
>
>                                                                 //the goal
> of below function would be to add the string2add to the fixedset
>
>                            myObject.referenceStaticSet =
> myObject.referenceStaticSet.flatMap(new FlatMapFunction<String, String>() {
>
>
>
>                                   @Override
>
>                                   public void flatMap(String arg0,
> Collector<String> arg1)
>
>
> //for example adding to the fixed set also the string2add object:
> arg1.collect(string2add);
>
>
>                                 }
>
> …
>
> }
>
>
>
> However,  I get an exception (Exception in thread "main"
> *org.apache.flink.api.common.InvalidProgramException*: ) that object is
> not serializable (Object MyClass$3@a71081 not serializable )
>
>
>
> Looking into this I see that the issues is that the DataStream<> is not
> serializable. What would be the solution to this issue?
>
>
>
> As I mentioned before, I would like that for each event from the
> continuous stream to use the initial fixed set, add the event to it and
> apply an operation.
>
> Stephan was mentioning at some point some possibility to create a DataSet
> and launch a batch processing while operating in stream mode– in case this
> is possible, can you give me a reference for it, because it might be the
> good solution to  use in case. I am thinking that I could keep the fixed
> set as a DataSet and as each new event comes, transform it into a dataset
> and then join with reference set and apply an operation
>
>
>
> Regards,
>
>
>
>
>
>
>
>
>
> Dr. Radu Tudoran
>
> Research Engineer
>
> IT R&D Division
>
>
>
> [image: cid:image007.jpg@01CD52EB.AD060EE0]
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>
> European Research Center
>
> Riesstrasse 25, 80992 München
>
>
>
> E-mail: *radu.tudoran@huawei.com <radu.tudoran@huawei.com>*
>
> Mobile: +49 15209084330
>
> Telephone: +49 891588344173
>
>
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
>
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
>
>
> *From:* Vieru, Mihail [mailto:mihail.vieru@zalando.de]
> *Sent:* Tuesday, December 01, 2015 4:55 PM
> *To:* user@flink.apache.org
> *Subject:* NPE with Flink Streaming from Kafka
>
>
>
> Hi,
>
> we get the following NullPointerException after ~50 minutes when running a
> streaming job with windowing and state that reads data from Kafka and
> writes the result to local FS.
>
> There are around 170 million messages to be processed, Flink 0.10.1 stops
> at ~8 million.
>
> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>
>
> 12/01/2015 15:06:24    Job execution switched to status RUNNING.
> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
> to SCHEDULED
> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
> to DEPLOYING
> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> SCHEDULED
> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> DEPLOYING
> 12/01/2015 15:06:24    Source: Custom Source -> Map -> Map(1/1) switched
> to RUNNING
> 12/01/2015 15:06:24    Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> RUNNING
> 12/01/2015 15:56:08    Fast TumblingTimeWindows(5000) of Reduce at
> main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to
> CANCELED
> 12/01/2015 15:56:08    Source: Custom Source -> Map -> Map(1/1) switched
> to FAILED
> java.lang.Exception
>     at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>     at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at
> org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>     at
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>     at
> org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>     at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>
> Any ideas on what could cause this behaviour?
>
>
>
> Best,
>
> Mihail
>

Mime
View raw message