flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From subash basnet <yasub...@gmail.com>
Subject DataStreamUtils conversion problem, showing varied results for same code
Date Thu, 21 Jul 2016 10:20:02 GMT
Hello all,

My task to cluster the stream of points around the centroids, I am using
DataStreamUtils to collect the stream and pass it on to the map function to
perform the necessary action. Below is the code:

DataStream<Point> points = newDataStream.map(new getPoints());
DataStream<Centroid> centroids = newCentroidDataStream.map(new
TupleCentroidConverter());

Iterator<Centroid> iter = *DataStreamUtils*.collect(centroids);
Collection<Centroid> collectionCentroids = Lists.newArrayList(iter);
DataStream<Centroid> newCentroids = points.map(new
SelectNearestCenter(collectionCentroids))
.map(new CountAppender()).keyBy(0).reduce(new
CentroidAccumulator()).map(new CentroidAverager());

Iterator<Centroid> iter1 = *DataStreamUtils*.collect(newCentroids);
Collection<Centroid> finalCentroidsCollection = Lists.newArrayList(iter1);
DataStream<Tuple2<String, Point>> clusteredPoints = points
// assign points to final clusters
.map(new SelectNearestCenter(finalCentroidsCollection));
*clusteredPoints*.print();

public static final class SelectNearestCenter extends
RichMapFunction<Point, Tuple2<String, Point>> {
private Collection<Centroid> centroids;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
public SelectNearestCenter(Collection<Centroid> centroids) {
this.centroids = centroids;
}
@Override
public Tuple2<String, Point> map(Point p) throws Exception {

double minDistance = Double.MAX_VALUE;
String closestCentroidId = "-1";
                        ..................
return new Tuple2<String, Point>(closestCentroidId, p);
}
}

Cases:
1. Waited for around 10mins, and the *clusteredPoints *got printed but with
centroid id as '-1' for all the points. And the execution ends after a
certain time, due to multiple execution since there is one already inside
the datastreamutil.
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.. How to get rid of this exception.

1> (-1, 121.86 121.87 121.8149 121.8149 60600.0)
4> (-1, 121.52 121.52 121.45 121.485 28800.0)
.........

2. Waited for around 10mins, the *clusteredPoints *got printed with the
centroid id as desired shown below. The *clusteredPoint* also gets printed
in the console in streaming manner.  And throws no exception at all. The
streaming continues.

1> (Wed Jul 20 16:45:01 CEST 2016, 121.555 121.56 121.53 121.5385 69300.0)
1> (Wed Jul 20 18:19:00 CEST 2016, 121.8699 121.89 121.86 121.86 25700.0)
3> (Wed Jul 20 16:41:59 CEST 2016, 121.415 121.47 121.41 121.4658 38400.0)
1> (Wed Jul 20 18:13:59 CEST 2016, 121.86 121.87 121.8149 121.8149 60600.0)
4> (Wed Jul 20 16:43:59 CEST 2016, 121.52 121.52 121.45 121.485 28800.0)
3> (Wed Jul 20 18:16:59 CEST 2016, 121.8716 121.92 121.85 121.9141 64500.0)
4> (Wed Jul 20 18:15:00 CEST 2016, 121.92 121.92 121.88 121.88 53500.0)
4> (Wed Jul 20 18:12:04 CEST 2016, 121.82 121.82 121.74 121.74 43600.0)
..............

3. The *clusteredPoints* is printed with the centroid id as desired in
streaming manner. But after certain duration the exception same as in case
1 is thrown and the program ends abruptly.

Why so much variation in result on executing the same code. Now, in case of
centroid id as '-1' in case 1, I would not be able to perform operations
later on as all the *clusteredPoints* have the same centroid id '-1' which
should have been rather timestamp as shown in case 2. How could be the
solution to this issue.

Best Regards,
Subash Basnet

Mime
View raw message