flink-user mailing list archives

From Paul Röwer <paul.roewer1...@googlemail.com>
Subject Re: k means - waiting for dataset
Date Fri, 22 May 2015 21:09:10 GMT
good evening,

sorry, my English is not the best.

To compute the new centroid, I want to sum all points of the cluster
and take the mean as the new center.
In my other implementation I first sum all points and only at the end
divide by the number of points.
For example: (1+2+3+4)/4 = 2.5

In Flink I always reduce two points to one, so for the example above:
(1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 --> (2.25+4)/2 = 3.125

How can I rewrite my function so that it works like my other implementation?
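
A pattern that gives the same result as the sum-then-divide
implementation is the one used in the Flink KMeans example linked
further down in this thread: append a count to each record, only sum
inside the reduce, and divide exactly once in a final map. Below is a
minimal sketch of that pattern; a plain double coordinate stands in
for GeoTimeDataTupel (which would need its own addition helper), and
the class names are only illustrative:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;

    // appends a count of 1 to every (clusterId, coordinate) record
    public static final class CountAppender implements
            MapFunction<Tuple2<Integer, Double>, Tuple3<Integer, Double, Long>> {
        public Tuple3<Integer, Double, Long> map(Tuple2<Integer, Double> t) {
            return new Tuple3<Integer, Double, Long>(t.f0, t.f1, 1L);
        }
    }

    // the reduce only SUMS coordinates and counts -- no division yet
    public static final class Accumulator implements
            ReduceFunction<Tuple3<Integer, Double, Long>> {
        public Tuple3<Integer, Double, Long> reduce(
                Tuple3<Integer, Double, Long> a, Tuple3<Integer, Double, Long> b) {
            return new Tuple3<Integer, Double, Long>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
        }
    }

    // divides exactly once, after all points of a cluster are summed:
    // (1+2+3+4)/4 = 2.5, independent of the reduce order
    public static final class Averager implements
            MapFunction<Tuple3<Integer, Double, Long>, Tuple2<Integer, Double>> {
        public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Long> t) {
            return new Tuple2<Integer, Double>(t.f0, t.f1 / t.f2);
        }
    }

    // usage:
    // assignedPoints.map(new CountAppender())
    //     .groupBy(0).reduce(new Accumulator())
    //     .map(new Averager());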

best regards,
paul


On 22.05.2015 at 16:52, Stephan Ewen wrote:
> Sorry, I don't understand the question.
>
> Can you describe a bit better what you mean with "how can I sum all
> points and divide by the count"?
>
> Thanks!
>
> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1990@googlemail.com> wrote:
>
>     I have fixed a bug in the input reading, but the results are
>     still different.
>
>     I think I have localized the problem: in the other implementation
>     I sum all geo points/time points and divide by the count.
>     But in Flink I sum two points, divide by two, then sum the next...
>
>     the method is the following:
>
>     // reduces two points at a time to their pairwise average
>     // (the problem described above)
>     private static final class CentroidAccumulator implements
>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>
>         private static final long serialVersionUID = -4868797820391121771L;
>
>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>                 Tuple2<Integer, GeoTimeDataTupel> val1,
>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>             return new Tuple2<Integer, GeoTimeDataTupel>(
>                     val1.f0, addAndDiv(val1.f0, val1.f1, val2.f1));
>         }
>     }
>
>     private static GeoTimeDataTupel addAndDiv(
>             int clusterid, GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>         // averages the two timestamps (again pairwise, not over all points)
>         long time = (input1.getTime() + input2.getTime()) / 2;
>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>         list.add(input1.getGeo());
>         list.add(input2.getGeo());
>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>
>         return new GeoTimeDataTupel(geo, time, "POINT");
>     }
>
>     How can I sum all points and divide by the count?
>
>
>     2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1990@googlemail.com>:
>
>         hi,
>         If I print the centroids, all of them show up in the output.
>         I have implemented k-means with map reduce and with spark; for
>         the same input, both give the same output. But in Flink I get
>         a one-cluster output for this input set. (I use csv files from
>         the GDELT project.)
>
>         Here is my class:
>
>         public class FlinkMain {
>
>             public static void main(String[] args) {
>                 // load properties
>                 Properties pro = new Properties();
>                 try {
>                     pro.load(new FileInputStream("./resources/config.properties"));
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>                 int maxIteration = 1; //Integer.parseInt(pro.getProperty("maxiterations"));
>                 String outputPath = pro.getProperty("flink.output");
>                 // set up execution environment
>                 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>                 // get input points
>                 DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>                 DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>                 // set number of bulk iterations for KMeans algorithm
>                 IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>                 DataSet<GeoTimeDataCenter> newCentroids = points
>                     // compute closest centroid for each point
>                     .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>                     // count and sum point coordinates for each centroid
>                     .groupBy(0).reduce(new CentroidAccumulator())
>                     // compute new centroids from point counts and coordinate sums
>                     .map(new CentroidAverager());
>                 // feed new centroids back into next iteration
>                 DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>                 DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>                     // assign points to final clusters
>                     .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>                 // emit result
>                 clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>                 finalCentroids.writeAsText(outputPath + "/centers"); //print();
>                 // execute program
>                 try {
>                     env.execute("KMeans Flink");
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>
>             private static final class SelectNearestCenter extends
>                     RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>
>                 private static final long serialVersionUID = -2729445046389350264L;
>                 private Collection<GeoTimeDataCenter> centroids;
>
>                 @Override
>                 public void open(Configuration parameters) throws Exception {
>                     this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>                 }
>
>                 @Override
>                 public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>                     double minDistance = Double.MAX_VALUE;
>                     int closestCentroidId = -1;
>
>                     // check all cluster centers
>                     for (GeoTimeDataCenter centroid : centroids) {
>                         // compute distance
>                         double distance = Distance.ComputeDist(point, centroid);
>                         // update nearest cluster if necessary
>                         if (distance < minDistance) {
>                             minDistance = distance;
>                             closestCentroidId = centroid.getId();
>                         }
>                     }
>                     // emit a new record with the center id and the data point
>                     return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>                 }
>             }
>
>             // sums and counts point coordinates
>             private static final class CentroidAccumulator implements
>                     ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>
>                 private static final long serialVersionUID = -4868797820391121771L;
>
>                 public Tuple2<Integer, GeoTimeDataTupel> reduce(
>                         Tuple2<Integer, GeoTimeDataTupel> val1,
>                         Tuple2<Integer, GeoTimeDataTupel> val2) {
>                     return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1, val2.f1));
>                 }
>             }
>
>             private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>                 long time = (input1.getTime() + input2.getTime()) / 2;
>                 List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>                 list.add(input1.getGeo());
>                 list.add(input2.getGeo());
>                 LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>
>                 return new GeoTimeDataTupel(geo, time, "POINT");
>             }
>
>             // computes new centroid from coordinate sum and count of points
>             private static final class CentroidAverager implements
>                     MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>
>                 private static final long serialVersionUID = -2687234478847261803L;
>
>                 public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>                     return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
>                 }
>             }
>
>             private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>                 // load properties
>                 Properties pro = new Properties();
>                 try {
>                     pro.load(new FileInputStream("./resources/config.properties"));
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>                 String inputFile = pro.getProperty("input");
>                 // map csv file
>                 return env.readCsvFile(inputFile)
>                     .ignoreInvalidLines()
>                     .fieldDelimiter('\u0009')
>                     //.fieldDelimiter("\t")
>                     //.lineDelimiter("\n")
>                     .includeFields(true, true, false, false, false, false, false, false, false, false,
>                             false, false, false, false, false, false, false, false, false, false,
>                             false, false, false, false, false, false, false, false, false, false,
>                             false, false, false, false, false, false, false, false, false, true,
>                             true, false, false, false, false, false, false, false, false, false,
>                             false, false, false, false, false, false, false, false, false)
>                     //.includeFields(true, true, true, true)
>                     .types(String.class, Long.class, Double.class, Double.class)
>                     .map(new TuplePointConverter());
>             }
>
>             private static final class TuplePointConverter implements
>                     MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>
>                 private static final long serialVersionUID = 3485560278562719538L;
>
>                 public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>                     return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>                 }
>             }
>
>             private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>                 // load properties
>                 Properties pro = new Properties();
>                 try {
>                     pro.load(new FileInputStream("./resources/config.properties"));
>                 } catch (Exception e) {
>                     e.printStackTrace();
>                 }
>                 String seedFile = pro.getProperty("seed.file");
>                 boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>                 // get points from file or random
>                 if (seedFlag || !(new File(seedFile + "-1").exists())) {
>                     Seeding.randomSeeding();
>                 }
>                 // map csv file
>                 return env.readCsvFile(seedFile + "-1")
>                     .lineDelimiter("\n")
>                     .fieldDelimiter('\u0009')
>                     //.fieldDelimiter("\t")
>                     .includeFields(true, true, true, true)
>                     .types(Integer.class, Double.class, Double.class, Long.class)
>                     .map(new TupleCentroidConverter());
>             }
>
>             private static final class TupleCentroidConverter implements
>                     MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>
>                 private static final long serialVersionUID = -1046538744363026794L;
>
>                 public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>                     return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>                 }
>             }
>         }
>
>         2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>
>             Concerning your first problem that you only see one
>             resulting centroid, your code looks good modulo the parts
>             you haven't posted.
>
>             However, your problem could simply be caused by a bad
>             selection of initial centroids. If, for example, all
>             centroids except for one don't get any points assigned,
>             then only one centroid will survive the iteration step.
>             How do you select the initial centroids?
>
>             To check that all centroids are read you can print the
>             contents of the centroids DataSet. Furthermore, you can
>             simply println the new centroids after each iteration
>             step. In local mode you can then observe the computation.
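>
>             A minimal sketch of such a check (assuming local
>             execution, and that GeoTimeDataCenter has a readable
>             toString()):
>
>                 // sink that prints the seed centroids when the job runs
>                 centroids.print();
>
>                 // identity map that logs every new centroid per iteration
>                 DataSet<GeoTimeDataCenter> logged = newCentroids
>                     .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
>                         public GeoTimeDataCenter map(GeoTimeDataCenter c) {
>                             System.out.println(c);
>                             return c;
>                         }
>                     });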
>
>             Cheers,
>             Till
>
>             On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>                 Hi!
>
>                 This problem should not depend on any user code. There
>                 are no user-code dependent actors in Flink.
>
>                 Is there more stack trace that you can send us? It
>                 looks like the core exception that is causing the
>                 issue is not part of the stack trace.
>
>                 Greetings,
>                 Stephan
>
>
>
>                 On Thu, May 21, 2015 at 11:11 AM, Pa Rö
>                 <paul.roewer1990@googlemail.com> wrote:
>
>                     hi flink community,
>
>                     I have implemented k-means for clustering temporal
>                     geo data. I use the following GitHub project with
>                     my own data structure:
>                     https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>
>                     Now I have the problem that Flink reads the
>                     centroids from the file and keeps working in
>                     parallel without waiting. Looking at the results,
>                     I have the feeling that the program loads only one
>                     centroid point.
>
>                     I work with Flink 0.8.1; if I update to 0.9
>                     milestone 1, I get the following exception:
>                     ERROR actor.OneForOneStrategy: exception during creation
>                     akka.actor.ActorInitializationException: exception during creation
>                         at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>                         at akka.actor.ActorCell.create(ActorCell.scala:578)
>                         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>                         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>                         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>                         at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>                         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>                         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>                         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>                         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>                         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>                     Caused by: java.lang.reflect.InvocationTargetException
>                         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>                         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>                         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>                         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>                         at akka.util.Reflect$.instantiate(Reflect.scala:65)
>                         at akka.actor.Props.newActor(Props.scala:337)
>                         at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>                         at akka.actor.ActorCell.create(ActorCell.scala:560)
>                         ... 9 more
>
>                     How can I tell Flink to wait until the dataset is
>                     loaded, and what does this exception mean?
>
>                     best regards,
>                     paul
>

