flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink Mongodb
Date Tue, 04 Nov 2014 23:01:27 GMT
So how are we going to proceed here? Is someone willing to help me
improve the splitting policy, or do we leave it as it is now?

On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org> wrote:

> I agree. Going for more splits with smaller key regions is a good idea.
> However, it might be a bit difficult to determine a good number of splits,
> as the size of a split depends on its density. Splits that are too large
> are prone to cause data skew; splits that are too small increase the
> overhead of split assignment.
> A solution for this problem could be to add an optional parameter to
> the IF to give an upper bound for the number of InputSplits.
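A minimal sketch of the optional upper bound suggested above, assuming each region is described by a half-open range of long keys and that cutting a key range into evenly wide pieces is acceptable. `KeyRangeSplit`, `CappedSplitter`, `splitsPerRegion` and `maxSplits` are hypothetical names for illustration, not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical split type: a half-open key range [start, end). */
final class KeyRangeSplit {
    final long start;
    final long end;

    KeyRangeSplit(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

/** Hypothetical helper, not a Flink API: cuts regions into sub-ranges under an upper bound. */
final class CappedSplitter {
    /**
     * Cuts each region {start, end} into up to splitsPerRegion contiguous sub-ranges while
     * trying to keep the total at or below maxSplits. Every region yields at least one split,
     * so with more regions than maxSplits the bound cannot be honored.
     * Assumes key ranges are small enough that (end - start) * n does not overflow a long.
     */
    static List<KeyRangeSplit> createSplits(long[][] regions, int splitsPerRegion, int maxSplits) {
        List<KeyRangeSplit> splits = new ArrayList<>();
        if (regions.length == 0) {
            return splits;
        }
        // Shrink the per-region count so that regions.length * perRegion <= maxSplits.
        int perRegion = Math.max(1, Math.min(splitsPerRegion, maxSplits / regions.length));
        for (long[] region : regions) {
            long start = region[0];
            long width = region[1] - region[0];
            // Never cut a range into more pieces than it has keys (avoids empty splits).
            int n = (int) Math.max(1, Math.min(perRegion, width));
            for (int i = 0; i < n; i++) {
                long from = start + width * i / n;
                long to = start + width * (i + 1) / n;
                splits.add(new KeyRangeSplit(from, to));
            }
        }
        return splits;
    }
}
```

With two regions, `splitsPerRegion = 8` and `maxSplits = 6`, each region is cut into three sub-ranges. The bound is only a soft limit in this sketch: with more regions than `maxSplits`, every region still yields one split.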
> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>> Typo: it should have read that workers that get a larger split will get
>> fewer additional splits.
>> On 04.11.2014 20:48, sewen@apache.org wrote:
>> InputSplits are assigned lazily at runtime, which gives you many of the
>> benefits of re-assigning without the nastiness.
>> Can you write the logic that creates the splits such that it creates
>> multiple splits per region? Then the lazy assignment will make sure that
>> workers that get a larger split will get get additional splits than workers
>> that get smaller splits...
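To illustrate the lazy (pull-based) assignment described above, here is a small simulation, assuming that the time to process a split is proportional to its size and that an idle worker always takes the next unassigned split. It is a sketch of the idea, not Flink's actual assigner; `LazyAssignmentSim` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;

/** Hypothetical simulation of lazy split assignment (not Flink code). */
final class LazyAssignmentSim {
    /**
     * Whenever a worker becomes idle it requests the next unassigned split.
     * Processing time is assumed to equal the split size.
     * Returns the number of splits each worker processed.
     */
    static int[] simulate(long[] splitSizes, int numWorkers) {
        if (numWorkers <= 0) {
            throw new IllegalArgumentException("need at least one worker");
        }
        Deque<Long> pending = new ArrayDeque<>();
        for (long s : splitSizes) {
            pending.add(s);
        }
        int[] processed = new int[numWorkers];
        // Entries are {timeWhenIdle, workerId}; the earliest-idle worker asks first.
        PriorityQueue<long[]> idle = new PriorityQueue<>(
            (a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        for (int w = 0; w < numWorkers; w++) {
            idle.add(new long[] {0L, w});
        }
        while (!pending.isEmpty()) {
            long[] next = idle.poll();
            long size = pending.poll();
            int worker = (int) next[1];
            processed[worker]++;
            // The worker is busy until it has finished this split.
            idle.add(new long[] {next[0] + size, worker});
        }
        return processed;
    }
}
```

With one split of size 10 followed by ten splits of size 1 and two workers, the worker that gets the large split processes only that one, while the other worker processes all ten small ones: the worker with the larger split gets fewer additional splits.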
>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>> Hmm, that's a good question indeed. I am not familiar with HBase's mode of
>>> operation.
>>> I would assume that HBase uses range partitioning to partition a table
>>> into regions. That way it is rather easy to balance the size of regions, as
>>> long as there is no single key that occurs very often. I am not sure if it
>>> is possible to overcome data skew caused by frequent keys.
>>> However, as I said, these are just assumptions. I will have a look at
>>> HBase's internals for verification.
>>> In any case, Flink currently does not support reassigning or splitting
>>> InputSplits at runtime.
>>> Also, initially generating balanced InputSplits will be tricky. That
>>> would be possible if we could efficiently determine the "density" of a key
>>> range when creating the InputSplits. However, I'm a bit skeptical that
>>> this can be done...
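One common way to approximate the "density" of a key range is to sample keys and use the sample's quantiles as split boundaries. This is sample-based range partitioning, a swapped-in technique rather than anything Flink or HBase is confirmed to do here. A minimal sketch, assuming a representative key sample can be drawn cheaply; `SampledBoundaries` is a hypothetical name.

```java
import java.util.Arrays;

/** Hypothetical helper: split boundaries from a key sample (sample-based range partitioning). */
final class SampledBoundaries {
    /**
     * Returns numSplits - 1 boundary keys such that each resulting key range holds roughly
     * the same number of sampled keys. Dense key regions get narrow ranges, sparse ones wide.
     * Split i covers [bounds[i - 1], bounds[i]), with open ends for the first and last split.
     */
    static long[] boundaries(long[] sampledKeys, int numSplits) {
        if (sampledKeys.length == 0 || numSplits < 1) {
            throw new IllegalArgumentException("need a non-empty sample and at least one split");
        }
        long[] sorted = sampledKeys.clone();
        Arrays.sort(sorted);
        long[] bounds = new long[numSplits - 1];
        for (int i = 1; i < numSplits; i++) {
            // Position of the i-th of numSplits quantiles in the sorted sample.
            int idx = (int) ((long) i * sorted.length / numSplits);
            bounds[i - 1] = sorted[idx];
        }
        return bounds;
    }
}
```

With the sample {0, 1, 2, 3, 4, 5, 6, 1000} and four splits, the boundaries are 2, 4 and 6, so the dense low-key region is cut into narrow ranges and the sparse tail ends up in one wide range. A single very frequent key still produces duplicate boundaries, i.e., it cannot be split, which matches the concern about frequent keys.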
>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>> From what I know, HBase manages the regions, but whether they are
>>>> evenly distributed depends on a well-designed key.
>>>> If that is not the case, you could encounter very unbalanced regions
>>>> (i.e., hot spotting).
>>>> Could it be a good idea to create a split policy that compares the sizes
>>>> of all the splits and generates equally sized splits that can be reassigned
>>>> to a free worker if the originally assigned one is still busy?
>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>> wrote:
>>>>> ad 1) HBase manages the regions and should also take care of their
>>>>> uniform size.
>>>>> ad 2) Dynamically changing InputSplits is not possible at the moment.
>>>>> However, the input split generation of the IF should also be able to
>>>>> handle such issues upfront. In fact, the IF could also generate multiple
>>>>> splits per region (this would be necessary to make sure that the minimum
>>>>> number of splits is generated if there are fewer regions than required splits).
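Generating multiple splits per region to reach a minimum split count could look like the following sketch, assuming region sizes (for example in bytes) are known when the splits are created. Extra splits go to the region whose splits are currently the largest; `MinSplitPlanner` is a hypothetical name, not a Flink API.

```java
import java.util.Arrays;

/** Hypothetical helper, not a Flink API: how many splits to cut from each region. */
final class MinSplitPlanner {
    /**
     * Gives every region one split, then hands out further splits one at a time to the
     * region whose splits are currently the largest (size / count) until at least
     * minSplits splits exist. Assumes region sizes are known when the splits are created.
     */
    static int[] splitsPerRegion(long[] regionSizes, int minSplits) {
        int[] counts = new int[regionSizes.length];
        if (regionSizes.length == 0) {
            return counts;
        }
        Arrays.fill(counts, 1);
        int total = regionSizes.length;
        while (total < minSplits) {
            int best = 0;
            for (int r = 1; r < regionSizes.length; r++) {
                // size[r] / counts[r] > size[best] / counts[best], compared without division.
                if (regionSizes[r] * counts[best] > regionSizes[best] * counts[r]) {
                    best = r;
                }
            }
            counts[best]++;
            total++;
        }
        return counts;
    }
}
```

For region sizes {100, 10} and a minimum of five splits this yields {4, 1}: the large region is cut into four pieces and the small one stays whole. If there are already at least as many regions as required splits, each region keeps a single split.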
>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>> Ok, thanks for the explanation!
>>>>>> That was more or less what I thought, but there are still some
>>>>>> points I'd like to clarify:
>>>>>> 1 - What if one region is very big and the other regions are very
>>>>>> small? One slot would take a very long time while the others
>>>>>> stay idle.
>>>>>> 2 - Do you think it is possible to implement this in an adaptive way
>>>>>> (stop processing a huge region if it is worth it and assign the
>>>>>> remaining data to inactive task managers)?
>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org>
>>>>>> wrote:
>>>>>>> Local split assignment preferably assigns input splits to workers
>>>>>>> that can read the data of an input split locally.
>>>>>>> For example, HDFS stores file chunks (blocks) distributed over the
>>>>>>> cluster and gives every worker access to these chunks via network
>>>>>>> transfer. However, if a chunk is read by a process that runs on the
>>>>>>> same node where the chunk is stored, the read operation directly
>>>>>>> accesses the local file system without going over the network. Hence,
>>>>>>> it is essential to assign input splits based on the locality of their
>>>>>>> data if you want reasonable performance. We call this local split
>>>>>>> assignment. This is a general concept of all data-parallel systems,
>>>>>>> including Hadoop, Spark, and Flink.
>>>>>>> This issue is not related to the serializability of input formats.
>>>>>>> I assume that the wrapped MongoIF is also not capable of local split
>>>>>>> assignment.
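The local split assignment described above can be sketched as follows, assuming each split knows the hosts that store its data and each worker asks for work with its own host name. This is a simplified, hypothetical `LocalityAwareAssigner`, not Flink's actual assigner class: it prefers a local split and falls back to a remote one only when no local split is left.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified locality-aware split assigner (not Flink's actual class). */
final class LocalityAwareAssigner {
    /** A split plus the hosts that store its data locally. */
    static final class Split {
        final String id;
        final Set<String> hosts;

        Split(String id, Set<String> hosts) {
            this.id = id;
            this.hosts = hosts;
        }
    }

    private final List<Split> unassigned;

    LocalityAwareAssigner(List<Split> splits) {
        this.unassigned = new ArrayList<>(splits);
    }

    /**
     * Called by a worker running on {@code host} whenever it is idle. Returns a split whose
     * data is stored on that host if one is left, otherwise any remaining split (which will
     * then be read over the network), or null when all splits have been handed out.
     */
    synchronized Split next(String host) {
        for (Iterator<Split> it = unassigned.iterator(); it.hasNext(); ) {
            Split s = it.next();
            if (s.hosts.contains(host)) {
                it.remove();
                return s; // local read
            }
        }
        return unassigned.isEmpty() ? null : unassigned.remove(0); // remote read
    }
}
```

Falling back to a remote split keeps workers busy; the price is a network read, which is exactly what locality-aware assignment tries to minimize.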
>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>> assignment"? Do you mean that the InputFormat is not serializable?
>>>>>>>> And is that not the case for MongoDB instead?
>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org>
>>>>>>>> wrote:
>>>>>>>>> There's a page about Hadoop Compatibility that shows how to use
>>>>>>>>> the wrapper.
>>>>>>>>> The HBase format should work as well, but might lack support for
>>>>>>>>> local split assignment. In that case, performance would suffer a lot.
>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>> Should I start from
>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>> ? Is that OK?
>>>>>>>>>> So, in principle, HBase's TableInputFormat could also be
>>>>>>>>>> used in a similar way, couldn't it?
>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>> documentation.
>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into the
>>>>>>>>>>> new IF wrapper. :-)
>>>>>>>>>>> Fabian
>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>> Hi to all,
>>>>>>>>>>>> I saw this post
>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>> DataSource).
>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Flavio
