Date: Mon, 29 Sep 2014 12:00:01 +0200
Subject: Re: Iterative Algorithm
From: Aljoscha Krettek
To: user@flink.incubator.apache.org

Yes, the "first level" iteration code is actually not executed at
runtime. It is only executed once to put together the DataSet
operations. This graph of operations is then repeatedly executed at
runtime.

Cheers,
Aljoscha

On Mon, Sep 29, 2014 at 10:30 AM, Maximilian Alber wrote:
> Ah ok, that's the trick. So I can only use it inside DataSet functions
> applied via rich functions during the iteration, but not in the
> "first level" iteration code?
> But it shouldn't be a problem for me.
>
> Thanks!
> Cheers,
> Max
>
> On Mon, Sep 29, 2014 at 9:51 AM, Aljoscha Krettek
> wrote:
>>
>> Hi,
>> yes you can, I modified the ConnectedComponents Example to print out the
>> iteration number inside one of the Join functions:
>>
>> // open a delta iteration
>> val verticesWithComponents = vertices.iterateDelta(vertices,
>>     maxIterations, Array(0)) {
>>   (s, ws) =>
>>
>>     // apply the step logic: join with the edges
>>     val allNeighbors = ws.join(edges).where(0).equalTo(0) (
>>       new RichJoinFunction[(Long, Long), (Long, Long), (Long, Long)] {
>>         override def join(vertex: (Long, Long), edge: (Long, Long)):
>>             (Long, Long) = {
>>           val context = getIterationRuntimeContext
>>           println("Iteration #" + context.getSuperstepNumber)
>>           (edge._2, vertex._2)
>>         }
>>       })
>>
>>     // select the minimum neighbor
>>     val minNeighbors = allNeighbors.groupBy(0).min(1)
>>
>>     // update if the component of the candidate is smaller
>>     val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
>>       (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
>>         if (newVertex._2 < oldVertex._2) out.collect(newVertex)
>>     }
>>
>>     // delta and new workset are identical
>>     (updatedComponents, updatedComponents)
>> }
>>
>> Unfortunately for this you have to use a RichFunction instead of a lambda.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sat, Sep 27, 2014 at 11:09 AM, Maximilian Alber
>> wrote:
>>>
>>> Ok. I'm back at this point:
>>>
>>> In the 0.7 version, is there a way to get the superstep number inside
>>> an iterateWithDelta function?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Mon, Aug 18, 2014 at 12:05 PM, Aljoscha Krettek
>>> wrote:
>>>>
>>>> Yes, but they were always available, because user code in Java was
>>>> always in "Rich Functions". There is no rich function for iterations,
>>>> though, since iterations themselves don't have user code attached.
>>>>
>>>> Aljoscha
>>>>
>>>>
>>>> On Mon, Aug 18, 2014 at 10:59 AM, Fabian Hueske
>>>> wrote:
>>>>>
>>>>> RichFunctions were added to the Java API recently:
>>>>>
>>>>> https://github.com/apache/incubator-flink/tree/72d7b86274c33d1570ffb22b1fca2081c15d753c/flink-java/src/main/java/org/apache/flink/api/java/functions
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>>
>>>>> 2014-08-18 8:16 GMT+02:00 Aljoscha Krettek :
>>>>>
>>>>>> Hi,
>>>>>> there is no RichFunction in the Java API either. You don't have to
>>>>>> create a new DataSet. Your iteration result will be a DataSet that
>>>>>> results from some operations based on the previous SolutionSet
>>>>>> and/or WorkingSet. For example:
>>>>>>
>>>>>> def stepFunction(s: DataSet[SolutionType], ws: DataSet[WorksetType]) =
>>>>>> {
>>>>>>   val intermediate = ws.join(somethingFromOutside) where {...}
>>>>>>     isEqualTo {...} map {...}
>>>>>>   val newSolution = s.join(intermediate) where ...
>>>>>>   val newWorkset = ...
>>>>>>   (newSolution, newWorkset)
>>>>>> }
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On Sun, Aug 17, 2014 at 6:14 PM, Maximilian Alber
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi!
>>>>>>> Thank you!
>>>>>>>
>>>>>>> But how do I join my result to the solution set if I cannot create a
>>>>>>> new DataSet inside the iteration?
>>>>>>> In Scala there is not yet a RichFunction for the iterations, am I
>>>>>>> right? So I should best use the Java class?
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Max!
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 15, 2014 at 3:50 PM, Aljoscha Krettek
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> right now, the only way of updating the solution set in a delta
>>>>>>>> iteration is by joining with the solution set from the previous
>>>>>>>> iteration and having the result of that join as the result of the
>>>>>>>> step function. I am working on simplifying iterations as well as
>>>>>>>> bringing the Scala API to feature parity with the Java API. It
>>>>>>>> should not be possible right now to create a new data source
>>>>>>>> inside each iteration step.
>>>>>>>>
>>>>>>>> The way to get at the current iteration number is by having a rich
>>>>>>>> function instead of a lambda function. So instead of:
>>>>>>>> val someSet = ...
>>>>>>>> val otherSet = someSet map { x => x + 1 }
>>>>>>>>
>>>>>>>> you would have:
>>>>>>>> val someSet = ...
>>>>>>>> val otherSet = someSet map( new MapFunction[InType, OutType]() {
>>>>>>>>   def apply(in: InType): OutType = {
>>>>>>>>     val iteration =
>>>>>>>>       getIterationRuntimeContext().getSuperstepNumber()
>>>>>>>>     (iteration, x, y, ...)
>>>>>>>>   }
>>>>>>>> })
>>>>>>>>
>>>>>>>> I hope that helps.
>>>>>>>>
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 12, 2014 at 8:21 PM, Maximilian Alber
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi everybody,
>>>>>>>>>
>>>>>>>>> as already stated, I am currently trying to implement a Machine
>>>>>>>>> Learning algorithm on Stratosphere for the ML group at TU Berlin.
>>>>>>>>> I ran into some issues.
>>>>>>>>>
>>>>>>>>> The basic scheme of my algorithm is:
>>>>>>>>>
>>>>>>>>> X = input data
>>>>>>>>> Y = input data
>>>>>>>>> residuals = Y
>>>>>>>>>
>>>>>>>>> model = array[float, float, float] size n
>>>>>>>>>
>>>>>>>>> for i in 1:n
>>>>>>>>>   a = calc_a(X, residuals)
>>>>>>>>>   b = calc_b(X, a, residuals)
>>>>>>>>>   c = calc_c(X, a, b, c, residuals)
>>>>>>>>>
>>>>>>>>>   model(i) = (a, b, c)
>>>>>>>>>   residuals = update_residuals(residuals, a, b, c)
>>>>>>>>>
>>>>>>>>> output model
>>>>>>>>>
>>>>>>>>> My attempt now would be to use the delta iterations, with the
>>>>>>>>> model as solution set and the residuals as working set:
>>>>>>>>>
>>>>>>>>> Code:
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>> val X = getInputSource
>>>>>>>>> val Y = DataSource(YFile, CsvInputFormat[Float])
>>>>>>>>>
>>>>>>>>> val model = CollectionDataSource[(Int, Float, Float,
>>>>>>>>>   Float)](List())
>>>>>>>>> val residual = Y
>>>>>>>>>
>>>>>>>>> def step_function(model: DataSet[(Int, Float, Float, Float)],
>>>>>>>>>     residuals: DataSet[Float]) = {
>>>>>>>>>   import util.Random
>>>>>>>>>   (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f,
>>>>>>>>>     2.0f))), residuals)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> model.iterateWithDelta(
>>>>>>>>>   residual,
>>>>>>>>>   { x: (Int, Float, Float, Float) => x._1 },
>>>>>>>>>   step_function,
>>>>>>>>>   config.iterations
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>> val output = model //map { x => println(x); x }
>>>>>>>>> val sink = output.write(outFile, CsvOutputFormat[(Int, Float,
>>>>>>>>>   Float, Float)], "Model output")
>>>>>>>>>
>>>>>>>>> Code End
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>
>>>>>>>>> At the moment I am just trying to output a list of tuples.
>>>>>>>>>
>>>>>>>>> My problems are:
>>>>>>>>> - instead of the random integer I would like to insert the index
>>>>>>>>>   of the iteration.
>>>>>>>>> - I get this error:
>>>>>>>>> 08/12/2014 20:14:37: Job execution switched to status SCHEDULED
>>>>>>>>> 08/12/2014 20:14:37: DataSource() (1/1) switched to SCHEDULED
>>>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
>>>>>>>>> 08/12/2014 20:14:37: DataSource() (1/1) switched to ASSIGNED
>>>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
>>>>>>>>> 08/12/2014 20:14:37: DataSource() (1/1) switched to READY
>>>>>>>>> 08/12/2014 20:14:37: DataSource() (1/1) switched to STARTING
>>>>>>>>> 08/12/2014 20:14:37: Job execution switched to status RUNNING
>>>>>>>>> 08/12/2014 20:14:37: DataSource() (1/1) switched to RUNNING
>>>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
>>>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
>>>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
>>>>>>>>> 08/12/2014 20:14:38: DataSource() (1/1) switched to FINISHING
>>>>>>>>> 08/12/2014 20:14:38: DataSource() (1/1) switched to CANCELING
>>>>>>>>> 08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
>>>>>>>>> java.lang.RuntimeException: Cannot serialize record with out field at position: 0
>>>>>>>>>   at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
>>>>>>>>>   at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
>>>>>>>>>   at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
>>>>>>>>>   at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
>>>>>>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>>>>>>
>>>>>>>>> I suspect there is no record inside model, because if I enable
>>>>>>>>> the map function in the second-to-last line I get an
>>>>>>>>> IndexOutOfBounds exception at index 0.
>>>>>>>>>
>>>>>>>>> Many thanks in advance
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Max
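
The point at the top of this thread (the "first level" iteration code runs once to assemble the operations, and only the functions inside those operations run per superstep) can be illustrated with a toy model in plain Scala. This is only a sketch and does not use Flink at all: `PlanVsRuntime`, `buildPlan`, `run`, and `Step` are hypothetical names invented here, and the superstep number is simply passed in as an argument rather than read from an iteration runtime context.

```scala
// Toy model (NOT the Flink API): a "plan" is a closure that is built once
// and then executed once per superstep by a stand-in runtime.
object PlanVsRuntime {
  // Counts how often the plan-building ("first level") code runs.
  var planBuilds = 0

  // Stand-in for a rich function: it is handed the superstep number at runtime.
  type Step = (Int, Vector[Long]) => Vector[Long]

  // "First level" iteration code: runs once and only describes the work.
  def buildPlan(): Step = {
    planBuilds += 1
    (superstep, data) => {
      // This body is what executes in every superstep.
      println("Iteration #" + superstep)
      data.map(_ + 1)
    }
  }

  // Stand-in for the runtime: applies the already-built step repeatedly.
  def run(step: Step, input: Vector[Long], maxIterations: Int): Vector[Long] =
    (1 to maxIterations).foldLeft(input)((data, i) => step(i, data))

  def main(args: Array[String]): Unit = {
    val step = buildPlan()
    val result = run(step, Vector(0L, 10L), 3)
    println(result)     // prints "Vector(3, 13)"
    println(planBuilds) // prints "1": the plan was assembled only once
  }
}
```

Running `main` prints the iteration line three times while `planBuilds` stays at 1, which mirrors why a counter or `println` placed directly in the step function body would not see the current superstep, whereas code inside a rich function would.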