flink-user mailing list archives

From Pa Rö <paul.roewer1...@googlemail.com>
Subject Re: k means - waiting for dataset
Date Thu, 21 May 2015 09:33:55 GMT
Hi,
the exception occurs with version 0.9.
With version 0.8.1 there is no exception, but the results are wrong.

Here is my main method:

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

    public static void main(String[] args) throws Exception {
        // load properties; fail fast instead of continuing with a null output path
        Properties pro = new Properties();
        try (InputStream in = new FileInputStream("./resources/config.properties")) {
            pro.load(in);
        }
        // hard-coded for testing; normally Integer.parseInt(pro.getProperty("maxiterations"))
        int maxIteration = 2;
        String outputPath = pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points and initial centroids
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);

        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduce(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager());
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
        finalCentroids.writeAsText(outputPath + "/centers"); // or print()
        // execute program; failures propagate instead of being swallowed
        env.execute("KMeans Flink");
    }
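A minimal plain-Java sketch (no Flink dependency) of what one pass of the loop body computes, assuming simple 2-D points. `KMeansStep`, `Point`, `Centroid`, `nearest` and `step` are hypothetical stand-ins, not the `GeoTimeDataTupel`/`GeoTimeDataCenter` classes from the thread. It mirrors the chain map(SelectNearestCenter), groupBy(0).reduce(CentroidAccumulator), map(CentroidAverager): each point gets the id of its nearest centroid, coordinate sums and point counts are accumulated per id, and the new centroid is sum / count. In the linked upstream example a separate `CountAppender` map attaches that count to each point before grouping; the pipeline above relies on `CentroidAccumulator` and the point type to carry it. Like a grouped reduce, an id that receives no points produces no group, so that centroid drops out of the next iteration. This is one possible way to end up with a single centroid, not a diagnosis of the code above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KMeansStep {

    // Hypothetical stand-in for GeoTimeDataTupel: a plain 2-D point.
    record Point(double x, double y) {}

    // Hypothetical stand-in for GeoTimeDataCenter: a 2-D point with a cluster id.
    record Centroid(int id, double x, double y) {}

    // SelectNearestCenter: id of the centroid with the smallest squared distance.
    // Ties keep the first centroid encountered.
    static int nearest(Point p, List<Centroid> centroids) {
        int bestId = -1;
        double bestDist = Double.POSITIVE_INFINITY;
        for (Centroid c : centroids) {
            double dx = p.x() - c.x();
            double dy = p.y() - c.y();
            double d = dx * dx + dy * dy;
            if (d < bestDist) {
                bestDist = d;
                bestId = c.id();
            }
        }
        return bestId;
    }

    // One loop body: assign each point, accumulate {sumX, sumY, count} per id
    // (the groupBy(0).reduce step), then divide sums by counts (the averaging map).
    // As with a grouped reduce, an id that receives no points yields no group,
    // so that centroid is absent from the result.
    static Map<Integer, Centroid> step(Point[] points, List<Centroid> centroids) {
        Map<Integer, double[]> acc = new TreeMap<>();
        for (Point p : points) {
            double[] a = acc.computeIfAbsent(nearest(p, centroids), id -> new double[3]);
            a[0] += p.x();
            a[1] += p.y();
            a[2] += 1;
        }
        Map<Integer, Centroid> next = new TreeMap<>();
        acc.forEach((id, a) -> next.put(id, new Centroid(id, a[0] / a[2], a[1] / a[2])));
        return next;
    }

    public static void main(String[] args) {
        Point[] points = {
            new Point(0, 0), new Point(0, 2), new Point(10, 10), new Point(10, 12)
        };
        // Distinct starting centroids: both clusters survive.
        System.out.println(step(points, List.of(new Centroid(1, 1, 1), new Centroid(2, 9, 9))).values());
        // Identical starting centroids: every point ties and goes to id 1,
        // so only one centroid is left after the step.
        System.out.println(step(points, List.of(new Centroid(1, 5, 5), new Centroid(2, 5, 5))).values());
    }
}
```

Running it prints `[Centroid[id=1, x=0.0, y=1.0], Centroid[id=2, x=10.0, y=11.0]]` and then `[Centroid[id=1, x=5.0, y=6.0]]`, showing how coinciding start centroids collapse to one cluster after a single step.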

2015-05-21 11:28 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:

> Hi Paul,
>
> could you share your code with us so that we can see whether there is an
> error?
>
> Does this error also occur with 0.9-SNAPSHOT?
>
> Cheers,
> Till
>
> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1990@googlemail.com>
> wrote:
>
>> Hi Flink community,
>>
>> I have implemented k-means for clustering temporal geo data. I use the
>> following GitHub project together with my own data structures:
>>
>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>
>> Now I have the problem that Flink reads the centroids from the file and
>> continues working in parallel. Looking at the results, I have the feeling
>> that the program loads only one centroid point.
>>
>> I am working with Flink 0.8.1; if I update to 0.9 milestone 1, I get the
>> following exception:
>> ERROR actor.OneForOneStrategy: exception during creation
>> akka.actor.ActorInitializationException: exception during creation
>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.reflect.InvocationTargetException
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>     at akka.actor.Props.newActor(Props.scala:337)
>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>     ... 9 more
>>
>> How can I tell Flink to wait until the dataset is loaded, and what does
>> this exception mean?
>>
>> Best regards,
>> Paul
>>
>
>
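On the question of waiting: for a broadcast set such as `"centroids"`, Flink hands the complete data set to the function (typically read in `open()` through `getRuntimeContext().getBroadcastVariable("centroids")`), so no explicit wait should be needed. If only one centroid seems to be in use, it may be worth checking the centroid input itself. The sketch below is a small standalone check, outside Flink, assuming a whitespace-separated `id x y` line format; `CentroidFileCheck` and `countDistinct` are hypothetical names, and the real file layout from the thread is not known.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CentroidFileCheck {

    // Assumed line format "id x y", whitespace-separated; blank lines are skipped.
    // Returns {number of distinct ids, number of distinct (x, y) positions}.
    static int[] countDistinct(List<String> lines) {
        Set<Integer> ids = new HashSet<>();
        Set<List<Double>> positions = new HashSet<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] f = trimmed.split("\\s+");
            if (f.length < 3) {
                throw new IllegalArgumentException("malformed centroid line: " + line);
            }
            ids.add(Integer.parseInt(f[0]));
            positions.add(List.of(Double.parseDouble(f[1]), Double.parseDouble(f[2])));
        }
        return new int[] { ids.size(), positions.size() };
    }

    public static void main(String[] args) throws Exception {
        // Pass the centroid file path as the first argument; otherwise use sample lines.
        List<String> lines = args.length > 0
            ? Files.readAllLines(Path.of(args[0]))
            : List.of("1 52.5 13.4", "2 48.1 11.6", "", "3 53.6 10.0");
        int[] n = countDistinct(lines);
        System.out.println(n[0] + " distinct ids, " + n[1] + " distinct positions");
    }
}
```

With the sample lines it prints `3 distinct ids, 3 distinct positions`. If either number is smaller than the intended k for the real file, the input (or its parsing) is a more likely cause than timing.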
