flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: How to maintain the state of a variable in a map transformation.
Date Mon, 13 Jun 2016 09:54:05 GMT
Hi Ravikumar,

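In short: No, you can't use closures to maintain a global state. Each
parallel instance works on its own copy of the function, so a variable
captured in a closure is never shared between instances. If you need a
truly global state, you'll have to use parallelism 1 or keep that state
in an external data store.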

Is it possible to break up your global state into a set of local
states which can be combined in the end? That way, you can take
advantage of distributed parallel processing.

Cheers,
Max
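
A minimal sketch of the "local states combined in the end" idea, written in plain Java rather than against the Flink API. It is not from the thread and it assumes a one-feature ordinary least-squares fit, whose coefficients can be recovered from a handful of sums; a per-record update scheme such as gradient descent would not decompose this way. The names LocalStateSketch, Stats, accumulate and fit are hypothetical. In a Flink DataSet program, the per-partition step would roughly correspond to mapPartition and the combine step to reduce.

```java
import java.util.Arrays;
import java.util.List;

public class LocalStateSketch {

    /** Local state of one parallel instance: sufficient statistics for the fit. */
    static final class Stats {
        long n;
        double sumX, sumY, sumXY, sumXX;

        /** Folds one record (x, y) into this local state. */
        Stats add(double x, double y) {
            n++;
            sumX += x;
            sumY += y;
            sumXY += x * y;
            sumXX += x * x;
            return this;
        }

        /** Combines two local states into a new one; the order does not matter. */
        static Stats merge(Stats a, Stats b) {
            Stats r = new Stats();
            r.n = a.n + b.n;
            r.sumX = a.sumX + b.sumX;
            r.sumY = a.sumY + b.sumY;
            r.sumXY = a.sumXY + b.sumXY;
            r.sumXX = a.sumXX + b.sumXX;
            return r;
        }

        /** Returns {intercept, slope} of the least-squares line. */
        double[] coefficients() {
            double denom = n * sumXX - sumX * sumX;
            double slope = (n * sumXY - sumX * sumY) / denom;
            double intercept = (sumY - slope * sumX) / n;
            return new double[] {intercept, slope};
        }
    }

    /** What one parallel instance would do with its own partition of records. */
    static Stats accumulate(double[][] partition) {
        Stats s = new Stats();
        for (double[] rec : partition) {
            s.add(rec[0], rec[1]);
        }
        return s;
    }

    /** Builds one local state per partition, then merges them once at the end. */
    static double[] fit(List<double[][]> partitions) {
        return partitions.stream()
                .map(LocalStateSketch::accumulate)
                .reduce(new Stats(), Stats::merge)
                .coefficients();
    }

    public static void main(String[] args) {
        // Records of the form {x, y} lying on y = 1 + 2x, split over two partitions.
        double[][] p1 = {{0, 1}, {1, 3}};
        double[][] p2 = {{2, 5}, {3, 7}};
        System.out.println(Arrays.toString(fit(Arrays.asList(p1, p2))));
    }
}
```

Because merge only adds sums, the result does not depend on how the records are split across partitions, which is what lets each instance keep its own state without talking to the others.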

On Fri, Jun 10, 2016 at 8:28 AM, Ravikumar Hawaldar
<ravikumar.hawaldar@gmail.com> wrote:
> Hi Fabian, thank you for your help.
>
> I want my Flink application to be distributed, and I also need to be able
> to update a variable [the coefficients of a LinearRegression].
>
> How would you do that in this case?
>
> The problem with iterations is that they expect a DataSet of the same type
> to be fed back, and my variable is just a double[]. Otherwise I would have
> to map every record to a Tuple2 that wraps the double[] and then try
> iterations, but I am sure this won't work either.
>
> Can I use closures or lambdas to maintain global state?
>
>
> Regards,
> Ravikumar
>
> On 9 June 2016 at 20:17, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>> Hi,
>>
>> 1) Yes, that is correct. If you set the parallelism of an operator to 1, it
>> is only executed on a single node. Whether you need a global state or
>> whether multiple local states are OK depends on your application.
>> 2) Flink programs follow the concept of a data flow. There is no
>> communication between parallel instances of a task, i.e., the four tasks of
>> a MapOperator with parallelism 4 cannot talk to each other. You might want
>> to take a look at Flink's iteration operators. With these you can feed data
>> back into a previous operator [1].
>> 4) Yes, that should work.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
>>
>> 2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar
>> <ravikumar.hawaldar@gmail.com>:
>>>
>>> Hi Fabian, thank you for your answers.
>>>
>>> 1) If there is only a single instance of that function, it defeats the
>>> purpose of distributed execution; correct me if I am wrong. So if I run
>>> with parallelism 1 on a cluster, does that mean it will execute on only
>>> one node?
>>>
>>> 2) I mean to say, when a map operator returns a variable, is there any
>>> other function which takes that updated variable and returns that to all
>>> instances of map?
>>>
>>> 3) Question Cleared.
>>>
>>> 4) My question was whether I can use the same ExecutionEnvironment for all
>>> Flink programs in a module.
>>>
>>> 5) Question Cleared.
>>>
>>>
>>> Regards
>>> Ravikumar
>>>
>>>
>>>
>>> On 9 June 2016 at 17:58, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>
>>>> Hi Ravikumar,
>>>>
>>>> I'll try to answer your questions:
>>>> 1) If you set the parallelism of a map function to 1, there will be only
>>>> a single instance of that function, regardless of whether it is executed
>>>> locally or remotely in a cluster.
>>>> 2) Flink also supports aggregations (reduce, groupReduce, combine,
>>>> ...). However, I do not see how this would help with a stateful map
>>>> function.
>>>> 3) In Flink DataSet programs you usually construct the complete program
>>>> and call execute() after you have defined your sinks. There are two
>>>> exceptions: print() and collect(), which both add special sinks and
>>>> immediately execute your program. print() prints the result to the stdout
>>>> of the submitting client, and collect() fetches a DataSet as a collection.
>>>> 4) I am not sure I understood your question. When you obtain an
>>>> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment(),
>>>> the type of the returned environment depends on the context in which the
>>>> program is executed. It can be a local environment if the program is
>>>> executed from within an IDE, or a remote environment if the program is
>>>> executed via the CLI client and shipped to a remote cluster.
>>>> 5) A map operator processes records one after the other, i.e., as a
>>>> sequence. If you need a certain order, you can call DataSet.sortPartition()
>>>> to locally sort the partition.
>>>>
>>>> Hope that helps,
>>>> Fabian
>>>>
>>>> 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar
>>>> <ravikumar.hawaldar@gmail.com>:
>>>>>
>>>>> Hi Till, thank you for your answer. I have a couple of questions:
>>>>>
>>>>> 1) Setting the parallelism on a single map function is fine locally, but
>>>>> will it work the same way in a distributed setup as in local execution?
>>>>>
>>>>> 2) Is there any other way apart from setting the parallelism? Something
>>>>> like Spark's aggregate function?
>>>>>
>>>>> 3) Is it necessary to call the execute function after the transformations?
>>>>> Or does execution start as soon as it encounters an action (similar to
>>>>> Spark)?
>>>>>
>>>>> 4) Can I create a global execution environment (either local or
>>>>> distributed) for different Flink programs in a module?
>>>>>
>>>>> 5) How can I make the records arrive in sequence for a map or any other
>>>>> operator?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Ravikumar
>>>>>
>>>>>
>>>>> On 8 June 2016 at 21:14, Till Rohrmann <trohrmann@apache.org> wrote:
>>>>>>
>>>>>> Hi Ravikumar,
>>>>>>
>>>>>> Flink's operators are stateful. So you can simply create a variable in
>>>>>> your mapper to keep the state around. But every mapper instance will
>>>>>> have its own state. This state is determined by the records which are
>>>>>> sent to this mapper instance. If you need a global state, then you have
>>>>>> to set the parallelism to 1.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar
>>>>>> <ravikumar.hawaldar@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a DataSet<UserDefinedType>, where each element is roughly a
>>>>>>> record in a data set or a file.
>>>>>>>
>>>>>>> I am using a map transformation on this DataSet to compute a variable
>>>>>>> (the coefficients of a linear regression; the data structure used is
>>>>>>> a double[]).
>>>>>>>
>>>>>>> The issue is that the variable gets updated for every record, and I am
>>>>>>> struggling to maintain the state of this variable for the next record.
>>>>>>>
>>>>>>> Put simply, for the first record the variable values will be 0.0.
>>>>>>> After the first record the variable gets updated, and I have to pass
>>>>>>> this updated variable on to the second record, and so on for all
>>>>>>> records in the DataSet.
>>>>>>>
>>>>>>> Any suggestions on how to maintain the state of a variable?
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ravikumar
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
