flink-user mailing list archives

From Ravikumar Hawaldar <ravikumar.hawal...@gmail.com>
Subject Re: How to maintain the state of a variable in a map transformation.
Date Fri, 10 Jun 2016 06:28:39 GMT
Hi Fabian,  Thank you for your help.

I want my Flink application to be distributed, and I also want to be
able to update the variable [Coefficients of

How would you do it in that case?

The problem with iteration is that it expects a DataSet of the same type to be
fed back, and my variable is just a double[]. Otherwise I have to map every
record to a Tuple2 with the double[] wrapped inside and then try out iterations,
but I am sure this won't work either.

Can I use a closure or lambdas to maintain global state?
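
For reference, the iteration idea discussed above can be sketched without feeding the records themselves back: only the double[] is the iterated value, and the records stay fixed. The following is a minimal plain-Java sketch with no Flink dependency. The class and method names (IterationSketch, partialGradient, train), the model y = w0 + w1 * x, and the learning rate are all assumptions made for illustration. In Flink's DataSet API the analogous pieces would presumably be a bulk iteration over a one-element DataSet<double[]> whose current value is broadcast to the mappers, but that mapping is not shown here.

```java
import java.util.Arrays;

public class IterationSketch {

    // "Map" step: one parallel instance computes the gradient contribution
    // of its own partition, using the current coefficients w (read-only).
    // Each record is {x, y}; the model is y = w[0] + w[1] * x (assumption).
    public static double[] partialGradient(double[][] partition, double[] w) {
        double[] g = new double[2];
        for (double[] p : partition) {
            double err = (w[0] + w[1] * p[0]) - p[1];
            g[0] += err;
            g[1] += err * p[0];
        }
        return g;
    }

    // Iteration: the coefficients are the only value fed back each round.
    public static double[] train(double[][][] partitions, int rounds, double rate) {
        double[] w = {0.0, 0.0};
        int n = 0;
        for (double[][] part : partitions) {
            n += part.length;
        }
        for (int i = 0; i < rounds; i++) {
            double[] sum = new double[2];
            for (double[][] part : partitions) {
                double[] g = partialGradient(part, w);
                sum[0] += g[0];   // "reduce" step: add up the partial gradients
                sum[1] += g[1];
            }
            // New coefficients replace the old ones for the next round.
            w = new double[] {w[0] - rate * sum[0] / n, w[1] - rate * sum[1] / n};
        }
        return w;
    }

    public static void main(String[] args) {
        // Two "partitions" of records generated from y = 1 + 2x.
        double[][][] partitions = {
            {{0, 1}, {1, 3}},
            {{2, 5}, {3, 7}}
        };
        double[] w = train(partitions, 5000, 0.1);
        System.out.println(Arrays.toString(w)); // values close to 1 and 2
    }
}
```

The point of this layout is that no parallel instance ever writes to shared state: each round reads the same coefficients, and the only "global" update happens once per round after the partial results are combined.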


On 9 June 2016 at 20:17, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi,
> 1) Yes, that is correct. If you set the parallelism of an operator to 1 it
> is only executed on a single node. Whether you need a global state or
> whether multiple local states are OK depends on your application.
> 2) Flink programs follow the concept of a data flow. There is no
> communication between parallel instances of a task, i.e., all four tasks of
> a MapOperator with parallelism 4 cannot talk to each other. You might want
> to take a look at Flink's iteration operators. With these you can feed data
> back into a previous operator [1].
> 4) Yes, that should work.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
> 2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar <
> ravikumar.hawaldar@gmail.com>:
>> Hi Fabian, Thank you for your answers.
>> 1) If there is only a single instance of that function, then it will defeat
>> the purpose of distributed execution; correct me if I am wrong. So if I run
>> with parallelism 1 on a cluster, does that mean it will execute on only one
>> node?
>> 2) I mean to say, when a map operator returns a variable, is there any
>> other function which takes that updated variable and returns that to all
>> instances of map?
>> 3) Question Cleared.
>> 4) My question was: can I use the same ExecutionEnvironment for all Flink
>> programs in a module?
>> 5) Question Cleared.
>> Regards
>> Ravikumar
>> On 9 June 2016 at 17:58, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi Ravikumar,
>>> I'll try to answer your questions:
>>> 1) If you set the parallelism of a map function to 1, there will be only
>>> a single instance of that function regardless of whether it is executed
>>> locally or remotely in a cluster.
>>> 2) Flink also supports aggregations (reduce, groupReduce, combine,
>>> ...). However, I do not see how this would help with a stateful map
>>> function.
>>> 3) In Flink DataSet programs you usually construct the complete program
>>> and call execute() after you have defined your sinks. There are two
>>> exceptions: print() and collect() which both add special sinks and
>>> immediately execute your program. print() prints the result to the stdout
>>> of the submitting client and collect() fetches a dataset as a collection.
>>> 4) I am not sure I understood your question. When you obtain an
>>> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment()
>>> the type of the returned environment depends on the context in which the
>>> program was executed. It can be a local environment if it is executed from
>>> within an IDE or a remote execution environment if the program is executed
>>> via the CLI client and shipped to a remote cluster.
>>> 5) A map operator processes records one after the other, i.e., as a
>>> sequence. If you need a certain order, you can call DataSet.sortPartition()
>>> to locally sort the partition.
>>> Hope that helps,
>>> Fabian
>>> 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar <
>>> ravikumar.hawaldar@gmail.com>:
>>>> Hi Till, Thank you for your answer. I have a couple of questions:
>>>> 1) Setting parallelism on a single map function locally is fine, but in
>>>> distributed mode will it work the same as in local execution?
>>>> 2) Is there any other way apart from setting parallelism? Like Spark's
>>>> aggregate function?
>>>> 3) Is it necessary to call the execute function after the transformations?
>>>> Or does execution start as soon as it encounters an action (similar to Spark)?
>>>> 4) Can I create a global execution environment (either local or
>>>> distributed) for different Flink programs in a module?
>>>> 5) How can I make the records arrive in sequence for a map or any other
>>>> operator?
>>>> Regards,
>>>> Ravikumar
>>>> On 8 June 2016 at 21:14, Till Rohrmann <trohrmann@apache.org> wrote:
>>>>> Hi Ravikumar,
>>>>> Flink's operators are stateful. So you can simply create a variable in
>>>>> your mapper to keep the state around. But every mapper instance will
>>>>> have its own state. This state is determined by the records which are sent
>>>>> to this mapper instance. If you need a global state, then you have to set
>>>>> parallelism to 1.
>>>>> Cheers,
>>>>> Till
>>>>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
>>>>> ravikumar.hawaldar@gmail.com> wrote:
>>>>>> Hello,
>>>>>> I have a DataSet<UserDefinedType>, where each UserDefinedType is roughly
>>>>>> a record in a DataSet or a file.
>>>>>> Now I am using a map transformation on this DataSet to compute a
>>>>>> variable (the coefficients of a linear regression; the data structure
>>>>>> used is a double[]).
>>>>>> The issue is that the variable gets updated per record, and I
>>>>>> am struggling to maintain the state of this variable for the next record.
>>>>>> Put simply, for the first record the variable values will be 0.0, and
>>>>>> after the first record the variable will get updated and I have to pass
>>>>>> the updated variable to the second record, and so on for all records
>>>>>> in the DataSet.
>>>>>> Any suggestions on how to maintain the state of a variable?
>>>>>> Regards,
>>>>>> Ravikumar
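
As a rough illustration of the suggestion made in the thread (keep a variable inside the mapper), here is a minimal plain-Java sketch with no Flink dependency. A java.util.function.Function stands in for one mapper instance. The names StatefulMapperSketch and SgdMapper, the update rule, and the learning rate are hypothetical and chosen only for this example. It shows that the field survives from one record to the next, and that a second instance starts from its own 0.0 values rather than sharing state with the first.

```java
import java.util.Arrays;
import java.util.function.Function;

public class StatefulMapperSketch {

    // Stands in for one mapper instance: the field w persists across records.
    public static class SgdMapper implements Function<double[], double[]> {
        private final double[] w = {0.0, 0.0}; // starts at 0.0 for the first record
        private final double rate;

        public SgdMapper(double rate) {
            this.rate = rate;
        }

        // Each record is {x, y}; the model is y = w[0] + w[1] * x (assumption).
        @Override
        public double[] apply(double[] p) {
            double err = (w[0] + w[1] * p[0]) - p[1];
            w[0] -= rate * err;
            w[1] -= rate * err * p[0];
            return w.clone(); // emit a copy of the updated coefficients
        }
    }

    public static void main(String[] args) {
        // One instance sees both records in order (the parallelism-1 case).
        SgdMapper single = new SgdMapper(0.5);
        single.apply(new double[] {0, 1});
        double[] afterTwo = single.apply(new double[] {1, 3});
        System.out.println(Arrays.toString(afterTwo));

        // A second instance has its own state and only sees its own records.
        SgdMapper other = new SgdMapper(0.5);
        System.out.println(Arrays.toString(other.apply(new double[] {1, 3})));
    }
}
```

With more than one instance, each one ends up with coefficients that reflect only the records routed to it, which is why a single global value requires either parallelism 1 or a separate step that combines the per-instance results.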
