Subject: Re: k means - waiting for dataset
From: Fabian Hueske
To: user@flink.apache.org
Date: Fri, 22 May 2015 23:29:22 +0200

There are two ways to do that:

1) You use a GroupReduceFunction, which gives you an iterator over all
points, similar to Hadoop's ReduceFunction.
2) You use the ReduceFunction to compute the sum and the count at the same
time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
final division.

I'd go with the first choice. It's easier.
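For option 1, here is a rough, untested sketch. It reuses the GeoTimeDataTupel, GeoTimeDataCenter, LatLongSeriable, and Geometry types from your code below and assumes a GeoTimeDataCenter(id, geo, time) constructor; GroupReduceFunction and Collector are the ones from org.apache.flink.api.common.functions and org.apache.flink.util:

    // Untested sketch of option 1: see all points of a cluster at once
    // and divide only after everything has been summed.
    private static final class CentroidGroupAverager implements
            GroupReduceFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {

        public void reduce(Iterable<Tuple2<Integer, GeoTimeDataTupel>> points,
                Collector<GeoTimeDataCenter> out) {
            int clusterId = -1;
            long timeSum = 0L;
            long count = 0L;
            List<LatLongSeriable> geos = new ArrayList<LatLongSeriable>();
            for (Tuple2<Integer, GeoTimeDataTupel> p : points) {
                clusterId = p.f0;              // same id for the whole group
                timeSum += p.f1.getTime();     // sum only, no division yet
                geos.add(p.f1.getGeo());
                count++;
            }
            // a single division (and center computation) over all points
            out.collect(new GeoTimeDataCenter(clusterId,
                    Geometry.getGeoCenterOf(geos), timeSum / count));
        }
    }

In your job you would then replace .groupBy(0).reduce(new CentroidAccumulator()).map(new CentroidAverager()) with .groupBy(0).reduceGroup(new CentroidGroupAverager()).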
Best, Fabian

2015-05-22 23:09 GMT+02:00 Paul Röwer <paul.roewer1990@googlemail.com>:

> good evening,
>
> sorry, my english is not the best.
>
> When computing the new centroid, I want to sum all points of the cluster
> and form the new center.
> In my other implementation, I first sum all points and only at the end
> divide by the number of points.
> For example: (1+2+3+4)/4 = 2.5
>
> In Flink I always reduce two points to one,
> so for the example above: (1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 --> (2.25+4)/2 = 3.125
>
> How can I rewrite my function so that it works like my other
> implementation?
>
> best regards,
> paul
>
>
> On 22.05.2015 16:52, Stephan Ewen wrote:
>
> Sorry, I don't understand the question.
>
> Can you describe a bit better what you mean with "how i can sum all
> points and share thoug the counter"?
>
> Thanks!
>
> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1990@googlemail.com>
> wrote:
>
>> I have fixed a bug in the input reading, but the results are still
>> different.
>>
>> I think I have localized the problem: in the other implementation I sum
>> all geo points/time points and divide by the count.
>> But in Flink I sum two points and divide by two, then sum the next...
>>
>> The method is the following:
>>
>>     // sums and counts point coordinates
>>     private static final class CentroidAccumulator implements
>>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>
>>         private static final long serialVersionUID = -4868797820391121771L;
>>
>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>                 Tuple2<Integer, GeoTimeDataTupel> val1,
>>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>                     addAndDiv(val1.f0, val1.f1, val2.f1));
>>         }
>>     }
>>
>>     private static GeoTimeDataTupel addAndDiv(int clusterid,
>>             GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>>         long time = (input1.getTime() + input2.getTime()) / 2;
>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>         list.add(input1.getGeo());
>>         list.add(input2.getGeo());
>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>
>>         return new GeoTimeDataTupel(geo, time, "POINT");
>>     }
>>
>> how i can sum all points and share thoug the counter?
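Summing everything first and dividing only once by the real count is what option 2 above does. A rough, untested sketch of the pattern, shown for a single double-valued field rather than your GeoTimeDataTupel; "withCounts" is a made-up placeholder for a DataSet<Tuple3<Integer, Double, Long>> of (cluster id, value, 1L) records:

    // Untested sketch of option 2: reduce (id, sum, count) triples,
    // then do a single division in a final MapFunction.
    DataSet<Tuple2<Integer, Double>> averages = withCounts
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple3<Integer, Double, Long>>() {
            public Tuple3<Integer, Double, Long> reduce(
                    Tuple3<Integer, Double, Long> a, Tuple3<Integer, Double, Long> b) {
                // only add sums and counts here, no division yet
                return new Tuple3<Integer, Double, Long>(a.f0, a.f1 + b.f1, a.f2 + b.f2);
            }
        })
        .map(new MapFunction<Tuple3<Integer, Double, Long>, Tuple2<Integer, Double>>() {
            public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Long> v) {
                // divide once, by the number of points actually seen
                return new Tuple2<Integer, Double>(v.f0, v.f1 / v.f2);
            }
        });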
>> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1990@googlemail.com>:
>>
>>> hi,
>>> If I print the centroids, all of them show up in the output. I have
>>> implemented k-means with MapReduce and Spark; given the same input, I
>>> get the same output. But in Flink I get a one-cluster output with this
>>> input set. (I use CSV files from the GDELT project.)
>>>
>>> Here is my class:
>>>
>>> public class FlinkMain {
>>>
>>>     public static void main(String[] args) {
>>>         // load properties
>>>         Properties pro = new Properties();
>>>         try {
>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         int maxIteration = 1; //Integer.parseInt(pro.getProperty("maxiterations"));
>>>         String outputPath = pro.getProperty("flink.output");
>>>         // set up execution environment
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>         // get input points
>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>>         // set number of bulk iterations for KMeans algorithm
>>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>             // compute closest centroid for each point
>>>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>>             // count and sum point coordinates for each centroid
>>>             .groupBy(0).reduce(new CentroidAccumulator())
>>>             // compute new centroids from point counts and coordinate sums
>>>             .map(new CentroidAverager());
>>>         // feed new centroids back into next iteration
>>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>             // assign points to final clusters
>>>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>         // emit result
>>>         clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>>>         finalCentroids.writeAsText(outputPath + "/centers"); //print();
>>>         // execute program
>>>         try {
>>>             env.execute("KMeans Flink");
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     private static final class SelectNearestCenter extends
>>>             RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>>>
>>>         private static final long serialVersionUID = -2729445046389350264L;
>>>         private Collection<GeoTimeDataCenter> centroids;
>>>
>>>         @Override
>>>         public void open(Configuration parameters) throws Exception {
>>>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>>>         }
>>>
>>>         @Override
>>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>>>             double minDistance = Double.MAX_VALUE;
>>>             int closestCentroidId = -1;
>>>
>>>             // check all cluster centers
>>>             for (GeoTimeDataCenter centroid : centroids) {
>>>                 // compute distance
>>>                 double distance = Distance.ComputeDist(point, centroid);
>>>                 // update nearest cluster if necessary
>>>                 if (distance < minDistance) {
>>>                     minDistance = distance;
>>>                     closestCentroidId = centroid.getId();
>>>                 }
>>>             }
>>>             // emit a new record with the center id and the data point
>>>             return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>>>         }
>>>     }
>>>
>>>     // sums and counts point coordinates
>>>     private static final class CentroidAccumulator implements
>>>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>
>>>         private static final long serialVersionUID = -4868797820391121771L;
>>>
>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>                 Tuple2<Integer, GeoTimeDataTupel> val1,
>>>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>                     addAndDiv(val1.f1, val2.f1));
>>>         }
>>>     }
>>>
>>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>>>             GeoTimeDataTupel input2) {
>>>         long time = (input1.getTime() + input2.getTime()) / 2;
>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>         list.add(input1.getGeo());
>>>         list.add(input2.getGeo());
>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>
>>>         return new GeoTimeDataTupel(geo, time, "POINT");
>>>     }
>>>
>>>     // computes new centroid from coordinate sum and count of points
>>>     private static final class CentroidAverager implements
>>>             MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>>
>>>         private static final long serialVersionUID = -2687234478847261803L;
>>>
>>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>>>             return new GeoTimeDataCenter(value.f0,
>>>                     value.f1.getGeo(), value.f1.getTime());
>>>         }
>>>     }
>>>
>>>     private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>>         // load properties
>>>         Properties pro = new Properties();
>>>         try {
>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         String inputFile = pro.getProperty("input");
>>>         // map csv file
>>>         return env.readCsvFile(inputFile)
>>>             .ignoreInvalidLines()
>>>             .fieldDelimiter('\u0009')
>>>             //.fieldDelimiter("\t")
>>>             //.lineDelimiter("\n")
>>>             .includeFields(true, true, false, false, false, false, false, false, false, false, false
>>>                     , false, false, false, false, false, false, false, false, false, false
>>>                     , false, false, false, false, false, false, false, false, false, false
>>>                     , false, false, false, false, false, false, false, false, true, true
>>>                     , false, false, false, false, false, false, false, false, false, false
>>>                     , false, false, false, false, false, false, false, false)
>>>             //.includeFields(true,true,true,true)
>>>             .types(String.class, Long.class, Double.class, Double.class)
>>>             .map(new TuplePointConverter());
>>>     }
>>>
>>>     private static final class TuplePointConverter implements
>>>             MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>>>
>>>         private static final long serialVersionUID = 3485560278562719538L;
>>>
>>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>>>         }
>>>     }
>>>
>>>     private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>>>         // load properties
>>>         Properties pro = new Properties();
>>>         try {
>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>         } catch (Exception e) {
>>>             e.printStackTrace();
>>>         }
>>>         String seedFile = pro.getProperty("seed.file");
>>>         boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>>         // get points from file or random
>>>         if (seedFlag || !(new File(seedFile + "-1").exists())) {
>>>             Seeding.randomSeeding();
>>>         }
>>>         // map csv file
>>>         return env.readCsvFile(seedFile + "-1")
>>>             .lineDelimiter("\n")
>>>             .fieldDelimiter('\u0009')
>>>             //.fieldDelimiter("\t")
>>>             .includeFields(true, true, true, true)
>>>             .types(Integer.class, Double.class, Double.class, Long.class)
>>>             .map(new TupleCentroidConverter());
>>>     }
>>>
>>>     private static final class TupleCentroidConverter implements
>>>             MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>>>
>>>         private static final long serialVersionUID = -1046538744363026794L;
>>>
>>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>>>             return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>>>         }
>>>     }
>>> }
>>>
>>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>>>
>>>> Concerning your first problem that you only see one resulting centroid,
>>>> your code looks good modulo the parts you haven't posted.
>>>>
>>>> However, your problem could simply be caused by a bad selection of
>>>> initial centroids.
>>>> If, for example, all centroids except for one don't get any points
>>>> assigned, then only one centroid will survive the iteration step. How
>>>> do you select them?
>>>>
>>>> To check that all centroids are read, you can print the contents of
>>>> the centroids DataSet. Furthermore, you can simply println the new
>>>> centroids after each iteration step. In local mode you can then
>>>> observe the computation.
>>>>
>>>> Cheers,
>>>> Till
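A note on Till's debugging suggestion: inside the iteration, a simple way to see the new centroids per superstep is an identity MapFunction with a println. A rough, untested sketch; it assumes GeoTimeDataCenter has a readable toString():

    // Untested sketch: log every new centroid of each superstep (local mode).
    DataSet<GeoTimeDataCenter> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        .groupBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager())
        .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
            public GeoTimeDataCenter map(GeoTimeDataCenter c) {
                System.out.println("new centroid: " + c);  // side effect only
                return c;                                  // identity map
            }
        });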
>>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> This problem should not depend on any user code. There are no
>>>>> user-code dependent actors in Flink.
>>>>>
>>>>> Is there more stack trace that you can send us? It looks like the core
>>>>> exception that is causing the issue is not part of the stack trace.
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
>>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1990@googlemail.com> wrote:
>>>>>
>>>>>> hi flink community,
>>>>>>
>>>>>> I have implemented k-means for clustering temporal geo data. I use the
>>>>>> following GitHub project and my own data structure:
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>>
>>>>>> Now I have the problem that Flink reads the centroids from file and
>>>>>> keeps working in parallel without waiting. If I look at the results,
>>>>>> I have the feeling that the program loads only one centroid point.
>>>>>>
>>>>>> I work with Flink 0.8.1; if I update to 0.9-milestone-1, I get the
>>>>>> following exception:
>>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>>     ... 9 more
>>>>>>
>>>>>> How can I tell Flink that it should wait until the dataset is loaded,
>>>>>> and what does this exception mean?
>>>>>>
>>>>>> best regards,
>>>>>> paul