flink-user mailing list archives

From Sachin Goel <sachingoel0...@gmail.com>
Subject Re: loop break operation
Date Mon, 20 Jul 2015 10:16:50 GMT
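Gah. Sorry. iterateWithTermination exists only in the Scala API.
In the Java API, pass a second argument to the closeWith call: a data set
that serves as the termination criterion. The iteration stops as soon as
that data set is empty.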
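
A minimal plain-Java sketch of that termination rule, for illustration only; it is not Flink code. In the Flink Java API the two-argument call would look like loop.closeWith(newCentroids, terminationCriterion), and the loop ends once the criterion data set is empty or the maximum iteration count is reached. The 1-D points, the EPSILON threshold, and every class and method name below are assumptions, not part of the original program.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of a bulk iteration with a termination criterion (not Flink code). */
class TerminationSketch {

    /** Hypothetical threshold: a centroid counts as "moved" if it shifts by more than this. */
    static final double EPSILON = 1e-3;

    /** One k-means step on 1-D points: assign each point to its nearest centroid, then average. */
    static double[] step(double[] points, double[] centroids) {
        double[] sum = new double[centroids.length];
        int[] count = new int[centroids.length];
        for (double p : points) {
            int best = 0;
            for (int c = 1; c < centroids.length; c++) {
                if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[best])) {
                    best = c;
                }
            }
            sum[best] += p;
            count[best]++;
        }
        double[] next = new double[centroids.length];
        for (int c = 0; c < centroids.length; c++) {
            // An empty cluster keeps its previous centroid.
            next[c] = count[c] == 0 ? centroids[c] : sum[c] / count[c];
        }
        return next;
    }

    /** Termination criterion: indices of centroids that moved. An empty list means "converged". */
    static List<Integer> movedCentroids(double[] oldCentroids, double[] newCentroids) {
        List<Integer> moved = new ArrayList<>();
        for (int c = 0; c < oldCentroids.length; c++) {
            if (Math.abs(oldCentroids[c] - newCentroids[c]) > EPSILON) {
                moved.add(c);
            }
        }
        return moved;
    }

    /** Iterates until the criterion is empty or maxIterations is hit; returns the steps executed. */
    static int iterate(double[] points, double[] centroids, int maxIterations) {
        int steps = 0;
        while (steps < maxIterations) {
            double[] next = step(points, centroids);
            List<Integer> criterion = movedCentroids(centroids, next);
            System.arraycopy(next, 0, centroids, 0, centroids.length);
            steps++;
            if (criterion.isEmpty()) {
                break; // same rule as the second closeWith argument: empty criterion ends the loop
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        double[] points = {1.0, 2.0, 3.0, 10.0, 11.0, 12.0};
        double[] centroids = {0.0, 5.0};
        int steps = iterate(points, centroids, 100);
        System.out.println(steps + " steps, centroids " + centroids[0] + " and " + centroids[1]);
    }
}
```

In a Flink job, the equivalent of movedCentroids would be a data set built inside the loop, for example by joining the old and new centroids on their id and filtering for the ones that moved.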

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685
On Jul 20, 2015 3:21 PM, "Pa Rö" <paul.roewer1990@googlemail.com> wrote:

> I could not find the "iterateWithTermination" function, only "iterate" and
> "iterateDelta". I am using Flink 0.9.0 with Java.
>
> 2015-07-20 11:30 GMT+02:00 Sachin Goel <sachingoel0101@gmail.com>:
>
>> Hi
>> You can use iterateWithTermination to terminate before max iterations.
>> The feedback for iteration then would be (next solution, isConverged) where
>> isConverged is an empty data set if you wish to terminate.
>> However, this is something I have a pull request for:
>> https://github.com/apache/flink/pull/918. Take a look.
>>
>> -- Sachin Goel
>> Computer Science, IIT Delhi
>> m. +91-9871457685
>>
>> On Mon, Jul 20, 2015 at 2:55 PM, Pa Rö <paul.roewer1990@googlemail.com>
>> wrote:
>>
>>> Hello community,
>>>
>>> I have written a k-means app in Flink. Now I want to change my termination
>>> condition from a maximum iteration count to a check on whether the cluster
>>> centers still change, but I don't know how to break out of the Flink loop.
>>> Here is my Flink execution code:
>>>
>>> public void run() {
>>>         //load properties
>>>         Properties pro = new Properties();
>>>         FileSystem fs = null;
>>>         try {
>>>             pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
>>>             fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
>>>                     new org.apache.hadoop.conf.Configuration());
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>
>>>         int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
>>>         String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
>>>         // set up execution environment
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         // get input points
>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>         DataSet<GeoTimeDataCenter> centroids = null;
>>>         try {
>>>             centroids = getCentroidDataSet(env);
>>>         } catch (Exception e1) {
>>>             e1.printStackTrace();
>>>         }
>>>         // set number of bulk iterations for KMeans algorithm
>>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>             // compute closest centroid for each point
>>>             .map(new SelectNearestCenter(this.getBenchmarkCounter()))
>>>             .withBroadcastSet(loop, "centroids")
>>>             // count and sum point coordinates for each centroid
>>>             .groupBy(0).reduceGroup(new CentroidAccumulator())
>>>             // compute new centroids from point counts and coordinate sums
>>>             .map(new CentroidAverager(this.getBenchmarkCounter()));
>>>         // feed new centroids back into next iteration
>>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>             // assign points to final clusters
>>>             .map(new SelectNearestCenter(-1))
>>>             .withBroadcastSet(finalCentroids, "centroids");
>>>         // emit result
>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>>         // execute program
>>>         try {
>>>             env.execute("KMeans Flink");
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>> Is it possible to use a construct like: if (centroids equals points) { break
>>> the loop }?
>>>
>>> Best regards,
>>> Paul
>>>
>>
>>
>
