flink-user mailing list archives

From subash basnet <yasub...@gmail.com>
Subject Understanding iteration error message
Date Wed, 20 Jul 2016 09:33:02 GMT
Hello all,

When I execute the streaming code below:
DataStream<Centroid> centroids =
    newCentroidDataStream.map(new TupleCentroidConverter());
ConnectedIterativeStreams<Point, Centroid> loop =
    points.iterate().withFeedbackType(Centroid.class);
DataStream<Centroid> newCentroids = loop
    .flatMap(new SelectNearestCenter(10))
    .map(new CountAppender())
    .keyBy(0)
    .reduce(new CentroidAccumulator())
    .map(new CentroidAverager());
DataStream<Centroid> finalCentroids =
    loop.closeWith(centroids.broadcast());

the following exception arises:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
    at org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181)

If I use loop.closeWith(newCentroids.broadcast()) instead, it works fine. I
do not fully understand the error message. Could you explain in more depth
what it means in relation to the code above?

Best Regards,
Subash Basnet
