flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Read once input data?
Date Tue, 16 Feb 2016 20:47:39 GMT
You can use so-called BroadcastSets to send any sufficiently small DataSet
(such as a computed average) to any other function and use it there.
However, in your case you'll end up with a data flow that branches (at the
source) and merges again (when the average is sent to the second map).
Such patterns can cause deadlocks and therefore cannot be pipelined, which
means that the data before the branch is written to disk and read again.
In your case, it might even be better to read the data twice instead of
reading it, writing it to disk, and reading it again.

Fabian
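
A minimal plain-Java sketch of the two phases discussed in this thread. It uses only the JDK and no Flink classes. Phase 1 computes the average with a single map and reduce over (sum, count) pairs. Phase 2 applies a second map that needs that average. In the DataSet API, the one-element average DataSet would typically be attached to the second map with withBroadcastSet(avgSet, "avg") and read inside a RichMapFunction through getRuntimeContext().getBroadcastVariable("avg"). The names avgSet, "avg", BroadcastAverageSketch, SumCount, average, and center are hypothetical. The sketch does not model the branch-and-merge materialization cost described above.

```java
import java.util.Arrays;
import java.util.List;

public class BroadcastAverageSketch {
    // Partial aggregate that a reduce step can combine in any order.
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        SumCount combine(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }
    }

    // Phase 1 (map + reduce): turn each value into (value, 1) and fold the pairs.
    static double average(List<Short> values) {
        SumCount total = values.stream()
                .map(v -> new SumCount(v, 1))
                .reduce(new SumCount(0, 0), SumCount::combine);
        return (double) total.sum / total.count;
    }

    // Phase 2 (second map): needs the phase-1 result. In Flink the average would be
    // read from a broadcast variable inside the map function, not passed as an argument.
    static double[] center(List<Short> values, double avg) {
        return values.stream().mapToDouble(v -> v - avg).toArray();
    }

    public static void main(String[] args) {
        List<Short> data = List.of((short) 2, (short) 4, (short) 6, (short) 8);
        double avg = average(data);
        System.out.println(avg + " " + Arrays.toString(center(data, avg)));
    }
}
```

Using (sum, count) pairs rather than averaging partial averages keeps the reduce associative. That property is what a distributed ReduceFunction relies on.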

2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:

> I looked at the samples and I think what you meant is clear, but I didn't
> find a solution for my need. In my case, I want to use the result of the
> first map operation before I can apply the second map on the *same* data
> set. For simplicity, let's say I have a bunch of short values as my data
> set. I need to find their average, so I use a map and a reduce. Then I
> want to map these short values with another function, but it needs the
> average computed at the beginning to work correctly.
>
> Is this possible without doing multiple reads of the input data to create
> the same dataset?
>
> Thank you,
> saliya
>
> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Yes, if you implement both maps in a single job, data is read once.
>>
>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
>>
>>> Fabian,
>>>
>>> I have a quick follow-up question on what you suggested. When streaming
>>> the same data through different maps, were you implying that everything
>>> goes into a single job in Flink, so the data is read only once?
>>>
>>> Thanks,
>>> Saliya
>>>
>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> It is not possible to "pin" data sets in memory, yet.
>>>> However, you can stream the same data set through two different mappers
>>>> at the same time.
>>>>
>>>> For instance you can have a job like:
>>>>
>>>>                  /---> Map 1 --> Sink1
>>>> Source --<
>>>>                  \---> Map 2 --> Sink2
>>>>
>>>> and execute it at once.
>>>> For that, you define your data flow and call execute() once after all
>>>> sinks have been created.
>>>>
>>>> Best, Fabian
>>>>
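
The following is a plain-Java model of the two execution shapes in this thread. It uses no Flink classes and is meant to illustrate why a single job reads the input only once. In the first shape, separate jobs each re-open the source, as observed with reduce() and count(). In the second, one job's plan branches after the source and feeds each record to both maps. CountingSource is a hypothetical stand-in for an InputFormat that only counts how often open() is called. It does not model Flink's scheduling or pipelining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ReadOnceSketch {
    // Hypothetical stand-in for an expensive InputFormat; it only counts open() calls.
    static final class CountingSource {
        private final List<Short> records;
        int opens = 0;

        CountingSource(List<Short> records) {
            this.records = records;
        }

        // One call models open() followed by a full pass of nextRecord().
        List<Short> open() {
            opens++;
            return records;
        }
    }

    // Separate jobs: each branch triggers its own pass over the source.
    @SafeVarargs
    static void runAsSeparateJobs(CountingSource src, Consumer<Short>... branches) {
        for (Consumer<Short> branch : branches) {
            for (Short r : src.open()) {
                branch.accept(r);
            }
        }
    }

    // One job with a branching plan: a single pass, each record handed to every branch.
    @SafeVarargs
    static void runAsOneJob(CountingSource src, Consumer<Short>... branches) {
        for (Short r : src.open()) {
            for (Consumer<Short> branch : branches) {
                branch.accept(r);
            }
        }
    }

    public static void main(String[] args) {
        List<Short> data = List.of((short) 1, (short) 2, (short) 3);
        List<Integer> sink1 = new ArrayList<>();
        List<Integer> sink2 = new ArrayList<>();

        CountingSource separate = new CountingSource(data);
        runAsSeparateJobs(separate, r -> sink1.add(r * 10), r -> sink2.add(-r));

        CountingSource single = new CountingSource(data);
        runAsOneJob(single, r -> sink1.add(r * 10), r -> sink2.add(-r));

        System.out.println("separate jobs: " + separate.opens + " opens; one job: " + single.opens);
    }
}
```

In Flink itself, the single-job shape corresponds to defining both Map --> Sink branches on the same DataSet and calling execute() once after all sinks exist.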
>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
>>>>
>>>>> Fabian,
>>>>>
>>>>> count() was just an example. What I would like to do is, say, run two
>>>>> map operations on the dataset (ds). Each map will have its own reduction,
>>>>> so is there a way to avoid creating two jobs for such a scenario?
>>>>>
>>>>> The reason is that reading these binary matrices is expensive. In our
>>>>> current MPI implementation, I am using memory maps for faster loading and
>>>>> reuse.
>>>>>
>>>>> Thank you,
>>>>> Saliya
>>>>>
>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhueske@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>> DataSet.count() triggers the execution of a new job. If you have an
>>>>>> execute() call in your program, this will lead to two Flink jobs being
>>>>>> executed.
>>>>>> It is not possible to share state among these jobs.
>>>>>>
>>>>>> Maybe you should add a custom count implementation (using a
>>>>>> ReduceFunction) which is executed in the same program as the other
>>>>>> ReduceFunction.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>>
>>>>>>
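
Fabian's custom-count suggestion amounts to mapping every record to 1 and summing with a reduce. The count then becomes an ordinary operator in the same job instead of a separate job triggered by DataSet.count(). The sketch below shows that shape in plain Java using only the JDK; it is not Flink code. ADD_COUNTS plays the role a ReduceFunction<Long> would play, and countByReduce is a hypothetical helper name.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class ReduceCountSketch {
    // Same role as a ReduceFunction<Long>: combine two partial counts into one.
    static final BinaryOperator<Long> ADD_COUNTS = Long::sum;

    // Hypothetical helper: map every record to 1L, then reduce the ones by addition.
    static <T> long countByReduce(List<T> records) {
        Function<T, Long> toOne = r -> 1L;
        return records.stream().map(toOne).reduce(0L, ADD_COUNTS);
    }

    public static void main(String[] args) {
        // Rows of a small Short matrix, mirroring the Short[] records in the thread.
        List<Short[]> rows = List.of(
                new Short[] {1, 2},
                new Short[] {3, 4},
                new Short[] {5, 6});
        System.out.println("rows: " + countByReduce(rows));
    }
}
```

In a Flink program, the resulting one-element count would still need a sink, with execute() called once after both reductions and their sinks are defined.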
>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esaliya@gmail.com>:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>> particular InputFormat. Is it possible to avoid this, possibly using
>>>>>>> some caching technique in Flink?
>>>>>>>
>>>>>>> For example, I have some code like the snippet below, and I see that
>>>>>>> the above methods in the input format get called for both of the last
>>>>>>> two statements (reduce() and count()). By the way, this is a custom
>>>>>>> input format I wrote to represent a binary matrix stored as Short
>>>>>>> values.
>>>>>>>
>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>
>>>>>>> DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>
>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>>>
>>>>>>> op.reduce(...);
>>>>>>>
>>>>>>> op.count();
>>>>>>>
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Saliya
>>>>>>> --
>>>>>>> Saliya Ekanayake
>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>> Indiana University, Bloomington
>>>>>>> Cell 812-391-4914
>>>>>>> http://saliya.org
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
