From Fabian Hueske <fhue...@gmail.com>
Subject Re: k means - waiting for dataset
Date Fri, 22 May 2015 21:29:22 GMT
```
There are two ways to do that:

1) You use a GroupReduceFunction, which gives you an iterator over all
elements of a group, so you can sum and count the points in one call.
2) You use the ReduceFunction to compute the sum and the count at the same
time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
final division.

I'd go with the first choice. It's easier.

Best, Fabian

2015-05-22 23:09 GMT+02:00 Paul Röwer <paul.roewer1990@googlemail.com>:

> Good evening,
>
> sorry, my English is not the best.
>
> To compute the new centroid, I want to sum all points of the cluster and
> form the new center from that.
> In my other implementation I first sum all points and at the end I
> divide by the number of points.
> For example: (1+2+3+4)/4 = 2.5
>
> In Flink I always reduce two points to one,
> so for the example above: (1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 --> (2.25+4)/2 = 3.125
>
> How can I rewrite my function so that it works like my other
> implementation?
>
> best regards,
> paul
>
>
>
> On 22.05.2015 at 16:52, Stephan Ewen wrote:
>
> Sorry, I don't understand the question.
>
>  Can you describe a bit better what you mean by "how can I sum all
> points and share through the counter"?
>
>  Thanks!
>
> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1990@googlemail.com>
> wrote:
>
>>   I have fixed a bug in the input reading, but the results are still
>> different.
>>
>> I think I have located the problem: in the other implementation I sum all
>> geo points/time points and share through the counter,
>>  but in Flink I sum two points and share through two, then sum the next...
>>
>>  The method is the following:
>>
>> // sums and counts point coordinates
>>     private static final class CentroidAccumulator implements
>> ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>
>>         private static final long serialVersionUID =
>> -4868797820391121771L;
>>
>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer,
>> GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>                     addAndDiv(val1.f0, val1.f1, val2.f1));
>>         }
>>     }
>>
>>     private static GeoTimeDataTupel addAndDiv(int clusterid,
>> GeoTimeDataTupel input1, GeoTimeDataTupel input2){
>>         long time = (input1.getTime()+input2.getTime())/2;
>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>         list.add(input1.getGeo());
>>         list.add(input2.getGeo());
>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>
>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>     }
>>
>>  How can I sum all points and share through the counter?
>>
>>
>> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1990@googlemail.com>:
>>
>>>  Hi,
>>>  if I print the centroids, all of them are shown in the output. I have
>>> implemented k-means with MapReduce and Spark; with the same input, I get
>>> the same output. But in Flink I get a one-cluster output with this input
>>> set. (I use CSV files from the GDELT project.)
>>>
>>>  Here is my class:
>>>
>>>
>>>
>>>     public static void main(String[] args) {
>>>         Properties pro = new Properties();
>>>         try {
>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         int maxIteration =
>>> 1;//Integer.parseInt(pro.getProperty("maxiterations"));
>>>         // set up execution environment
>>>         ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>         // get input points
>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>>         // set number of bulk iterations for KMeans algorithm
>>>         IterativeDataSet<GeoTimeDataCenter> loop =
>>> centroids.iterate(maxIteration);
>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>             // compute closest centroid for each point
>>>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>>             // count and sum point coordinates for each centroid
>>>             .groupBy(0).reduce(new CentroidAccumulator())
>>>             // compute new centroids from point counts and coordinate
>>> sums
>>>             .map(new CentroidAverager());
>>>         // feed new centroids back into next iteration
>>>         DataSet<GeoTimeDataCenter> finalCentroids =
>>> loop.closeWith(newCentroids);
>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>             // assign points to final clusters
>>>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>         // emit result
>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>>         // execute program
>>>         try {
>>>             env.execute();
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>      private static final class SelectNearestCenter extends
>>> RichMapFunction<GeoTimeDataTupel,Tuple2<Integer,GeoTimeDataTupel>> {
>>>
>>>         private static final long serialVersionUID =
>>> -2729445046389350264L;
>>>         private Collection<GeoTimeDataCenter> centroids;
>>>
>>>         @Override
>>>         public void open(Configuration parameters) throws Exception {
>>>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>>>         }
>>>
>>>         @Override
>>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel
>>> point) throws Exception {
>>>             double minDistance = Double.MAX_VALUE;
>>>             int closestCentroidId= -1;
>>>
>>>             // check all cluster centers
>>>             for(GeoTimeDataCenter centroid : centroids) {
>>>                 // compute distance
>>>                 double distance = Distance.ComputeDist(point, centroid);
>>>                 // update nearest cluster if necessary
>>>                 if(distance < minDistance) {
>>>                     minDistance = distance;
>>>                     closestCentroidId = centroid.getId();
>>>                 }
>>>             }
>>>             // emit a new record with the center id and the data point
>>>             return new Tuple2<Integer,
>>> GeoTimeDataTupel>(closestCentroidId, point);
>>>         }
>>>     }
>>>
>>>     // sums and counts point coordinates
>>>     private static final class CentroidAccumulator implements
>>> ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>
>>>         private static final long serialVersionUID =
>>> -4868797820391121771L;
>>>
>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer,
>>> GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>                     addAndDiv(val1.f1, val2.f1));
>>>         }
>>>     }
>>>
>>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>>> GeoTimeDataTupel input2){
>>>         long time = (input1.getTime()+input2.getTime())/2;
>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>         list.add(input1.getGeo());
>>>         list.add(input2.getGeo());
>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>
>>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>>     }
>>>
>>>     // computes new centroid from coordinate sum and count of points
>>>     private static final class CentroidAverager implements
>>> MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>>
>>>         private static final long serialVersionUID =
>>> -2687234478847261803L;
>>>
>>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel>
>>> value) {
>>>             return new GeoTimeDataCenter(value.f0,
>>> value.f1.getGeo(),value.f1.getTime());
>>>         }
>>>     }
>>>
>>>     private static DataSet<GeoTimeDataTupel>
>>> getPointDataSet(ExecutionEnvironment env) {
>>>         Properties pro = new Properties();
>>>         try {
>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         String inputFile = pro.getProperty("input");
>>>         // map csv file
>>>         return env.readCsvFile(inputFile)
>>>             .ignoreInvalidLines()
>>>             .fieldDelimiter('\u0009')
>>>             //.fieldDelimiter("\t")
>>>             //.lineDelimiter("\n")
>>>             .includeFields(true, true, false, false, false, false,
>>> false, false, false, false, false
>>>                     , false, false, false, false, false, false, false,
>>> false, false, false
>>>                     , false, false, false, false, false, false, false,
>>> false, false, false
>>>                     , false, false, false, false, false, false, false,
>>> false, true, true
>>>                     , false, false, false, false, false, false, false,
>>> false, false, false
>>>                     , false, false, false, false, false, false, false,
>>> false)
>>>             //.includeFields(true,true,true,true)
>>>             .types(String.class, Long.class, Double.class, Double.class)
>>>             .map(new TuplePointConverter());
>>>     }
>>>
>>>     private static final class TuplePointConverter implements
>>> MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel>{
>>>
>>>         private static final long serialVersionUID =
>>> 3485560278562719538L;
>>>
>>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double>
>>> t) throws Exception {
>>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3),
>>> t.f1, t.f0);
>>>         }
>>>     }
>>>
>>>     private static DataSet<GeoTimeDataCenter>
>>> getCentroidDataSet(ExecutionEnvironment env) {
>>>         Properties pro = new Properties();
>>>         try {
>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         String seedFile = pro.getProperty("seed.file");
>>>         boolean seedFlag =
>>> Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>>         // get points from file or random
>>>         if(seedFlag || !(new File(seedFile+"-1").exists())) {
>>>             Seeding.randomSeeding();
>>>         }
>>>         // map csv file
>>>             .lineDelimiter("\n")
>>>             .fieldDelimiter('\u0009')
>>>             //.fieldDelimiter("\t")
>>>             .includeFields(true, true, true, true)
>>>             .types(Integer.class, Double.class, Double.class, Long.class)
>>>             .map(new TupleCentroidConverter());
>>>     }
>>>
>>>     private static final class TupleCentroidConverter implements
>>> MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter>{
>>>
>>>         private static final long serialVersionUID =
>>> -1046538744363026794L;
>>>
>>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double,
>>> Long> t) throws Exception {
>>>             return new GeoTimeDataCenter(t.f0,new LatLongSeriable(t.f1,
>>> t.f2), t.f3);
>>>         }
>>>     }
>>> }
>>>
>>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>
>>>> Concerning your first problem that you only see one resulting centroid,
>>>> your code looks good modulo the parts you haven't posted.
>>>>
>>>>  However, your problem could simply be caused by a bad selection of
>>>> initial centroids. If, for example, all centroids except for one don't get
>>>> any points assigned, then only one centroid will survive the iteration
>>>> step. How do you select the initial centroids?
>>>>
>>>>  To check that all centroids are read you can print the contents of
>>>> the centroids DataSet. Furthermore, you can simply println the new
>>>> centroids after each iteration step. In local mode you can then observe the
>>>> computation.
>>>>
>>>>  Cheers,
>>>> Till
>>>>
>>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <sewen@apache.org>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>>  This problem should not depend on any user code. There are no
>>>>> user-code dependent actors in Flink.
>>>>>
>>>>>  Is there more of the stack trace that you can send us? It looks like the
>>>>> core exception that is causing the issue is not part of the stack
>>>>> trace.
>>>>>
>>>>>  Greetings,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1990@googlemail.com>
>>>>> wrote:
>>>>>>
>>>>>>  I have implemented k-means for clustering temporal geo data. I use the
>>>>>> following GitHub project and my own data structure:
>>>>>>
>>>>>>
>>>>>>  Now I have the problem that Flink reads the centroids from the file and
>>>>>> continues working in parallel. If I look at the results, I have the
>>>>>> feeling that the program loads only one centroid point.
>>>>>>
>>>>>>  I work with Flink 0.8.1; if I update to 0.9 milestone 1, I get the
>>>>>> following exception:
>>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>>     at
>>>>>> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>>     at
>>>>>>     at
>>>>>>     at
>>>>>>     at
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at
>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>>> Method)
>>>>>>     at
>>>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>>     at
>>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>>     ... 9 more
>>>>>>
>>>>>> What does this exception mean?
>>>>>>
>>>>>>  best regards,
>>>>>>  paul
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>

```
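Fabian's second option (sum and count in a `ReduceFunction`, then a single division in a `MapFunction`) can be checked without a Flink cluster. The following is a plain-Java sketch, not Flink code: the `SumCount` record is a hypothetical stand-in for the `Tuple2` he mentions, `combine` plays the role of the `ReduceFunction`, and `mean` that of the final `MapFunction`. It replays Paul's one-dimensional example values 1, 2, 3, 4; real points would carry latitude, longitude and time in the accumulator. Records need Java 16 or later.

```java
import java.util.List;

public class SumCountAverage {

    // Hypothetical stand-in for a Tuple2<sum, count>: the partial result of a reduce.
    record SumCount(double sum, long count) {}

    // What the CentroidAccumulator in the thread effectively does: average two values
    // at a time. This is not associative, so the result depends on the reduction order.
    static double pairwiseAverage(List<Double> values) {
        double acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = (acc + values.get(i)) / 2;
        }
        return acc;
    }

    // Reduce step: add sums and counts. This is associative and commutative,
    // so partial results may be combined in any order.
    static SumCount combine(SumCount a, SumCount b) {
        return new SumCount(a.sum() + b.sum(), a.count() + b.count());
    }

    // Map step: the single division at the very end.
    static double mean(SumCount sc) {
        return sc.sum() / sc.count();
    }

    static double sumCountAverage(List<Double> values) {
        return mean(values.stream()
                .map(v -> new SumCount(v, 1))   // every point starts with count 1
                .reduce(SumCountAverage::combine)
                .orElseThrow());
    }

    public static void main(String[] args) {
        List<Double> points = List.of(1.0, 2.0, 3.0, 4.0);
        System.out.println(pairwiseAverage(points));   // prints 3.125
        System.out.println(sumCountAverage(points));   // prints 2.5
    }
}
```

The design point is that `combine` is associative, so an engine is free to pre-combine partial results on different workers in any order; the pairwise average has no such property, which is why the result drifted to 3.125.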

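Fabian's first option, which he recommends as the easier one, has roughly the shape below. Again this is a plain-Java sketch under stated assumptions, not Flink's API: `Point` and `Centroid` are hypothetical records standing in for `Tuple2<Integer, GeoTimeDataTupel>` and `GeoTimeDataCenter`, `Collectors.groupingBy` emulates `groupBy(0)`, and `centroidOf` plays the role of a `GroupReduceFunction` that receives all points of one cluster through an `Iterable`. It takes the arithmetic mean of latitude and longitude as a simplification; the `Geometry.getGeoCenterOf` helper from the thread may compute the center differently. Records need Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupCentroids {

    // Hypothetical point: cluster id from the nearest-center step, plus position and time.
    record Point(int clusterId, double lat, double lon, long time) {}

    record Centroid(int id, double lat, double lon, long time) {}

    // GroupReduce-style: iterates over every point of one cluster, sums and counts
    // in a single pass, and divides once at the end.
    static Centroid centroidOf(int clusterId, Iterable<Point> group) {
        double latSum = 0;
        double lonSum = 0;
        long timeSum = 0;   // may overflow for very large groups of epoch timestamps
        long count = 0;
        for (Point p : group) {
            latSum += p.lat();
            lonSum += p.lon();
            timeSum += p.time();
            count++;
        }
        return new Centroid(clusterId, latSum / count, lonSum / count, timeSum / count);
    }

    // Emulates groupBy(0) followed by a group reduce: one centroid per non-empty cluster.
    static List<Centroid> newCentroids(List<Point> points) {
        Map<Integer, List<Point>> groups = points.stream()
                .collect(Collectors.groupingBy(Point::clusterId, TreeMap::new, Collectors.toList()));
        List<Centroid> result = new ArrayList<>();
        groups.forEach((id, group) -> result.add(centroidOf(id, group)));
        return result;
    }

    public static void main(String[] args) {
        List<Point> points = List.of(
                new Point(0, 1.0, 10.0, 100),
                new Point(0, 2.0, 20.0, 200),
                new Point(0, 3.0, 30.0, 300),
                new Point(0, 4.0, 40.0, 400),
                new Point(1, 50.0, 8.0, 1000));
        newCentroids(points).forEach(System.out::println);
    }
}
```

A cluster id that no point was assigned to never forms a group, so it yields no new centroid. Grouping in Flink behaves the same way, since groups exist only for keys that occur in the data, which is the effect Till describes: a centroid that attracts no points drops out after the iteration step.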