flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Flink Mongodb
Date Mon, 10 Nov 2014 16:06:41 GMT
Hi Flavio!

Looks very nice :-)

Is the repository going to stay for a while? Can we link to your example
from the Flink Website?

Stephan


On Fri, Nov 7, 2014 at 5:01 PM, Flavio Pompermaier <pompermaier@okkam.it>
wrote:

> I managed to write back to mongo using this:
>
> MongoConfigUtil.setOutputURI( hdIf.getJobConf(),
> "mongodb://localhost:27017/test.testData");
> // emit result (this works only locally)
> fin.output(new HadoopOutputFormat<Text,BSONWritable>(new
> MongoOutputFormat<Text,BSONWritable>(), hdIf.getJobConf()));
>
> So I updated also the example at
> https://github.com/okkam-it/flink-mongodb-test :)
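>
> For reference, the read side in that example boils down to roughly the
> following (a minimal sketch, not copied verbatim from the repo; the URI
> and collection name are placeholders):
>
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
> import org.apache.hadoop.mapred.JobConf;
> import com.mongodb.hadoop.io.BSONWritable;
> import com.mongodb.hadoop.mapred.MongoInputFormat;
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> // Wrap Mongo's Hadoop InputFormat in Flink's Hadoop compatibility wrapper.
> HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
>     new HadoopInputFormat<BSONWritable, BSONWritable>(
>         new MongoInputFormat(), BSONWritable.class, BSONWritable.class,
>         new JobConf());
>
> // Placeholder URI: point it at your own database and collection.
> hdIf.getJobConf().set("mongo.input.uri",
>     "mongodb://localhost:27017/test.testData");
>
> // Each record arrives as a (key, document) pair of BSONWritables.
> DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);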
>
> On Thu, Nov 6, 2014 at 5:10 PM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Hi!
>> Can you:
>>
>>   - either return a BSONWritable from the function
>>   - or type the output formats to String?
>>
>> The MongoRecordWriter can work with non BSON objects as well.
>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
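>>
>> For the first option, something along these lines should work (an
>> untested sketch; it assumes the "jsonld" / "@type" structure from your
>> snippet, keys the output by an arbitrary Text value, and reuses the hdIf
>> JobConf on which you already set the output URI; Text is
>> org.apache.hadoop.io.Text):
>>
>> DataSet<Tuple2<Text, BSONWritable>> fin = input.map(
>>     new MapFunction<Tuple2<BSONWritable, BSONWritable>, Tuple2<Text, BSONWritable>>() {
>>
>>       private static final long serialVersionUID = 1L;
>>
>>       @Override
>>       public Tuple2<Text, BSONWritable> map(Tuple2<BSONWritable, BSONWritable> record)
>>           throws Exception {
>>         BSONObject doc = record.f1.getDoc();
>>         BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
>>         // Build a small output document; what you put in it is up to you.
>>         BasicDBObject out = new BasicDBObject("type", jsonld.getString("@type"));
>>         return new Tuple2<Text, BSONWritable>(
>>             new Text(jsonld.getString("@type")), new BSONWritable(out));
>>       }
>>     });
>>
>> // With the DataSet typed this way, the wrapped output format matches:
>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>     new MongoOutputFormat<Text, BSONWritable>(), hdIf.getJobConf()));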
>>
>>
>> Stephan
>>
>>
>> On Thu, Nov 6, 2014 at 4:12 PM, Flavio Pompermaier <pompermaier@okkam.it>
>> wrote:
>>
>>> I'm trying to do that but I can't find the proper typing.. For example:
>>>
>>> DataSet<String> fin = input.map(
>>>     new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {
>>>
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public String map(Tuple2<BSONWritable, BSONWritable> record)
>>>           throws Exception {
>>>         BSONWritable value = record.getField(1);
>>>         BSONObject doc = value.getDoc();
>>>         BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
>>>         String type = jsonld.getString("@type");
>>>         return type;
>>>       }
>>>     });
>>>
>>> MongoConfigUtil.setOutputURI( hdIf.getJobConf(),
>>> "mongodb://localhost:27017/test.test");
>>> fin.output(new HadoopOutputFormat<BSONWritable,BSONWritable>(new
>>> MongoOutputFormat<BSONWritable,BSONWritable>(), hdIf.getJobConf()));
>>>
>>> Obviously this doesn't work because I'm emitting strings and trying to
>>> write BSONWritable... Can you show me a simple working example?
>>>
>>> Best,
>>> Flavio
>>>
>>> On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> Hi Flavio!
>>>>
>>>> I think the general method is the same as with the inputs.
>>>>
>>>> You use the "HadoopOutputFormat" wrapping the "MongoOutputFormat" (
>>>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
>>>> )
>>>>
>>>> You can then call
>>>>
>>>> DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;
>>>>
>>>> data.output(mongoOutput);
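>>>>
>>>> where mongoOutput is built roughly like this (a sketch; adjust the URI
>>>> and reuse the JobConf from the input side if you have one):
>>>>
>>>> JobConf conf = new JobConf();
>>>> // Tell the Mongo output format where to write (database.collection).
>>>> MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/test.testData");
>>>>
>>>> HadoopOutputFormat<BSONWritable, BSONWritable> mongoOutput =
>>>>     new HadoopOutputFormat<BSONWritable, BSONWritable>(
>>>>         new MongoOutputFormat<BSONWritable, BSONWritable>(), conf);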
>>>>
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Any help here..?
>>>>>
>>>>> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Just shared the example at
>>>>>> https://github.com/okkam-it/flink-mongodb-test and tweeted :)
>>>>>>
>>>>>> The next step is to show how to write the result of a Flink process
>>>>>> back to Mongo.
>>>>>> How can I manage to do that? Can someone help me?
>>>>>>
>>>>>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> How about going for an optional parameter for the InputFormat to
>>>>>>> determine into how many splits each region is split?
>>>>>>> That would be a lightweight option to control the number of splits
>>>>>>> with low effort (on our side).
>>>>>>>
>>>>>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>
>>>>>>>> So how are we going to proceed here? Is someone willing to help me
>>>>>>>> in improving the splitting policy, or do we leave it as it is now?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I agree. Going for more splits with smaller key regions is a
>>>>>>>>> good idea.
>>>>>>>>> However, it might be a bit difficult to determine a good number of
>>>>>>>>> splits, as the size of a split depends on its density. Too large
>>>>>>>>> splits are prone to cause data skew; too small ones will increase
>>>>>>>>> the overhead of split assignment.
>>>>>>>>>
>>>>>>>>> A solution for this problem could be to add an optional
>>>>>>>>> parameter to the IF to give an upper bound for the number of
>>>>>>>>> InputSplits.
>>>>>>>>>
>>>>>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>>>
>>>>>>>>>> Typo: it should have meant that workers that get a larger split
>>>>>>>>>> will get fewer additional splits.
>>>>>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>>>>>>
>>>>>>>>>> InputSplits are assigned lazily at runtime, which gives you many
>>>>>>>>>> of the benefits of re-assigning without the nastiness.
>>>>>>>>>>
>>>>>>>>>> Can you write the logic that creates the splits such that it
>>>>>>>>>> creates multiple splits per region? Then the lazy assignment will
>>>>>>>>>> make sure that workers that get a larger split will get get
>>>>>>>>>> additional splits than workers that get smaller splits...
>>>>>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hmm, that's a good question indeed. I am not familiar with
>>>>>>>>>>> HBase's mode of operation.
>>>>>>>>>>> I would assume that HBase uses range partitioning to partition
>>>>>>>>>>> a table into regions. That way it is rather easy to balance the
>>>>>>>>>>> size of regions, as long as there is no single key that occurs
>>>>>>>>>>> very often. I am not sure if it is possible to overcome data
>>>>>>>>>>> skew caused by frequent keys.
>>>>>>>>>>> However, as I said, these are just assumptions. I will have a
>>>>>>>>>>> look at HBase's internals for verification.
>>>>>>>>>>>
>>>>>>>>>>> In any case, Flink currently does not support reassigning or
>>>>>>>>>>> splitting of InputSplits at runtime.
>>>>>>>>>>> Also, initially generating balanced InputSplits will be tricky.
>>>>>>>>>>> That would be possible if we can efficiently determine the
>>>>>>>>>>> "density" of a key range when creating the InputSplits. However,
>>>>>>>>>>> I'm a bit skeptical that this can be done...
>>>>>>>>>>>
>>>>>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier
>>>>>>>>>>> <pompermaier@okkam.it>:
>>>>>>>>>>>
>>>>>>>>>>>> From what I know, HBase manages the regions, but the fact that
>>>>>>>>>>>> they are evenly distributed depends on a well-designed key..
>>>>>>>>>>>> if that is not the case you could encounter very unbalanced
>>>>>>>>>>>> regions (i.e. hot spotting).
>>>>>>>>>>>>
>>>>>>>>>>>> Could it be a good idea to create a split policy that compares
>>>>>>>>>>>> the size of all the splits and generates equally-sized splits
>>>>>>>>>>>> that can be reassigned to a free worker if the originally
>>>>>>>>>>>> assigned one is still busy?
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske
>>>>>>>>>>>> <fhueske@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> ad 1) HBase manages the regions and should also take care of
>>>>>>>>>>>>> their uniform size.
>>>>>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>>>>>>> moment. However, the input split generation of the IF should
>>>>>>>>>>>>> also be able to handle such issues upfront. In fact, the IF
>>>>>>>>>>>>> could also generate multiple splits per region (this would be
>>>>>>>>>>>>> necessary to make sure that the minimum number of splits is
>>>>>>>>>>>>> generated if there are fewer regions than required splits).
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier
>>>>>>>>>>>>> <pompermaier@okkam.it>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>>>>>> That was more or less how I thought it should be, but there
>>>>>>>>>>>>>> are still points I'd like to clarify:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1 - What if one region is very big and the other regions are
>>>>>>>>>>>>>> very small..? There will be one slot that takes a very long
>>>>>>>>>>>>>> time while the others stay inactive..
>>>>>>>>>>>>>> 2 - Do you think it is possible to implement this in an
>>>>>>>>>>>>>> adaptive way (stop processing a huge region if it is worth it
>>>>>>>>>>>>>> and assign the remaining data to inactive task managers)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske
>>>>>>>>>>>>>> <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Local split assignment preferably assigns an input split to
>>>>>>>>>>>>>>> a worker that can read the data of that split locally.
>>>>>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed
>>>>>>>>>>>>>>> over the cluster and gives every worker access to these
>>>>>>>>>>>>>>> chunks via network transfer. However, if a chunk is read by
>>>>>>>>>>>>>>> a process that runs on the same node where the chunk is
>>>>>>>>>>>>>>> stored, the read operation accesses the local file system
>>>>>>>>>>>>>>> directly without going over the network. Hence, it is
>>>>>>>>>>>>>>> essential to assign input splits based on the locality of
>>>>>>>>>>>>>>> their data if you want reasonable performance. We call this
>>>>>>>>>>>>>>> local split assignment. This is a general concept in all
>>>>>>>>>>>>>>> data-parallel systems, including Hadoop, Spark, and Flink.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This issue is not related to the serializability of input
>>>>>>>>>>>>>>> formats.
>>>>>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of
>>>>>>>>>>>>>>> local split assignment.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>>>>>>> assignment"? Do you mean that the InputFormat is not
>>>>>>>>>>>>>>>> serializable? And is this not the case for MongoDB?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske
>>>>>>>>>>>>>>>> <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how
>>>>>>>>>>>>>>>>> to use the wrapper.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The HBase format should work as well, but might lack
>>>>>>>>>>>>>>>>> support for local split assignment. In that case,
>>>>>>>>>>>>>>>>> performance would suffer a lot.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>>>>>>>> ? Is it ok?
>>>>>>>>>>>>>>>>>> Thus, in principle, the TableInputFormat of HBase could
>>>>>>>>>>>>>>>>>> also be used in a similar way.. couldn't it?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske
>>>>>>>>>>>>>>>>>> <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop
>>>>>>>>>>>>>>>>>>> InputFormats.
>>>>>>>>>>>>>>>>>>> This has been ported to the new API and is described in
>>>>>>>>>>>>>>>>>>> the documentation.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it
>>>>>>>>>>>>>>>>>>> into the new IF wrapper. :-)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>
