flink-user mailing list archives

From Fabian Hueske <fhue...@apache.org>
Subject Re: Flink Mongodb
Date Tue, 04 Nov 2014 20:42:38 GMT
I agree. Going for more splits with smaller key regions is a good idea.
However, it might be a bit difficult to determine a good number of splits,
as the size of a split depends on its density. Splits that are too large
are prone to cause data skew; splits that are too small increase the
overhead of split assignment.
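
To illustrate this trade-off, here is a toy simulation of lazy split
assignment, where each idle worker pulls the next split. It is plain Java and
not Flink code: the class name, the fixed per-split overhead, and all numbers
are made up for illustration.

```java
import java.util.PriorityQueue;

/** Toy model of lazy split assignment; hypothetical, not Flink code. */
final class LazySplitSimulation {

    /**
     * Returns the time at which the last worker finishes when each idle
     * worker pulls the next split. Every pull costs a fixed overhead.
     */
    static long makespan(long[] splitSizes, int workers, long overheadPerSplit) {
        // Min-heap of the times at which each worker becomes idle again.
        PriorityQueue<Long> idleAt = new PriorityQueue<>();
        for (int i = 0; i < workers; i++) {
            idleAt.add(0L);
        }
        long finish = 0;
        for (long size : splitSizes) {
            long start = idleAt.poll();  // earliest idle worker takes the split
            long end = start + overheadPerSplit + size;
            idleAt.add(end);
            finish = Math.max(finish, end);
        }
        return finish;
    }

    /** Cuts every region into the given number of equally sized pieces. */
    static long[] subdivide(long[] regionSizes, int piecesPerRegion) {
        long[] splits = new long[regionSizes.length * piecesPerRegion];
        int k = 0;
        for (long region : regionSizes) {
            for (int p = 0; p < piecesPerRegion; p++) {
                splits[k++] = region / piecesPerRegion;
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        long[] regions = {800, 100, 100, 100};  // one hot region, three small ones
        for (int pieces : new int[] {1, 4, 100}) {
            long t = makespan(subdivide(regions, pieces), 4, 5);
            System.out.println(pieces + " split(s) per region -> makespan " + t);
        }
    }
}
```

With these made-up numbers (4 workers, overhead 5 per split), the makespan is
805 for one split per region, 295 for four, and 775 for one hundred: the single
large split causes skew, while very many tiny splits are dominated by
assignment overhead.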

A solution for this problem could be to add an optional parameter to the
InputFormat (IF) that gives an upper bound for the number of InputSplits.
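
A minimal sketch of what such an upper bound could look like. This is not
Flink's or HBase's API: BoundedSplitPlanner, KeyRange, and maxSplits are
hypothetical names, and row keys are modeled as longs instead of byte arrays to
keep the example short. Note that it cuts the key range evenly, which does not
guarantee evenly sized splits, because the density of a key range is unknown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of Flink or HBase. */
final class BoundedSplitPlanner {

    /** Half-open key range [start, end). Assumption: keys are longs. */
    static final class KeyRange {
        final long start;
        final long end;

        KeyRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Cuts every region into the same number of sub-ranges. The total stays
     * at or below maxSplits as long as maxSplits is at least the number of
     * regions; otherwise one split per region is the minimum.
     */
    static List<KeyRange> plan(List<KeyRange> regions, int maxSplits) {
        List<KeyRange> splits = new ArrayList<>();
        if (regions.isEmpty()) {
            return splits;
        }
        int perRegion = Math.max(1, maxSplits / regions.size());
        for (KeyRange r : regions) {
            long width = r.end - r.start;
            // Never create more pieces than there are keys in the range.
            int pieces = (int) Math.max(1, Math.min(perRegion, width));
            for (int i = 0; i < pieces; i++) {
                long from = r.start + width * i / pieces;
                long to = r.start + width * (i + 1) / pieces;
                splits.add(new KeyRange(from, to));
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        List<KeyRange> regions = new ArrayList<>();
        regions.add(new KeyRange(0, 1000));
        regions.add(new KeyRange(1000, 1300));
        regions.add(new KeyRange(1300, 5000));
        // Three regions and a bound of 10 give three splits per region.
        System.out.println(plan(regions, 10));
    }
}
```

Combined with lazy assignment, several splits per region let workers that
finish a sparse sub-range early pick up further splits, while the bound keeps
the assignment overhead in check.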

2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Typo: it should have meant that workers that get a larger split will get
> fewer additional splits.
> On 04.11.2014 at 20:48, sewen@apache.org wrote:
>
> InputSplits are assigned lazily at runtime, which gives you many of the
> benefits of re-assigning without the nastiness.
>
> Can you write the logic that creates the splits such that it creates
> multiple splits per region? Then the lazy assignment will make sure that
> workers that get a larger split will get get additional splits than workers
> that get smaller splits...
> On 04.11.2014 at 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>
>> Hmm, that's a good question indeed. I am not familiar with HBase's mode of
>> operation.
>> I would assume that HBase uses range partitioning to partition a table
>> into regions. That way it is rather easy to balance the size of regions, as
>> long as there is no single key that occurs very often. I am not sure if it
>> is possible to overcome data skew caused by frequent keys.
>> However, as I said, these are just assumptions. I will have a look at
>> HBase's internals for verification.
>>
>> In any case, Flink currently does not support reassigning or splitting
>> of InputSplits at runtime.
>> Also, initially generating balanced InputSplits will be tricky. That
>> would be possible if we could efficiently determine the "density" of a key
>> range when creating the InputSplits. However, I'm a bit skeptical that
>> this can be done...
>>
>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> From what I know, HBase manages the regions, but whether they are
>>> evenly distributed depends on a well-designed key.
>>> If that is not the case, you could encounter very unbalanced regions (i.e.
>>> hot spotting).
>>>
>>> Could it be a good idea to create a split policy that compares the sizes
>>> of all the splits and generates equally sized splits that can be reassigned
>>> to a free worker if the originally assigned one is still busy?
>>>
>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>> wrote:
>>>
>>>> ad 1) HBase manages the regions and should also take care of their
>>>> uniform size.
>>>> ad 2) Dynamically changing InputSplits is not possible at the moment.
>>>> However, the input split generation of the IF should also be able to handle
>>>> such issues upfront. In fact, the IF could also generate multiple splits
>>>> per region (this would be necessary to make sure that the minimum number of
>>>> splits is generated if there are fewer regions than required splits).
>>>>
>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> Ok, thanks for the explanation!
>>>>> That was more or less what I thought, but there are still
>>>>> points I'd like to clarify:
>>>>>
>>>>> 1 - What if one region is very big and the other regions are very
>>>>> small? There will be one slot that takes a very long time while the
>>>>> others stay inactive.
>>>>> 2 - Do you think it is possible to implement this in an adaptive way
>>>>> (stop processing a huge region if it is worth it and assign the remaining
>>>>> data to inactive task managers)?
>>>>>
>>>>>
>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Local split assignment preferably assigns input splits to workers that
>>>>>> can locally read the data of an input split.
>>>>>> For example, HDFS stores file chunks (blocks) distributed over the
>>>>>> cluster and gives access to these chunks to every worker via network
>>>>>> transfer. However, if a chunk is read by a process that runs on the same
>>>>>> node where the chunk is stored, the read operation directly accesses the
>>>>>> local file system without going over the network. Hence, it is essential to
>>>>>> assign input splits based on the locality of their data if you want to have
>>>>>> reasonable performance. We call this local split assignment. This is a
>>>>>> general concept of all data-parallel systems, including Hadoop, Spark, and
>>>>>> Flink.
>>>>>>
>>>>>> This issue is not related to serializability of input formats.
>>>>>> I assume that the wrapped MongoIF is also not capable of local split
>>>>>> assignment.
>>>>>>
>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>
>>>>>>> What do you mean by "might lack support for local split
>>>>>>> assignment"? Do you mean that the InputFormat is not serializable? And
>>>>>>> is this not the case for MongoDB?
>>>>>>>
>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>>>>>> wrapper.
>>>>>>>>
>>>>>>>> The HBase format should work as well, but might lack support for
>>>>>>>> local split assignment. In that case performance would suffer a lot.
>>>>>>>>
>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>
>>>>>>>>> Should I start from
>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>> ? Is it ok?
>>>>>>>>> So, in principle, the TableInputFormat of HBase could also be
>>>>>>>>> used in a similar way, couldn't it?
>>>>>>>>>
>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>> documentation.
>>>>>>>>>>
>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into the
>>>>>>>>>> new IF wrapper. :-)
>>>>>>>>>>
>>>>>>>>>> Fabian
>>>>>>>>>>
>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>>
>>>>>>>>>>> I saw this post
>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Flavio
