flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Iterative Algorithm
Date Mon, 29 Sep 2014 07:51:21 GMT
Hi,
yes, you can. I modified the ConnectedComponents example to print the
iteration number inside one of the join functions:

    // open a delta iteration
    val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
      (s, ws) =>

        // apply the step logic: join with the edges
        val allNeighbors = ws.join(edges).where(0).equalTo(0) (
          new RichJoinFunction[(Long, Long), (Long, Long), (Long, Long)] {
            override def join(vertex: (Long, Long), edge: (Long, Long)): (Long, Long) = {
              val context = getIterationRuntimeContext
              println("Iteration #" + context.getSuperstepNumber)
              (edge._2, vertex._2)

            }
          })

        // select the minimum neighbor
        val minNeighbors = allNeighbors.groupBy(0).min(1)

        // update if the component of the candidate is smaller
        val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
          (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
            if (newVertex._2 < oldVertex._2) out.collect(newVertex)
        }

        // delta and new workset are identical
        (updatedComponents, updatedComponents)
    }

Unfortunately, this requires a RichFunction instead of a lambda, because
only rich functions have access to getIterationRuntimeContext.
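
In case it helps to see what the superstep number counts, here is a minimal
plain-Scala sketch (no Flink involved) that mimics the step logic above on
ordinary collections. The object name DeltaIterationSketch and the method run
are hypothetical and exist only for this illustration; the real iteration is
executed by the runtime and may differ in detail. The sketch assumes the
counter starts at 1 for the first superstep, which is how getSuperstepNumber
is documented.

```scala
// Plain-Scala simulation of a delta iteration for connected components.
// DeltaIterationSketch and run are hypothetical names, not part of any Flink API.
object DeltaIterationSketch {

  /** Returns the final (vertexId -> componentId) map and the number of supersteps run. */
  def run(
      vertices: Seq[(Long, Long)], // (vertexId, componentId)
      edges: Seq[(Long, Long)],    // directed (source, target); list both directions for undirected graphs
      maxIterations: Int): (Map[Long, Long], Int) = {

    var solution: Map[Long, Long] = vertices.toMap
    var workset: Seq[(Long, Long)] = vertices
    var superstep = 0

    // Stop when the workset is empty or the iteration limit is reached.
    while (workset.nonEmpty && superstep < maxIterations) {
      superstep += 1 // assumed to match getSuperstepNumber: first superstep is 1

      // Join the workset with the edges: offer each vertex's component to its neighbors.
      val candidates = for {
        (vertexId, component) <- workset
        (source, target) <- edges
        if source == vertexId
      } yield (target, component)

      // Select the minimum candidate component per target vertex.
      val minCandidates = candidates
        .groupBy(_._1)
        .map { case (id, cs) => (id, cs.map(_._2).min) }

      // Keep only candidates that are smaller than the current component.
      val updates = minCandidates
        .filter { case (id, c) => solution.get(id).exists(c < _) }
        .toSeq

      // Delta and new workset are identical, as in the example above.
      solution = solution ++ updates
      workset = updates
    }
    (solution, superstep)
  }
}
```

For a chain 1-2-3 plus an isolated vertex 4 (edges listed in both directions),
run returns one component for vertices 1 to 3 and stops once a superstep
produces no more updates.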

Cheers,
Aljoscha

On Sat, Sep 27, 2014 at 11:09 AM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Ok. I'm back at this point:
>
> In version 0.7, is there a way to get the superstep number inside an
> iterateWithDelta function?
>
> Cheers,
> Max
>
> On Mon, Aug 18, 2014 at 12:05 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Yes, but they were always available, because user code in Java was always
>> in "Rich Functions". There is no rich function for iterations, though,
>> since iterations themselves don't have user code attached.
>>
>> Aljoscha
>>
>>
>> On Mon, Aug 18, 2014 at 10:59 AM, Fabian Hueske <fhueske@apache.org>
>> wrote:
>>
>>> RichFunctions were added to the JavaAPI recently:
>>>
>>>
>>> https://github.com/apache/incubator-flink/tree/72d7b86274c33d1570ffb22b1fca2081c15d753c/flink-java/src/main/java/org/apache/flink/api/java/functions
>>>
>>> Cheers, Fabian
>>>
>>>
>>> 2014-08-18 8:16 GMT+02:00 Aljoscha Krettek <aljoscha@apache.org>:
>>>
>>>> Hi,
>>>> there is no RichFunction in the Java API either. You don't have to
>>>> create a new DataSet. Your iteration result will be a DataSet that results
>>>> from some operations based on the previous SolutionSet and/or WorkingSet.
>>>> For example:
>>>>
>>>> def stepFunction(s: DataSet[SolutionType], ws: DataSet[WorksetType]) = {
>>>>   val intermediate = ws.join(somethingFromOutside) where {...} isEqualTo {...} map {...}
>>>>   val newSolution = s.join(intermediate) where ...
>>>>   val newWorkset = ...
>>>>   (newSolution, newWorkset)
>>>> }
>>>>
>>>> Aljoscha
>>>>
>>>>
>>>> On Sun, Aug 17, 2014 at 6:14 PM, Maximilian Alber <
>>>> alber.maximilian@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>> Thank you!
>>>>>
>>>>> But how do I join my result to the solution set if I cannot create a
>>>>> new DataSet inside the iteration?
>>>>> In Scala there is not yet a RichFunction for the Iterations, am I
>>>>> right? So I should best use the Java class?
>>>>>
>>>>> Kind regards,
>>>>> Max!
>>>>>
>>>>>
>>>>> On Fri, Aug 15, 2014 at 3:50 PM, Aljoscha Krettek <aljoscha@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> right now, the only way of updating the solution set in a delta
>>>>>> iteration is by joining with the solution set from the previous iteration
>>>>>> and having the result of that join as the result of the step function. I
>>>>>> am working on simplifying iterations as well as bringing the Scala API to
>>>>>> feature parity with the Java API. It should not be possible right now to
>>>>>> create a new data source inside each iteration step.
>>>>>>
>>>>>> The way to get at the current iteration number is by having a rich
>>>>>> function instead of a lambda function. So instead of:
>>>>>> val someSet = ...
>>>>>> val otherSet = someSet map { x => x + 1}
>>>>>>
>>>>>> you would have:
>>>>>> val someSet = ...
>>>>>> val otherSet = someSet map( new MapFunction[InType, OutType]() {
>>>>>>   def apply(in: InType): OutType = {
>>>>>>     val iteration = getIterationRuntimeContext().getSuperstepNumber()
>>>>>>     (iteration, x, y, ...)
>>>>>>   }
>>>>>> })
>>>>>>
>>>>>> I hope that helps.
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 12, 2014 at 8:21 PM, Maximilian Alber <
>>>>>> alber.maximilian@gmail.com> wrote:
>>>>>>
>>>>>>> Hi everybody,
>>>>>>>
>>>>>>> as already stated, I am currently trying to implement a machine learning
>>>>>>> algorithm on Stratosphere for the ML group at TU Berlin. I ran into some
>>>>>>> issues.
>>>>>>>
>>>>>>> The basic scheme of my algorithm is:
>>>>>>>
>>>>>>> X = input data
>>>>>>> Y = input data
>>>>>>> residuals = Y
>>>>>>>
>>>>>>> model = array[float, float, float] size n
>>>>>>>
>>>>>>> for i in 1:n
>>>>>>>   a = calc_a(X, residuals)
>>>>>>>   b = calc_b(X, a, residuals)
>>>>>>>   c = calc_c(X, a, b, c, residuals)
>>>>>>>
>>>>>>>   model(i) = (a, b, c)
>>>>>>>   residuals = update_residuals(residuals, a, b, c)
>>>>>>>
>>>>>>> output model
>>>>>>>
>>>>>>> My attempt now would be to use delta iterations, with the model
>>>>>>> as the solution set and the residuals as the working set:
>>>>>>>
>>>>>>> Code:
>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>  val X = getInputSource
>>>>>>> val Y = DataSource(YFile, CsvInputFormat[Float])
>>>>>>>
>>>>>>> val model = CollectionDataSource[(Int, Float, Float, Float)](List())
>>>>>>> val residual = Y
>>>>>>>
>>>>>>> def step_function(model: DataSet[(Int, Float, Float, Float)],
>>>>>>> residuals: DataSet[Float]) = {
>>>>>>> import util.Random
>>>>>>> (CollectionDataSource(Seq(new Tuple4(Random.nextInt, 1.0f, 1.0f,
>>>>>>> 2.0f))), residuals)
>>>>>>> }
>>>>>>>
>>>>>>> model.iterateWithDelta(
>>>>>>> residual,
>>>>>>> { x: (Int, Float, Float, Float) => x._1 },
>>>>>>> step_function,
>>>>>>> config.iterations
>>>>>>> )
>>>>>>>
>>>>>>> val output = model //map { x => println(x); x }
>>>>>>> val sink = output.write(outFile, CsvOutputFormat[(Int, Float, Float, Float)], "Model output")
>>>>>>>
>>>>>>> Code End
>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>>
>>>>>>> At the moment I am just trying to output a list of tuples.
>>>>>>>
>>>>>>> My problems are:
>>>>>>> - instead of the random integer I would like to insert the index of
>>>>>>> the iterations.
>>>>>>> - I get this error:
>>>>>>> 08/12/2014 20:14:37: Job execution switched to status SCHEDULED
>>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to SCHEDULED
>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to SCHEDULED
>>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to ASSIGNED
>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to ASSIGNED
>>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to READY
>>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to STARTING
>>>>>>> 08/12/2014 20:14:37: Job execution switched to status RUNNING
>>>>>>> 08/12/2014 20:14:37: DataSource(<Unnamed Collection Data Source>) (1/1) switched to RUNNING
>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to READY
>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to STARTING
>>>>>>> 08/12/2014 20:14:37: DataSink(Model output) (1/1) switched to RUNNING
>>>>>>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to FINISHING
>>>>>>> 08/12/2014 20:14:38: DataSource(<Unnamed Collection Data Source>) (1/1) switched to CANCELING
>>>>>>> 08/12/2014 20:14:38: DataSink(Model output) (1/1) switched to FAILED
>>>>>>> java.lang.RuntimeException: Cannot serialize record with out field at position: 0
>>>>>>> at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:295)
>>>>>>> at eu.stratosphere.api.java.record.io.CsvOutputFormat.writeRecord(CsvOutputFormat.java:50)
>>>>>>> at eu.stratosphere.pact.runtime.task.DataSinkTask.invoke(DataSinkTask.java:178)
>>>>>>> at eu.stratosphere.nephele.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:284)
>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>
>>>>>>> I suspect there is no record inside model, because if I enable the map
>>>>>>> function in the second-to-last line I get an IndexOutOfBounds exception at
>>>>>>> index 0.
>>>>>>>
>>>>>>> Many thanks in advance
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
