flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: filter as termination condition
Date Wed, 22 Jul 2015 12:38:54 GMT
Sachin is right that the filter has to be inverted. Furthermore, the join
operation is not right here. You have to do a kind of a left outer join
where you only keep the elements which join with NULL. Here is an example
of how one could do it [1].

Cheers,
Till

[1]
http://stackoverflow.com/questions/31558326/apache-flink-filter-as-termination-condition/31559947#31559947

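As a conceptual sketch of that left-outer-join-style termination criterion (plain Java sets standing in for the DataSets; the class and method names are illustrative, not Flink API):

```java
import java.util.HashSet;
import java.util.Set;

// Conceptual sketch: the termination criterion should keep only the new
// centroids that have NO equal counterpart in the previous solution.
// When this set becomes empty, the bulk iteration terminates.
public class TerminationSketch {
    static <T> Set<T> changedElements(Set<T> previous, Set<T> current) {
        Set<T> changed = new HashSet<>(current);
        changed.removeAll(previous); // drop elements that "join" with the old solution
        return changed;
    }
}
```

By contrast, a `where("*").equalTo("*")` join only ever emits *equal* pairs, so as soon as every centroid changes, the criterion data set is empty and the loop stops after one step, which matches the behavior reported below.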
On Wed, Jul 22, 2015 at 2:23 PM, Sachin Goel <sachingoel0101@gmail.com>
wrote:

> It appears that you're returning true when the previous and current
> solutions are the same. You should instead return false in that case,
> because that is when the iteration should terminate.
> Further, instead of joining, it would be a good idea to broadcast the new
> solution to the old solution [or the other way around] and use some
> tolerance value instead of an exact equality check.
>
> Cheers!
> Sachin
>
> -- Sachin Goel
> Computer Science, IIT Delhi
> m. +91-9871457685
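A minimal sketch of the tolerance-based check Sachin suggests (plain Java; the `double[]` coordinates are an assumption about the centroid representation, not the actual GeoTimeDataCenter fields):

```java
// Sketch of a tolerance-based convergence check instead of exact equality.
// Returns true while a center still moves more than the tolerance, i.e.
// while the iteration should continue.
public class ConvergenceSketch {
    static boolean moved(double[] oldCenter, double[] newCenter, double tolerance) {
        double squaredDistance = 0.0;
        for (int i = 0; i < oldCenter.length; i++) {
            double d = newCenter[i] - oldCenter[i];
            squaredDistance += d * d;
        }
        return Math.sqrt(squaredDistance) > tolerance;
    }
}
```

Comparing against a tolerance avoids iterating to the maximum count just because floating-point averaging never reproduces bit-identical centers.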
> On Jul 22, 2015 5:46 PM, "Stephan Ewen" <sewen@apache.org> wrote:
>
>> Termination happens if the "termination criterion" data set is empty.
>>
>> Maybe your filter is too aggressive and filters out everything, or the
>> join is wrong and nothing joins...
>>
>> On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <paul.roewer1990@googlemail.com>
>> wrote:
>>
>>> hello,
>>>
>>> i have defined a filter as the termination condition for k-means.
>>> if i run my app, it always computes only one iteration.
>>>
>>> i think the problem is here:
>>>
>>>     DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
>>>         newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
>>> or maybe the filter function:
>>>
>>>     public static final class MyFilter implements
>>>             FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
>>>
>>>         private static final long serialVersionUID = 5868635346889117617L;
>>>
>>>         public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
>>>                 throws Exception {
>>>             if (tuple.f0.equals(tuple.f1)) {
>>>                 return true;
>>>             } else {
>>>                 return false;
>>>             }
>>>         }
>>>     }
>>>
>>> best regards,
>>> paul
>>>
>>> my full code here:
>>>
>>>     public void run() {
>>>         // load properties
>>>         Properties pro = new Properties();
>>>         FileSystem fs = null;
>>>         try {
>>>             pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>>             fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>>                 new org.apache.hadoop.conf.Configuration());
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>
>>>         int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>>         String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
>>>         // set up execution environment
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         // get input points
>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>         DataSet<GeoTimeDataCenter> centroids = null;
>>>         try {
>>>             centroids = getCentroidDataSet(env);
>>>         } catch (Exception e1) {
>>>             e1.printStackTrace();
>>>         }
>>>         // set number of bulk iterations for KMeans algorithm
>>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>             // compute closest centroid for each point
>>>             .map(new SelectNearestCenter(this.getBenchmarkCounter()))
>>>                 .withBroadcastSet(loop, "centroids")
>>>             // count and sum point coordinates for each centroid
>>>             .groupBy(0).reduceGroup(new CentroidAccumulator())
>>>             // compute new centroids from point counts and coordinate sums
>>>             .map(new CentroidAverager(this.getBenchmarkCounter()));
>>>         // feed new centroids back into next iteration with termination condition
>>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
>>>             newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>             // assign points to final clusters
>>>             .map(new SelectNearestCenter(-1))
>>>                 .withBroadcastSet(finalCentroids, "centroids");
>>>         // emit result
>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>         finalCentroids.writeAsText(outputPath+"/centers"); //print();
>>>         // execute program
>>>         try {
>>>             env.execute("KMeans Flink");
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     public static final class MyFilter implements
>>>             FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
>>>
>>>         private static final long serialVersionUID = 5868635346889117617L;
>>>
>>>         public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple)
>>>                 throws Exception {
>>>             if (tuple.f0.equals(tuple.f1)) {
>>>                 return true;
>>>             } else {
>>>                 return false;
>>>             }
>>>         }
>>>     }
>>>
>>
>>
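Putting the thread's advice together: the keep-condition has to be inverted so the criterion stays non-empty while centroids still change. Note that inverting MyFilter alone is not sufficient, since the `where("*").equalTo("*")` join only emits equal pairs; the inverted predicate belongs with a join or coGroup that also emits non-matching elements, as in Till's linked example. A minimal sketch of the inverted predicate itself (plain Java, not the Flink FilterFunction):

```java
// Inverted termination predicate: keep a (previous, current) centroid pair
// only when the two differ, so the criterion data set becomes empty -- and
// the iteration stops -- exactly when no centroid changed anymore.
public class InvertedFilterSketch {
    static boolean keepPair(Object previous, Object current) {
        return !previous.equals(current);
    }
}
```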
