flink-user mailing list archives

From subash basnet <yasub...@gmail.com>
Subject adding source not serializable exception in streaming implementation
Date Tue, 19 Apr 2016 12:32:56 GMT
Hello all,

I need to re-read a CSV file from a file path at fixed time intervals and
process its data. The file itself is updated at regular intervals.
Below is my code:
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> dataStream = getCsvDataStream(see);
    DataStream<Stock> edits = see.addSource(new FetchStock("path/to/csv"));

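In FetchStock.java:
    public class FetchStock extends RichSourceFunction<Stock> {
        private String csvPath;  // declaration not shown in the original excerpt

        public FetchStock(String csvPath) {
            this.csvPath = csvPath;
        }
        // run() and cancel() are not shown in this excerpt
    }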

I am trying to adapt code from *WikipediaAnalysis*, but I get the following
not-serializable exception when adding the source:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object wikiedits.FetchStock@d7b1517 not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
    at wikiedits.StockAnalysis.main(StockAnalysis.java:30)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.environment.LocalStreamEnvironment
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    ... 6 more
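
For anyone trying to reproduce this: the "Caused by" line names LocalStreamEnvironment, which suggests that the FetchStock instance reaches the environment somehow, for example through a field or an enclosing instance. The full class is not shown here, so that is only a guess. The sketch below uses plain Java serialization and no Flink at all; Env, BadSource, GoodSource and isSerializable are hypothetical names made up for illustration. It only shows the general effect of holding a non-serializable object in a field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    // Hypothetical stand-in for a non-serializable object (e.g. an execution environment).
    static class Env { }

    // Hypothetical source that keeps the environment in a field.
    static class BadSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;
        private final Env env; // non-serializable, non-transient field

        BadSource(String csvPath, Env env) {
            this.csvPath = csvPath;
            this.env = env;
        }
    }

    // Same idea, but holding only serializable state.
    static class GoodSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String csvPath;

        GoodSource(String csvPath) {
            this.csvPath = csvPath;
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // hits a non-serializable object somewhere in the object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BadSource serializable:  "
                + isSerializable(new BadSource("path/to/csv", new Env())));
        System.out.println("GoodSource serializable: "
                + isSerializable(new GoodSource("path/to/csv")));
    }
}
```

If the real class does hold such a reference, passing only the path into the constructor, or marking the field transient, would be the usual things to try.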


I have attached Stock.java, which is just a model with getters and setters.
I am not sure what I am doing wrong.

Best Regards,
Subash Basnet
