From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink Mongodb
Date Thu, 06 Nov 2014 14:41:40 GMT
Any help here?

On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> I just shared the example at https://github.com/okkam-it/flink-mongodb-test
> and tweeted about it :)
> The next step is to show how to write the result of a Flink process back
> to Mongo.
> How can I manage to do that? Can someone help me?
> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org> wrote:
>> How about going for an optional parameter for the InputFormat to
>> determine into how many splits each region is split?
>> That would be a lightweight option to control the number of splits with
>> low effort (on our side).
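A minimal sketch of the per-region split parameter suggested above. It assumes integer row keys and half-open key ranges [start, end) for simplicity (real HBase row keys are byte arrays), and the names `split_region` and `create_splits` are hypothetical, not part of Flink or HBase:

```python
from typing import List, Tuple

KeyRange = Tuple[int, int]  # half-open key range [start, end); integer keys are an assumption


def split_region(start: int, end: int, splits_per_region: int) -> List[KeyRange]:
    """Divide [start, end) into up to `splits_per_region` contiguous, non-empty sub-ranges."""
    if end <= start:
        raise ValueError("empty key range")
    if splits_per_region < 1:
        raise ValueError("splits_per_region must be >= 1")
    width = end - start
    k = min(splits_per_region, width)  # never create empty sub-ranges
    bounds = [start + (width * i) // k for i in range(k + 1)]
    return [(bounds[i], bounds[i + 1]) for i in range(k)]


def create_splits(regions: List[KeyRange], splits_per_region: int = 1) -> List[KeyRange]:
    """Hypothetical split generation: `splits_per_region` splits for every region."""
    splits: List[KeyRange] = []
    for start, end in regions:
        splits.extend(split_region(start, end, splits_per_region))
    return splits


print(create_splits([(0, 100), (100, 130)], 4))
```

The default of one split per region keeps the current behaviour. Note that the sub-ranges have equal key width, not equal data volume, so a dense sub-range can still be much larger than its neighbours; that is the density problem Fabian raises further down in the thread.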
>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>> So how are we going to proceed here? Is someone willing to help me
>>> improve the splitting policy, or do we leave it as it is now?
>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>> wrote:
>>>> I agree. Going for more splits with smaller key regions is a good idea.
>>>> However, it might be a bit difficult to determine a good number of
>>>> splits, as the size of a split depends on its density. Splits that are too
>>>> large are prone to cause data skew; splits that are too small increase the
>>>> overhead of split assignment.
>>>> A solution for this problem could be to add an optional parameter to
>>>> the IF to give an upper bound for the number of InputSplits.
>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>> Typo: it should have said that workers that get a larger split will
>>>>> get fewer additional splits.
>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>> InputSplits are assigned lazily at runtime, which gives you many of
>>>>> the benefits of re-assigning without the nastiness.
>>>>> Can you write the logic that creates the splits such that it creates
>>>>> multiple splits per region? Then the lazy assignment will make sure that
>>>>> workers that get a larger split will get get additional splits than workers
>>>>> that get smaller splits...
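To see why lazy assignment helps, here is a small simulation (a toy model, not Flink's actual scheduler code) in which each worker that becomes idle asks for the next unassigned split. Split costs are made-up time units, and `simulate_lazy_assignment` is a hypothetical name:

```python
import heapq
from collections import deque
from typing import Dict, List


def simulate_lazy_assignment(split_costs: List[int], num_workers: int) -> Dict[int, List[int]]:
    """Each idle worker requests the next unassigned split; returns split indices per worker."""
    pending = deque(range(len(split_costs)))  # splits not yet handed out
    # (time at which the worker becomes idle, worker id); every worker is idle at t = 0
    idle_at = [(0, w) for w in range(num_workers)]
    heapq.heapify(idle_at)
    assigned: Dict[int, List[int]] = {w: [] for w in range(num_workers)}
    while pending:
        t, worker = heapq.heappop(idle_at)  # the earliest idle worker asks for work
        split = pending.popleft()
        assigned[worker].append(split)
        heapq.heappush(idle_at, (t + split_costs[split], worker))
    return assigned


print(simulate_lazy_assignment([8, 2, 2, 2, 2, 2, 2], 2))
```

With these costs and two workers, the worker that receives the expensive split 0 ends up with two splits while the other worker processes five, which is the "fewer additional splits" effect described above. This only works if there are enough splits to hand out, hence the suggestion to create several splits per region.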
>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's mode
>>>>>> of operation.
>>>>>> I would assume that HBase uses range partitioning to partition a
>>>>>> table into regions. That way it is rather easy to balance the size of the
>>>>>> regions, as long as there is no single key that occurs very often. I am not
>>>>>> sure whether it is possible to overcome data skew caused by frequent keys.
>>>>>> However, as I said, these are just assumptions. I will have a look at
>>>>>> HBase's internals to verify them.
>>>>>> In any case, Flink does not currently support reassigning
>>>>>> or splitting InputSplits at runtime.
>>>>>> Also, generating balanced InputSplits up front will be tricky. That
>>>>>> would be possible if we could efficiently determine the "density" of a key
>>>>>> range when creating the InputSplits. However, I'm a bit skeptical that
>>>>>> this can be done...
>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>> From what I know, HBase manages the regions, but whether they
>>>>>>> are evenly distributed depends on a well-designed key.
>>>>>>> If that is not the case, you could end up with very unbalanced regions
>>>>>>> (i.e. hot spotting).
>>>>>>> Could it be a good idea to create a split policy that compares the
>>>>>>> sizes of all the splits and generates equally-sized splits that can be
>>>>>>> reassigned to a free worker if the originally assigned one is still busy?
>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>> wrote:
>>>>>>>> ad 1) HBase manages the regions and should also take care of their
>>>>>>>> uniform size.
>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>> moment. However, the input split generation of the IF should also be able
>>>>>>>> to handle such issues upfront. In fact, the IF could also generate multiple
>>>>>>>> splits per region (this would be necessary to make sure that the minimum
>>>>>>>> number of splits is generated if there are fewer regions than requested
>>>>>>>> splits).
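A sketch of how an IF could choose the number of splits per region so that at least the requested minimum is produced even when there are only a few regions. In Flink this lower bound is passed to `createInputSplits(int minNumSplits)`; the helper name below is hypothetical:

```python
def splits_per_region(num_regions: int, min_splits: int) -> int:
    """Smallest number of splits per region such that
    num_regions * result >= min_splits, with at least one split per region."""
    if num_regions < 1:
        raise ValueError("need at least one region")
    if min_splits < 1:
        raise ValueError("min_splits must be >= 1")
    return max(1, (min_splits + num_regions - 1) // num_regions)  # integer ceiling


print(splits_per_region(3, 8))   # 3 regions, 8 requested splits
print(splits_per_region(10, 4))  # more regions than requested splits
```

With 3 regions and a requested minimum of 8, each region is cut into 3 splits (9 in total); with 10 regions and a minimum of 4, one split per region already suffices.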
>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>> That was more or less how I thought it should be, but there are
>>>>>>>>> still some points I'd like to clarify:
>>>>>>>>> 1 - What if a region is very big and other regions are very
>>>>>>>>> small? One slot will take a very long time while the
>>>>>>>>> others stay idle.
>>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive
>>>>>>>>> way (stop processing a huge region if it is worth it and assign the
>>>>>>>>> remaining data to idle task managers)?
>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> Local split assignment preferably assigns input splits to workers
>>>>>>>>>> that can read the data of an input split locally.
>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed across
>>>>>>>>>> the cluster and gives every worker access to these chunks via network
>>>>>>>>>> transfer. However, if a chunk is read by a process that runs on the same
>>>>>>>>>> node where the chunk is stored, the read operation directly accesses the
>>>>>>>>>> local file system without going over the network. Hence, it is essential
>>>>>>>>>> to assign input splits based on the locality of their data if you want
>>>>>>>>>> reasonable performance. We call this local split assignment. This is a
>>>>>>>>>> general concept of all data-parallel systems, including Hadoop, Spark, and
>>>>>>>>>> Flink.
>>>>>>>>>> This issue is not related to the serializability of input formats.
>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>>>> split assignment.
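A toy model of local split assignment, assuming each split knows which hosts store its data. It only illustrates the idea and is not Flink's implementation (Flink ships a `LocatableInputSplitAssigner` for splits that carry host information); the class name below is hypothetical:

```python
from typing import Dict, Optional, Set


class LocalityAwareAssigner:
    """Hands out splits lazily, preferring splits whose data is stored on the requesting host."""

    def __init__(self, split_hosts: Dict[str, Set[str]]):
        # split id -> hosts that store the split's data locally
        self.unassigned: Dict[str, Set[str]] = dict(split_hosts)
        self.local_reads = 0
        self.remote_reads = 0

    def next_split(self, host: str) -> Optional[str]:
        if not self.unassigned:
            return None  # no work left
        # 1) prefer a split that this host can read from its local file system
        for split_id, hosts in self.unassigned.items():
            if host in hosts:
                del self.unassigned[split_id]  # safe: we return right away
                self.local_reads += 1
                return split_id
        # 2) otherwise hand out any remaining split; its data goes over the network
        split_id = next(iter(self.unassigned))
        del self.unassigned[split_id]
        self.remote_reads += 1
        return split_id


assigner = LocalityAwareAssigner({"s1": {"nodeA"}, "s2": {"nodeB"}, "s3": {"nodeA"}})
print(assigner.next_split("nodeB"), assigner.next_split("nodeB"), assigner.next_split("nodeA"))
```

Here nodeB first gets its local split s2, then has to read s1 remotely because no local split is left, and nodeA still gets s3 locally. When no local split remains, the sketch simply takes the first remaining one; a production assigner may choose more carefully.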
>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>> assignment"? Do you mean that the InputFormat is not
>>>>>>>>>>> And this is not the case for MongoDB?
>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske
>>>>>>>>>>> <fhueske@apache.org> wrote:
>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to use
>>>>>>>>>>>> the wrapper.
>>>>>>>>>>>> The HBase format should work as well, but it might lack support
>>>>>>>>>>>> for local split assignment. In that case, performance would suffer a lot.
>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>>> ? Is that ok?
>>>>>>>>>>>>> So, in principle, the TableInputFormat of HBase could also
>>>>>>>>>>>>> be used in a similar way, couldn't it?
>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into
>>>>>>>>>>>>>> the new IF wrapper. :-)
>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Flavio

