flink-user mailing list archives

From subash basnet <yasub...@gmail.com>
Subject Broadcast and read broadcast variable in DataStream
Date Mon, 16 May 2016 19:37:08 GMT
Hello all,

How can I broadcast a variable in a DataStream program, or perform a similar
operation, so that I can read its value the way I do with a DataSet:
IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
DataSet<Centroid> newCentroids = points.map(new SelectNearestCenter())
    .withBroadcastSet(loop, "centroids") ...

INSIDE map function:
@Override
public void open(...) {
    this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
}
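
For context, once the map function has the centroid list, all it needs to do is pick the closest centroid for each point. Here is a minimal plain-Java sketch of that step with no Flink dependency. It assumes Euclidean distance and a Centroid with the id and pt fields that show up in the exception further down; the class and method names are hypothetical, not my actual job code.

```java
import java.util.List;

// Plain-Java sketch, no Flink dependency. All names here are hypothetical.
final class NearestCenterSketch {

    // Mirrors the POJO fields shown in the exception message (id: String, pt: Double[]).
    static final class Centroid {
        final String id;
        final Double[] pt;

        Centroid(String id, Double[] pt) {
            this.id = id;
            this.pt = pt;
        }
    }

    // Squared Euclidean distance; assumes both arrays have the same length.
    static double squaredDistance(Double[] a, Double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Returns the centroid closest to the point, or null if the list is empty.
    static Centroid nearest(Double[] point, List<Centroid> centroids) {
        Centroid best = null;
        double bestDist = Double.MAX_VALUE;
        for (Centroid c : centroids) {
            double dist = squaredDistance(point, c.pt);
            if (dist < bestDist) {
                bestDist = dist;
                best = c;
            }
        }
        return best;
    }
}
```

With centroids a = (0, 0) and b = (10, 10), calling nearest for the point (1, 2) returns a. The part I am missing is how to get that centroid list into the function in the first place.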

Is defining 'loop' as a global variable the only way to use it in the map
functions, or are there other possible methods?
When I declare loop as a global (static) variable and read it inside the map
function via DataStreamUtils, as below:

private static IterativeStream<Centroid> loop;
...
loop = centroids.iterate(numIterations);
...
INSIDE map function:
@Override
public void open(...) {
    Iterator<Centroid> iter = DataStreamUtils.collect(loop);
    this.centroids = Lists.newArrayList(iter);
}

it throws the following exception on execution:
Exception in thread "Thread-13" java.lang.RuntimeException: Exception in execute()
    at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
Caused by: java.lang.IllegalStateException: Iteration FeedbackTransformation{id=15, name='Feedback', outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String, pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback edges.
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformFeedback(StreamGraphGenerator.java:295)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSink(StreamGraphGenerator.java:441)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:158)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:127)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:119)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1197)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)
    at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:80)


Could you please suggest a possible cause of this exception and a solution?
Without a way to broadcast a variable in DataStream, I do not see any option
other than using a global variable.

Best Regards,
Subash Basnet
