flink-user mailing list archives

From Sachin Goel <sachingoel0...@gmail.com>
Subject Re: filter as termination condition
Date Wed, 22 Jul 2015 12:23:54 GMT
It appears that you're returning true when the previous and current
solutions are the same. You should return false in that case instead,
because that is exactly when the iteration should terminate: the loop
stops once the termination-criterion data set is empty.
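To see why the filter's return value matters, here is a small plain-Java model of the termination rule. It is not Flink code: `TerminationModel`, `halve` and `iterate` are hypothetical names, and a toy "halve every value" step stands in for one k-means superstep. Old and new values are paired by position, which plays the role of joining on a centroid id. As far as I can tell, a join on all fields (`where("*").equalTo("*")`) would only pair centroids that are already identical, so in the first iteration nothing joins and the criterion set is empty.

```java
import java.util.Arrays;
import java.util.function.BiPredicate;
import java.util.function.UnaryOperator;

// Plain-Java model (not Flink code) of the bulk-iteration termination rule:
// stop when the termination-criterion set is empty or after maxIterations.
class TerminationModel {

    /** Toy step function standing in for one k-means superstep: halve every value. */
    static int[] halve(int[] values) {
        return Arrays.stream(values).map(v -> v / 2).toArray();
    }

    /**
     * Runs step repeatedly and returns how many iterations were executed.
     * Old and new values are paired by index (standing in for a centroid id).
     * keepPair is the filter: every pair it keeps lands in the criterion set.
     */
    static int iterate(int[] initial, UnaryOperator<int[]> step,
                       BiPredicate<Integer, Integer> keepPair, int maxIterations) {
        int[] current = initial;
        for (int i = 1; i <= maxIterations; i++) {
            int[] next = step.apply(current);
            int criterionSize = 0;
            for (int k = 0; k < current.length; k++) {
                if (keepPair.test(current[k], next[k])) {
                    criterionSize++;
                }
            }
            current = next;
            if (criterionSize == 0) {
                return i; // empty criterion set: the loop terminates here
            }
        }
        return maxIterations; // iteration limit reached
    }

    public static void main(String[] args) {
        int[] start = {8, 4};
        // Filter as in the original post: keeps pairs that did NOT change
        int keepUnchanged = iterate(start, TerminationModel::halve, (o, n) -> o.equals(n), 100);
        // Corrected filter: keeps pairs that still changed (returns false once converged)
        int keepChanged = iterate(start, TerminationModel::halve, (o, n) -> !o.equals(n), 100);
        System.out.println(keepUnchanged + " " + keepChanged); // prints "1 5"
    }
}
```

Keeping only unchanged pairs empties the criterion set as soon as anything moves, which matches the "only one iteration" symptom; keeping only changed pairs runs until the values stop moving.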
Further, instead of joining, it would be a good idea to broadcast the new
solution to the operator that processes the old solution (or the other way
around) and to compare against a tolerance value instead of checking for
exact equality.
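For the tolerance-based check, a minimal sketch in plain Java. It assumes a hypothetical `Centroid` class with an `id` and 2-D coordinates (standing in for `GeoTimeDataCenter`, whose fields aren't shown) and uses Euclidean distance; `ToleranceCheck`, `hasMoved` and `stillMoving` are illustrative names, not Flink API. In a Flink job this logic would typically live in a `RichFilterFunction` that reads the other solution as a broadcast variable in `open()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a tolerance-based termination check (plain Java, not Flink API).
// Centroid is a hypothetical stand-in for GeoTimeDataCenter.
class ToleranceCheck {

    static final class Centroid {
        final int id;
        final double x;
        final double y;

        Centroid(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    /** Filter predicate: true means "still moving", so the iteration should continue. */
    static boolean hasMoved(Centroid previous, Centroid current, double tolerance) {
        double dx = current.x - previous.x;
        double dy = current.y - previous.y;
        return Math.sqrt(dx * dx + dy * dy) > tolerance;
    }

    /**
     * Builds the termination-criterion set: new centroids that moved more than
     * tolerance relative to the old centroid with the same id. The old centroids
     * play the role of the broadcast set; an empty result means "stop iterating".
     */
    static List<Centroid> stillMoving(List<Centroid> oldCentroids,
                                      List<Centroid> newCentroids, double tolerance) {
        Map<Integer, Centroid> oldById = new HashMap<>();
        for (Centroid c : oldCentroids) {
            oldById.put(c.id, c);
        }
        List<Centroid> result = new ArrayList<>();
        for (Centroid c : newCentroids) {
            Centroid previous = oldById.get(c.id);
            // A centroid without a predecessor is treated as still moving
            if (previous == null || hasMoved(previous, c, tolerance)) {
                result.add(c);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Centroid> before = Arrays.asList(new Centroid(1, 0.0, 0.0), new Centroid(2, 5.0, 5.0));
        List<Centroid> after = Arrays.asList(new Centroid(1, 0.0, 0.001), new Centroid(2, 6.0, 5.0));
        // Centroid 1 moved 0.001 (below tolerance), centroid 2 moved 1.0
        System.out.println(stillMoving(before, after, 0.01).size()); // prints 1
    }
}
```

Because the filter returns true only for centroids that still moved, the criterion set becomes empty, and the loop stops, once every centroid has settled within the tolerance.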

Cheers!
Sachin

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685
On Jul 22, 2015 5:46 PM, "Stephan Ewen" <sewen@apache.org> wrote:

> Termination happens if the "termination criterion" data set is empty.
>
> Maybe your filter is too aggressive and filters out everything, or the
> join is wrong and nothing joins...
>
> On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <paul.roewer1990@googlemail.com>
> wrote:
>
>> Hello,
>>
>> I have defined a filter as the termination condition for k-means.
>> When I run my app, it always computes only one iteration.
>>
>> I think the problem is here:
>>     DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
>>             newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
>> or maybe in the filter function:
>>     public static final class MyFilter implements
>>             FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
>>
>>         private static final long serialVersionUID = 5868635346889117617L;
>>
>>         public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
>>                 throws Exception {
>>             if (tuple.f0.equals(tuple.f1)) {
>>                 return true;
>>             } else {
>>                 return false;
>>             }
>>         }
>>     }
>>
>> Best regards,
>> Paul
>>
>> My full code:
>>
>>     public void run() {
>>         //load properties
>>         Properties pro = new Properties();
>>         FileSystem fs = null;
>>         try {
>>             pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>             fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>                     new org.apache.hadoop.conf.Configuration());
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>
>>         int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>         String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
>>         // set up execution environment
>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         // get input points
>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>         DataSet<GeoTimeDataCenter> centroids = null;
>>         try {
>>             centroids = getCentroidDataSet(env);
>>         } catch (Exception e1) {
>>             e1.printStackTrace();
>>         }
>>         // set number of bulk iterations for KMeans algorithm
>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>             // compute closest centroid for each point
>>             .map(new SelectNearestCenter(this.getBenchmarkCounter()))
>>             .withBroadcastSet(loop, "centroids")
>>             // count and sum point coordinates for each centroid
>>             .groupBy(0).reduceGroup(new CentroidAccumulator())
>>             // compute new centroids from point counts and coordinate sums
>>             .map(new CentroidAverager(this.getBenchmarkCounter()));
>>         // feed new centroids back into next iteration with termination condition
>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
>>                 newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>             // assign points to final clusters
>>             .map(new SelectNearestCenter(-1))
>>             .withBroadcastSet(finalCentroids, "centroids");
>>         // emit result
>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>         // execute program
>>         try {
>>             env.execute("KMeans Flink");
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     public static final class MyFilter implements
>>             FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
>>
>>         private static final long serialVersionUID = 5868635346889117617L;
>>
>>         public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
>>                 throws Exception {
>>             if (tuple.f0.equals(tuple.f1)) {
>>                 return true;
>>             } else {
>>                 return false;
>>             }
>>         }
>>     }
>>
>
>
