flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink Mongodb
Date Mon, 10 Nov 2014 16:09:09 GMT
Yes, it will stay there as long as it works :)
However, if you want to bring it into the official Flink examples, that would be
even better I think!

Best,
Flavio

On Mon, Nov 10, 2014 at 5:06 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi Flavio!
>
> Looks very nice :-)
>
> Is the repository going to stay for a while? Can we link to your example
> from the Flink Website?
>
> Stephan
>
>
> On Fri, Nov 7, 2014 at 5:01 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> I managed to write back to mongo using this:
>>
>> MongoConfigUtil.setOutputURI(hdIf.getJobConf(),
>>     "mongodb://localhost:27017/test.testData");
>> // emit result (this works only locally)
>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>     new MongoOutputFormat<Text, BSONWritable>(), hdIf.getJobConf()));
>>
>> So I also updated the example at
>> https://github.com/okkam-it/flink-mongodb-test :)
>>
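For reference, a rough end-to-end sketch of the job this thread converges on, assuming Flink 0.7's Hadoop compatibility wrappers and the old-API "mapred" formats from the mongo-hadoop connector; package names, constructors, and the BSONWritable type parameters may differ in other versions, so treat it as a sketch rather than the definitive example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;
import com.mongodb.hadoop.mapred.MongoOutputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoRoundTripSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // one JobConf carries both the input and the output collection URIs
        JobConf conf = new JobConf();
        MongoConfigUtil.setInputURI(conf, "mongodb://localhost:27017/test.testData");
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/test.testData");

        // read: plug mongo-hadoop's old-API MongoInputFormat into Flink's wrapper
        HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
            new HadoopInputFormat<BSONWritable, BSONWritable>(
                new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf);
        DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);

        // transform the documents here as needed (see the MapFunction sketches
        // further down the thread); for brevity this sketch copies them unchanged

        // write: wrap MongoOutputFormat the same way
        input.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
            new MongoOutputFormat<BSONWritable, BSONWritable>(), conf));

        env.execute("MongoDB round-trip sketch");
    }
}
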
>> On Thu, Nov 6, 2014 at 5:10 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Hi!
>>> Can you:
>>>
>>>   - either return a BSONWritable from the function
>>>   - or type the output formats to String?
>>>
>>> The MongoRecordWriter can work with non-BSON objects as well.
>>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/output/MongoRecordWriter.java
>>>
>>>
>>> Stephan
>>>
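A possible shape for the first option (returning a BSONWritable instead of a String), sketched under the same assumptions as the other code in this thread; the "jsonld" and "@type" field names come from Flavio's snippet below, the result document itself is purely illustrative, and imports for MapFunction, Text, BSONWritable, BSONObject, BasicBSONObject, and BasicDBObject are assumed:

// illustrative sketch: emit (Text key, BSONWritable document) pairs so that the
// DataSet's type matches a MongoOutputFormat<Text, BSONWritable> sink
DataSet<Tuple2<Text, BSONWritable>> fin = input.map(
    new MapFunction<Tuple2<BSONWritable, BSONWritable>, Tuple2<Text, BSONWritable>>() {

        @Override
        public Tuple2<Text, BSONWritable> map(Tuple2<BSONWritable, BSONWritable> record)
                throws Exception {
            BSONObject doc = record.f1.getDoc();
            BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
            String type = jsonld.getString("@type");
            // wrap the extracted type in a small BSON document, keyed by the type string
            return new Tuple2<Text, BSONWritable>(
                new Text(type), new BSONWritable(new BasicBSONObject("@type", type)));
        }
    });
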
>>>
>>> On Thu, Nov 6, 2014 at 4:12 PM, Flavio Pompermaier <pompermaier@okkam.it
>>> > wrote:
>>>
>>>> I'm trying to do that, but I can't find the proper typing. For example:
>>>>
>>>> DataSet<String> fin = input.map(
>>>>     new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {
>>>>
>>>>         private static final long serialVersionUID = 1L;
>>>>
>>>>         @Override
>>>>         public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
>>>>             BSONWritable value = record.getField(1);
>>>>             BSONObject doc = value.getDoc();
>>>>             BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
>>>>             String type = jsonld.getString("@type");
>>>>             return type;
>>>>         }
>>>>     });
>>>>
>>>> MongoConfigUtil.setOutputURI(hdIf.getJobConf(),
>>>>     "mongodb://localhost:27017/test.test");
>>>> fin.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
>>>>     new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));
>>>>
>>>> Obviously this doesn't work because I'm emitting Strings but trying to
>>>> write BSONWritable... Can you show me a simple working example?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Hi Flavio!
>>>>>
>>>>> I think the general method is the same as with the inputs.
>>>>>
>>>>> You use the "HadoopOutputFormat" wrapping the "MongoOutputFormat" (
>>>>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
>>>>> )
>>>>>
>>>>> You can then call
>>>>>
>>>>> DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;
>>>>>
>>>>> data.output(mongoOutput);
>>>>>
>>>>>
>>>>> Greetings,
>>>>> Stephan
>>>>>
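The mongoOutput above would presumably be the Hadoop wrapper around MongoOutputFormat, along these lines (a sketch; jobConf is assumed to be a JobConf whose output URI was set via MongoConfigUtil.setOutputURI):

// assuming data is the DataSet<Tuple2<BSONWritable, BSONWritable>> mentioned above
HadoopOutputFormat<BSONWritable, BSONWritable> mongoOutput =
    new HadoopOutputFormat<BSONWritable, BSONWritable>(
        new MongoOutputFormat<BSONWritable, BSONWritable>(), jobConf);
data.output(mongoOutput);
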
>>>>>
>>>>> On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Any help here..?
>>>>>>
>>>>>> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> Just shared the example at
>>>>>>> https://github.com/okkam-it/flink-mongodb-test and tweeted it :)
>>>>>>>
>>>>>>> The next step is to show how to write the result of a Flink process
>>>>>>> back to Mongo.
>>>>>>> How can I manage to do that? Can someone help me?
>>>>>>>
>>>>>>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> How about going for an optional parameter for the InputFormat to
>>>>>>>> determine into how many splits each region is split?
>>>>>>>> That would be a lightweight option to control the number of splits
>>>>>>>> with low effort (on our side).
>>>>>>>>
>>>>>>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>
>>>>>>>>> So how are we going to proceed here? Is someone willing to help me
>>>>>>>>> in improving the splitting policy, or do we leave it as it is now?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I agree. Going for more splits with smaller key regions is a
>>>>>>>>>> good idea.
>>>>>>>>>> However, it might be a bit difficult to determine a good number
>>>>>>>>>> of splits, as the size of a split depends on its density. Too
>>>>>>>>>> large splits are prone to cause data skew, too small ones will
>>>>>>>>>> increase the overhead of split assignment.
>>>>>>>>>>
>>>>>>>>>> A solution for this problem could be to add an optional
>>>>>>>>>> parameter to the IF to give an upper bound for the number of
>>>>>>>>>> InputSplits.
>>>>>>>>>>
>>>>>>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>>>>>
>>>>>>>>>>> Typo: it should have meant that workers that get a larger split
>>>>>>>>>>> will get fewer additional splits.
>>>>>>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>>>>>>>
>>>>>>>>>>> InputSplits are assigned lazily at runtime, which gives you many
>>>>>>>>>>> of the benefits of re-assigning without the nastiness.
>>>>>>>>>>>
>>>>>>>>>>> Can you write the logic that creates the splits such that it
>>>>>>>>>>> creates multiple splits per region? Then the lazy assignment
>>>>>>>>>>> will make sure that workers that get a larger split will get
>>>>>>>>>>> get additional splits than workers that get smaller splits...
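Purely as an illustration of the "multiple splits per region" idea above (the helper below is hypothetical, not existing Flink or HBase-connector API): the split-creation logic could cut each region's key range into n evenly spaced sub-ranges, for example with HBase's Bytes.split utility, and emit one input split per sub-range:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;

// hypothetical helper, only to illustrate the idea: cut one region's
// [startKey, endKey) range into n evenly spaced sub-ranges, each of which
// would back its own InputSplit
public class RegionSubSplitter {
    public static List<byte[][]> subdivide(byte[] startKey, byte[] endKey, int n) {
        // Bytes.split returns n + 1 evenly spaced boundary keys, including start and end
        // (a real implementation would also need to handle the empty start/end keys
        // of a table's first and last region)
        byte[][] boundaries = Bytes.split(startKey, endKey, n - 1);
        List<byte[][]> ranges = new ArrayList<byte[][]>(n);
        for (int i = 0; i < boundaries.length - 1; i++) {
            ranges.add(new byte[][] { boundaries[i], boundaries[i + 1] });
        }
        return ranges;
    }
}

Lazy split assignment, as described above, would then hand these smaller splits out as workers become free, which gives the balancing effect discussed in this sub-thread.
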
>>>>>>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hmm, that's a good question indeed. I am not familiar with
>>>>>>>>>>>> HBase's mode of operation.
>>>>>>>>>>>> I would assume that HBase uses range partitioning to partition
>>>>>>>>>>>> a table into regions. That way it is rather easy to balance the
>>>>>>>>>>>> size of regions, as long as there is no single key that occurs
>>>>>>>>>>>> very often. I am not sure if it is possible to overcome data
>>>>>>>>>>>> skew caused by frequent keys.
>>>>>>>>>>>> However, as I said, these are just assumptions. I will have a
>>>>>>>>>>>> look at HBase's internals for verification.
>>>>>>>>>>>>
>>>>>>>>>>>> In any case, Flink does not currently support reassigning
>>>>>>>>>>>> or splitting of InputSplits at runtime.
>>>>>>>>>>>> Also, initially generating balanced InputSplits will be
>>>>>>>>>>>> tricky. That would be possible if we can efficiently determine
>>>>>>>>>>>> the "density" of a key range when creating the InputSplits.
>>>>>>>>>>>> However, I'm a bit skeptical that this can be done...
>>>>>>>>>>>>
>>>>>>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>
>>>>>>>>>>>>> From what I know, HBase manages the regions, but the fact that
>>>>>>>>>>>>> they are evenly distributed depends on a well-designed key..
>>>>>>>>>>>>> if that is not the case you could encounter very unbalanced
>>>>>>>>>>>>> regions (i.e. hot spotting).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could it be a good idea to create a split policy that compares
>>>>>>>>>>>>> the sizes of all the splits and generates equally-sized splits
>>>>>>>>>>>>> that can be reassigned to a free worker if the originally
>>>>>>>>>>>>> assigned one is still busy?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> ad 1) HBase manages the regions and should also take care
>>>>>>>>>>>>>> of their uniform size.
>>>>>>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>>>>>>>> moment. However, the input split generation of the IF should
>>>>>>>>>>>>>> also be able to handle such issues upfront. In fact, the IF
>>>>>>>>>>>>>> could also generate multiple splits per region (this would be
>>>>>>>>>>>>>> necessary to make sure that the minimum number of splits is
>>>>>>>>>>>>>> generated if there are fewer regions than required splits).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>>>>>>> That was more or less how I thought it should be, but there
>>>>>>>>>>>>>>> are still points I'd like to clarify:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1 - What if one region is very big and the other regions are
>>>>>>>>>>>>>>> very small? There will be one slot that takes a very long
>>>>>>>>>>>>>>> time while the others stay inactive.
>>>>>>>>>>>>>>> 2 - Do you think it is possible to implement this in an
>>>>>>>>>>>>>>> adaptive way (stop processing a huge region if it is worth
>>>>>>>>>>>>>>> it and assign the remaining data to inactive task managers)?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Local split assignment preferably assigns an input split to
>>>>>>>>>>>>>>>> workers that can locally read the data of that input split.
>>>>>>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed
>>>>>>>>>>>>>>>> over the cluster and gives access to these chunks to every
>>>>>>>>>>>>>>>> worker via network transfer. However, if a chunk is read by
>>>>>>>>>>>>>>>> a process that runs on the same node where the chunk is
>>>>>>>>>>>>>>>> stored, the read operation directly accesses the local file
>>>>>>>>>>>>>>>> system without going over the network. Hence, it is
>>>>>>>>>>>>>>>> essential to assign input splits based on the locality of
>>>>>>>>>>>>>>>> their data if you want to have reasonable performance. We
>>>>>>>>>>>>>>>> call this local split assignment. This is a general concept
>>>>>>>>>>>>>>>> of all data-parallel systems, including Hadoop, Spark, and
>>>>>>>>>>>>>>>> Flink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This issue is not related to the serializability of input
>>>>>>>>>>>>>>>> formats.
>>>>>>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of
>>>>>>>>>>>>>>>> local split assignment.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>>>>>>>> assignment"? Do you mean that the InputFormat is not
>>>>>>>>>>>>>>>>> serializable? And that this is instead not true for MongoDB?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how
>>>>>>>>>>>>>>>>>> to use the wrapper.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The HBase format should work as well, but might lack
>>>>>>>>>>>>>>>>>> support for local split assignment. In that case,
>>>>>>>>>>>>>>>>>> performance would suffer a lot.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>>>>>>>>> ? Is that ok?
>>>>>>>>>>>>>>>>>>> Thus, in principle, the TableInputFormat of HBase could
>>>>>>>>>>>>>>>>>>> also be used in a similar way.. couldn't it?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop
>>>>>>>>>>>>>>>>>>>> InputFormats.
>>>>>>>>>>>>>>>>>>>> This has been ported to the new API and is described in
>>>>>>>>>>>>>>>>>>>> the documentation.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it
>>>>>>>>>>>>>>>>>>>> into the new IF wrapper. :-)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>>>>>>
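Concretely, "plugging the Mongo IF into the new wrapper" boils down to something like the following (same hedged assumptions as the sketches near the top of the thread; env and the imports are assumed to be in scope):

// wrap mongo-hadoop's old-API MongoInputFormat in Flink's HadoopInputFormat
HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
    new HadoopInputFormat<BSONWritable, BSONWritable>(
        new MongoInputFormat(), BSONWritable.class, BSONWritable.class, new JobConf());
MongoConfigUtil.setInputURI(hdIf.getJobConf(), "mongodb://localhost:27017/test.testData");
DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);
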
>>>>>>>>>>>>>>>>>>>> On Tuesday, 4 November 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of
>>>>>>>>>>>>>>>>>>>>> DataSource).
>>>>>>>>>>>>>>>>>>>>> How can I use MongoDB with the new Flink APIs?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>
>
