flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: k means - waiting for dataset
Date Fri, 22 May 2015 14:52:35 GMT
Sorry, I don't understand the question.

Can you describe a bit better what you mean with "how i can sum all points
and share thoug the counter" ?

Thanks!

On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1990@googlemail.com>
wrote:

> i have fixed a bug in the input reading, but the results are still different.
>
> i think i have localized the problem: in the other implementation i sum all
> geo points/time points and divide the sum by the counter.
> but in flink i sum two points and divide by two, then sum with the next, and so on...
>
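A tiny stand-alone demo (illustrative numbers only) of why repeatedly averaging pairs, as the reduce below does, is not the same as dividing the full sum by the count:

```java
// Demonstrates that repeated pairwise averaging differs from the true mean.
// The values here are purely illustrative.
public class PairwiseAverageDemo {
    public static void main(String[] args) {
        double[] values = {1.0, 2.0, 9.0};

        // True mean: sum all values, divide once by the count.
        double sum = 0;
        for (double v : values) sum += v;
        double trueMean = sum / values.length;       // (1 + 2 + 9) / 3 = 4.0

        // Pairwise running average, as a reduce that halves at each step would compute.
        double running = values[0];
        for (int i = 1; i < values.length; i++) {
            running = (running + values[i]) / 2;     // ((1+2)/2 + 9) / 2 = 5.25
        }

        System.out.println(trueMean);   // 4.0
        System.out.println(running);    // 5.25
    }
}
```

The running average over-weights whichever points arrive last, which also makes the result depend on reduce order.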
> the method is the following:
>
> // sums and counts point coordinates
>     private static final class CentroidAccumulator implements
> ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>
>         private static final long serialVersionUID = -4868797820391121771L;
>
>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer,
> GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
> addAndDiv(val1.f0,val1.f1,val2.f1));
>         }
>     }
>
>     private static GeoTimeDataTupel addAndDiv(int
> clusterid,GeoTimeDataTupel input1, GeoTimeDataTupel input2){
>         long time = (input1.getTime()+input2.getTime())/2;
>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>         list.add(input1.getGeo());
>         list.add(input2.getGeo());
>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>
>         return new GeoTimeDataTupel(geo,time,"POINT");
>     }
>
> how can i sum all points and divide the sum by the counter?
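One way to get the behavior of the other implementations is the pattern from the bundled Flink KMeans example: carry a count through the reduce, only sum there, and divide once at the end. A minimal stand-alone sketch, where plain double coordinates and a hypothetical `Acc` class stand in for `GeoTimeDataTupel`:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the sum-then-divide pattern: the reduce step only SUMS
// coordinates and counts; the division by the total count happens exactly
// once, in a final averaging step. "Acc" is a hypothetical stand-in for a
// Tuple3<clusterId, coordinateSum, count>.
public class SumCountSketch {
    static final class Acc {
        final int clusterId; final double sum; final long count;
        Acc(int clusterId, double sum, long count) {
            this.clusterId = clusterId; this.sum = sum; this.count = count;
        }
    }

    // Analogue of CentroidAccumulator: add sums and counts, no division here.
    static Acc reduce(Acc a, Acc b) {
        return new Acc(a.clusterId, a.sum + b.sum, a.count + b.count);
    }

    // Analogue of CentroidAverager: divide the summed coordinates once.
    static double average(Acc a) {
        return a.sum / a.count;
    }

    public static void main(String[] args) {
        List<Double> points = Arrays.asList(1.0, 2.0, 9.0);
        Acc acc = new Acc(0, 0.0, 0L);
        for (double p : points) {
            acc = reduce(acc, new Acc(0, p, 1L)); // each point starts as (sum=p, count=1)
        }
        System.out.println(average(acc)); // 4.0
    }
}
```

Because addition is associative and commutative, this reduce gives the same result regardless of the order in which Flink combines partial results.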
>
>
> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1990@googlemail.com>:
>
>> hi,
>> if i print the centroids, all of them show up in the output. i have
>> implemented k-means with map reduce and spark; with the same input, i get
>> the same output. but in flink i get a one-cluster output with this input
>> set. (i use csv files from the GDELT project)
>>
>> here my class:
>>
>> public class FlinkMain {
>>
>>
>>     public static void main(String[] args) {
>>         //load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new
>> FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         int maxIteration =
>> 1;//Integer.parseInt(pro.getProperty("maxiterations"));
>>         String outputPath = pro.getProperty("flink.output");
>>         // set up execution environment
>>         ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>         // get input points
>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>         // set number of bulk iterations for KMeans algorithm
>>         IterativeDataSet<GeoTimeDataCenter> loop =
>> centroids.iterate(maxIteration);
>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>             // compute closest centroid for each point
>>             .map(new SelectNearestCenter()).withBroadcastSet(loop,
>> "centroids")
>>             // count and sum point coordinates for each centroid
>>             .groupBy(0).reduce(new CentroidAccumulator())
>>             // compute new centroids from point counts and coordinate sums
>>             .map(new CentroidAverager());
>>         // feed new centroids back into next iteration
>>         DataSet<GeoTimeDataCenter> finalCentroids =
>> loop.closeWith(newCentroids);
>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints =
>> points
>>             // assign points to final clusters
>>             .map(new
>> SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>         // emit result
>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>         // execute program
>>         try {
>>             env.execute("KMeans Flink");
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     private static final class SelectNearestCenter extends
>> RichMapFunction<GeoTimeDataTupel,Tuple2<Integer,GeoTimeDataTupel>> {
>>
>>         private static final long serialVersionUID =
>> -2729445046389350264L;
>>         private Collection<GeoTimeDataCenter> centroids;
>>
>>         @Override
>>         public void open(Configuration parameters) throws Exception {
>>             this.centroids =
>> getRuntimeContext().getBroadcastVariable("centroids");
>>         }
>>
>>         @Override
>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel
>> point) throws Exception {
>>             double minDistance = Double.MAX_VALUE;
>>             int closestCentroidId= -1;
>>
>>             // check all cluster centers
>>             for(GeoTimeDataCenter centroid : centroids) {
>>                 // compute distance
>>                 double distance = Distance.ComputeDist(point, centroid);
>>                 // update nearest cluster if necessary
>>                 if(distance < minDistance) {
>>                     minDistance = distance;
>>                     closestCentroidId = centroid.getId();
>>                 }
>>             }
>>             // emit a new record with the center id and the data point
>>             return new Tuple2<Integer,
>> GeoTimeDataTupel>(closestCentroidId, point);
>>         }
>>     }
>>
>>     // sums and counts point coordinates
>>     private static final class CentroidAccumulator implements
>> ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>
>>         private static final long serialVersionUID =
>> -4868797820391121771L;
>>
>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer,
>> GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>> addAndDiv(val1.f1,val2.f1));
>>         }
>>     }
>>
>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>> GeoTimeDataTupel input2){
>>         long time = (input1.getTime()+input2.getTime())/2;
>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>         list.add(input1.getGeo());
>>         list.add(input2.getGeo());
>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>
>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>     }
>>
>>     // computes new centroid from coordinate sum and count of points
>>     private static final class CentroidAverager implements
>> MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>
>>         private static final long serialVersionUID =
>> -2687234478847261803L;
>>
>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel>
>> value) {
>>             return new GeoTimeDataCenter(value.f0,
>> value.f1.getGeo(),value.f1.getTime());
>>         }
>>     }
>>
>>     private static DataSet<GeoTimeDataTupel>
>> getPointDataSet(ExecutionEnvironment env) {
>>         // load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new
>> FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         String inputFile = pro.getProperty("input");
>>         // map csv file
>>         return env.readCsvFile(inputFile)
>>             .ignoreInvalidLines()
>>             .fieldDelimiter('\u0009')
>>             //.fieldDelimiter("\t")
>>             //.lineDelimiter("\n")
>>             .includeFields(true, true, false, false, false, false, false,
>> false, false, false, false
>>                     , false, false, false, false, false, false, false,
>> false, false, false
>>                     , false, false, false, false, false, false, false,
>> false, false, false
>>                     , false, false, false, false, false, false, false,
>> false, true, true
>>                     , false, false, false, false, false, false, false,
>> false, false, false
>>                     , false, false, false, false, false, false, false,
>> false)
>>             //.includeFields(true,true,true,true)
>>             .types(String.class, Long.class, Double.class, Double.class)
>>             .map(new TuplePointConverter());
>>     }
>>
>>     private static final class TuplePointConverter implements
>> MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel>{
>>
>>         private static final long serialVersionUID = 3485560278562719538L;
>>
>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double>
>> t) throws Exception {
>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3),
>> t.f1, t.f0);
>>         }
>>     }
>>
>>     private static DataSet<GeoTimeDataCenter>
>> getCentroidDataSet(ExecutionEnvironment env) {
>>         // load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new
>> FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         String seedFile = pro.getProperty("seed.file");
>>         boolean seedFlag =
>> Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>         // get points from file or random
>>         if(seedFlag || !(new File(seedFile+"-1").exists())) {
>>             Seeding.randomSeeding();
>>         }
>>         // map csv file
>>         return env.readCsvFile(seedFile+"-1")
>>             .lineDelimiter("\n")
>>             .fieldDelimiter('\u0009')
>>             //.fieldDelimiter("\t")
>>             .includeFields(true, true, true, true)
>>             .types(Integer.class, Double.class, Double.class, Long.class)
>>             .map(new TupleCentroidConverter());
>>     }
>>
>>     private static final class TupleCentroidConverter implements
>> MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter>{
>>
>>         private static final long serialVersionUID =
>> -1046538744363026794L;
>>
>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double,
>> Long> t) throws Exception {
>>             return new GeoTimeDataCenter(t.f0,new LatLongSeriable(t.f1,
>> t.f2), t.f3);
>>         }
>>     }
>> }
>>
>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>
>>> Concerning your first problem that you only see one resulting centroid,
>>> your code looks good modulo the parts you haven't posted.
>>>
>>> However, your problem could simply be caused by a bad selection of
>>> initial centroids. If, for example, all centroids except one don't get
>>> any points assigned, then only one centroid will survive the iteration
>>> step. How do you select the initial centroids?
>>>
>>> To check that all centroids are read you can print the contents of the
>>> centroids DataSet. Furthermore, you can simply println the new centroids
>>> after each iteration step. In local mode you can then observe the
>>> computation.
>>>
>>> Cheers,
>>> Till
>>>
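Till's println suggestion can be sketched generically as a pass-through step that logs each element before forwarding it unchanged; in the actual job the same idea would go into a `MapFunction` inserted after the averaging step. It is shown here on a plain list so it runs stand-alone:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Debugging sketch: a pass-through transformation that prints each element
// before forwarding it unchanged. In a Flink job the equivalent MapFunction
// would be chained after the new-centroid computation inside the iteration.
public class PrintPassThrough {
    static <T> Function<T, T> peek(String label) {
        return x -> {
            System.out.println(label + ": " + x); // side effect: log the element
            return x;                             // forward unchanged
        };
    }

    public static void main(String[] args) {
        List<String> centroids = Arrays.asList("c0", "c1", "c2");
        centroids.stream().map(peek("centroid")).forEach(c -> {});
    }
}
```

In local mode the printed lines appear in the console, so you can watch the centroids evolve per iteration.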
>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> This problem should not depend on any user code. There are no user-code
>>>> dependent actors in Flink.
>>>>
>>>> Is there more of the stack trace that you can send us? It looks like
>>>> the core exception that is causing the issue is not part of the stack
>>>> trace.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>>
>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1990@googlemail.com
>>>> > wrote:
>>>>
>>>>> hi flink community,
>>>>>
>>>>> i have implemented k-means for clustering temporal geo data. i use the
>>>>> following github project and my own data structure:
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>
>>>>> now i have the problem that flink reads the centroids from file and
>>>>> continues working in parallel. if i look at the results, i have the
>>>>> feeling that the program loads only one centroid point.
>>>>>
>>>>> i work with flink 0.8.1; if i update to 0.9 milestone 1, i get the
>>>>> following exception:
>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>     at
>>>>> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>     at
>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>     at
>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at
>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at
>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>> Method)
>>>>>     at
>>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>     at
>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>     ... 9 more
>>>>>
>>>>> how can i tell flink that it should wait for the dataset to be loaded,
>>>>> and what does this exception mean?
>>>>>
>>>>> best regards,
>>>>> paul
>>>>>
>>>>
>>>>
>>>
>>
>
