flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Pa Rö <paul.roewer1...@googlemail.com>
Subject filter as termination condition
Date Tue, 21 Jul 2015 15:05:21 GMT
Hello,

I have defined a filter as the termination condition for k-means.
If I run my app, it always computes only one iteration.

I think the problem is here:
// NOTE(review): closeWith(result, criterion) halts the loop as soon as the criterion DataSet is
// empty. Joining on "*" only pairs centroids that are identical to their predecessor, so the
// criterion holds the *unchanged* centroids and becomes empty (ending the loop) whenever no
// centroid is unchanged -- typically right after the first iteration.
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
Or maybe it is in the filter function:
    public static final class MyFilter implements
FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>
tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

Best regards,
Paul

My full code is here:

    public void run() {
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {

pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new
URI(pro.getProperty("hdfs.namenode")),new
org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }

        int maxIteration =
Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath =
fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop =
centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new
SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop,
"centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration with termination
condition
        DataSet<GeoTimeDataCenter> finalCentroids =
loop.closeWith(newCentroids,
newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new
SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static final class MyFilter implements
FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>
tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

Mime
View raw message