flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Flink Mongodb
Date Thu, 06 Nov 2014 15:12:00 GMT
I'm trying to do that but I can't find the proper typing.. For example:

DataSet<String> fin = input.map(new MapFunction<Tuple2<BSONWritable, BSONWritable>, String>() {

  private static final long serialVersionUID = 1L;

  @Override
  public String map(Tuple2<BSONWritable, BSONWritable> record) throws Exception {
    BSONWritable value = record.getField(1);
    BSONObject doc = value.getDoc();
    BasicDBObject jsonld = (BasicDBObject) doc.get("jsonld");
    String type = jsonld.getString("@type");
    return type;
  }
});

MongoConfigUtil.setOutputURI(hdIf.getJobConf(), "mongodb://localhost:27017/test.test");
fin.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
    new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));

Obviously this doesn't work because I'm emitting strings and trying to
write BSONWritable.. Can you show me a simple working example?
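
I guess what is missing is a map back to Tuple2<BSONWritable, BSONWritable> before
the output, something like the following sketch (untested, and the "_id"/"type"
document layout is just my assumption), but I'd like to confirm this is the
intended way:

// uses com.mongodb.BasicDBObject, com.mongodb.hadoop.io.BSONWritable and org.bson.types.ObjectId
DataSet<Tuple2<BSONWritable, BSONWritable>> toWrite = fin
    .map(new MapFunction<String, Tuple2<BSONWritable, BSONWritable>>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Tuple2<BSONWritable, BSONWritable> map(String type) throws Exception {
        // key document: a fresh ObjectId; value document: the extracted type string
        BasicDBObject key = new BasicDBObject("_id", new ObjectId());
        BasicDBObject value = new BasicDBObject("type", type);
        return new Tuple2<BSONWritable, BSONWritable>(new BSONWritable(key), new BSONWritable(value));
      }
    });

MongoConfigUtil.setOutputURI(hdIf.getJobConf(), "mongodb://localhost:27017/test.test");
toWrite.output(new HadoopOutputFormat<BSONWritable, BSONWritable>(
    new MongoOutputFormat<BSONWritable, BSONWritable>(), hdIf.getJobConf()));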

Best,
Flavio

On Thu, Nov 6, 2014 at 3:58 PM, Stephan Ewen <sewen@apache.org> wrote:

> Hi Flavio!
>
> I think the general method is the same as with the inputs.
>
> You use the "HadoopOutputFormat" wrapping the "MongoOutputFormat" (
> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoOutputFormat.java
> )
>
> You can then call
>
> DataSet<Tuple2<BSONWritable, BSONWritable>> data = ...;
>
> data.output(mongoOutput);
>
>
> Greetings,
> Stephan
>
>
> On Thu, Nov 6, 2014 at 3:41 PM, Flavio Pompermaier <pompermaier@okkam.it>
> wrote:
>
>> Any help here..?
>>
>> On Wed, Nov 5, 2014 at 6:39 PM, Flavio Pompermaier <pompermaier@okkam.it>
>> wrote:
>>
>>> Just shared the example at
>>> https://github.com/okkam-it/flink-mongodb-test and tweeted :)
>>>
>>> The next step is to show how to write the result of a Flink process back
>>> to Mongo.
>>> How can I manage to do that? Can someone help me?
>>>
>>> On Wed, Nov 5, 2014 at 1:17 PM, Fabian Hueske <fhueske@apache.org>
>>> wrote:
>>>
>>>> How about going for an optional parameter for the InputFormat to
>>>> determine into how many splits each region is split?
>>>> That would be a lightweight option to control the number of splits with
>>>> low effort (on our side).
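>>>>
>>>> Roughly, the splitting itself could look like the sketch below (just a
>>>> sketch; the class and method names are made up, and it assumes non-empty
>>>> start/end keys): cut each region's key range into n sub-ranges by treating
>>>> the row keys as unsigned big-endian integers.
>>>>
>>>> import java.math.BigInteger;
>>>>
>>>> public class KeyRangeSplitter {
>>>>
>>>>   /** Returns n+1 boundary keys; boundaries[i]..boundaries[i+1] is the i-th sub-split. */
>>>>   public static byte[][] splitRange(byte[] startKey, byte[] endKey, int n) {
>>>>     int width = Math.max(startKey.length, endKey.length);
>>>>     BigInteger lo = new BigInteger(1, pad(startKey, width));
>>>>     BigInteger hi = new BigInteger(1, pad(endKey, width));
>>>>     byte[][] boundaries = new byte[n + 1][];
>>>>     boundaries[0] = startKey;
>>>>     boundaries[n] = endKey;
>>>>     for (int i = 1; i < n; i++) {
>>>>       // i-th boundary = lo + (hi - lo) * i / n
>>>>       BigInteger b = lo.add(hi.subtract(lo).multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n)));
>>>>       boundaries[i] = toKey(b, width);
>>>>     }
>>>>     return boundaries;
>>>>   }
>>>>
>>>>   // right-pad a key with 0x00 bytes up to the given width
>>>>   private static byte[] pad(byte[] key, int width) {
>>>>     byte[] padded = new byte[width];
>>>>     System.arraycopy(key, 0, padded, 0, key.length);
>>>>     return padded;
>>>>   }
>>>>
>>>>   // convert the integer back into a fixed-width big-endian key
>>>>   private static byte[] toKey(BigInteger value, int width) {
>>>>     byte[] raw = value.toByteArray();
>>>>     byte[] key = new byte[width];
>>>>     int copy = Math.min(raw.length, width);
>>>>     System.arraycopy(raw, raw.length - copy, key, width - copy, copy);
>>>>     return key;
>>>>   }
>>>> }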
>>>>
>>>> 2014-11-05 0:01 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> So how are we going to proceed here? Is someone willing to help me
>>>>> improve the splitting policy, or do we leave it as it is now?
>>>>>
>>>>>
>>>>> On Tue, Nov 4, 2014 at 9:42 PM, Fabian Hueske <fhueske@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I agree. Going for more splits with smaller key regions is a good idea.
>>>>>> However, it might be a bit difficult to determine a good number of
>>>>>> splits as the size of a split depends on its density. Too large splits are
>>>>>> prone to cause data skew, too small ones will increase the overhead of
>>>>>> split assignment.
>>>>>>
>>>>>> A solution for this problem could be to add an optional parameter to
>>>>>> the IF to give an upper bound for the number of InputSplits.
>>>>>>
>>>>>> 2014-11-04 20:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>>>>>
>>>>>>> Typo: it should have meant that workers that get a larger split will
>>>>>>> get fewer additional splits.
>>>>>>> On 04.11.2014 20:48, sewen@apache.org wrote:
>>>>>>>
>>>>>>> InputSplits are assigned lazily at runtime, which gives you many of
>>>>>>> the benefits of re-assigning without the nastiness.
>>>>>>>
>>>>>>> Can you write the logic that creates the splits such that it creates
>>>>>>> multiple splits per region? Then the lazy assignment will make sure that
>>>>>>> workers that get a larger split will get get additional splits than workers
>>>>>>> that get smaller splits...
>>>>>>> On 04.11.2014 20:32, "Fabian Hueske" <fhueske@apache.org> wrote:
>>>>>>>
>>>>>>>> Hmm, that's a good question indeed. I am not familiar with HBase's
>>>>>>>> mode of operation.
>>>>>>>> I would assume that HBase uses range partitioning to partition a
>>>>>>>> table into regions. That way it is rather easy to balance the size of
>>>>>>>> regions, as long as there is no single key that occurs very often. I am not
>>>>>>>> sure if it is possible to overcome data skew caused by frequent keys.
>>>>>>>> However, as I said, these are just assumptions. I will have a look at
>>>>>>>> HBase's internals for verification.
>>>>>>>>
>>>>>>>> In any case, Flink currently does not support reassigning
>>>>>>>> or splitting of InputSplits at runtime.
>>>>>>>> Also, initially generating balanced InputSplits will be
>>>>>>>> tricky. That would be possible if we can efficiently determine the
>>>>>>>> "density" of a key range when creating the InputSplits. However, I'm a bit
>>>>>>>> skeptical that this can be done...
>>>>>>>>
>>>>>>>> 2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>
>>>>>>>>> From what I know HBase manages the regions, but the fact that they
>>>>>>>>> are evenly distributed depends on a well-designed key..
>>>>>>>>> if that is not the case you could encounter very unbalanced regions
>>>>>>>>> (i.e. hot spotting).
>>>>>>>>>
>>>>>>>>> Could it be a good idea to create a split policy that compares the
>>>>>>>>> size of all the splits and generates equally-sized splits that can be
>>>>>>>>> reassigned to a free worker if the originally assigned one is still busy?
>>>>>>>>>
>>>>>>>>> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fhueske@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> ad 1) HBase manages the regions and should also take care
>>>>>>>>>> of their uniform size.
>>>>>>>>>> ad 2) Dynamically changing InputSplits is not possible at the
>>>>>>>>>> moment. However, the input split generation of the IF should also be able
>>>>>>>>>> to handle such issues upfront. In fact, the IF could also generate multiple
>>>>>>>>>> splits per region (this would be necessary to make sure that the minimum
>>>>>>>>>> number of splits is generated if there are fewer regions than required
>>>>>>>>>> splits).
>>>>>>>>>>
>>>>>>>>>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>>>>>
>>>>>>>>>>> Ok, thanks for the explanation!
>>>>>>>>>>> That was more or less what I thought it should be, but there are
>>>>>>>>>>> still points I'd like to clarify:
>>>>>>>>>>>
>>>>>>>>>>> 1 - What if a region is very big and the other regions are
>>>>>>>>>>> very small..? There will be one slot that takes a very long time while the
>>>>>>>>>>> others will stay inactive..
>>>>>>>>>>> 2 - Do you think it is possible to implement this in an adaptive
>>>>>>>>>>> way (stop processing a huge region if it is worth it and assign the
>>>>>>>>>>> remaining data to inactive task managers)?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Local split assignment preferably assigns input splits to
>>>>>>>>>>>> workers that can locally read the data of an input split.
>>>>>>>>>>>> For example, HDFS stores file chunks (blocks) distributed over
>>>>>>>>>>>> the cluster and gives access to these chunks to every worker via network
>>>>>>>>>>>> transfer. However, if a chunk is read by a process that runs on the same
>>>>>>>>>>>> node where the chunk is stored, the read operation directly accesses the
>>>>>>>>>>>> local file system without going over the network. Hence, it is essential to
>>>>>>>>>>>> assign input splits based on the locality of their data if you want to have
>>>>>>>>>>>> reasonable performance. We call this local split assignment. This is a
>>>>>>>>>>>> general concept in all data-parallel systems including Hadoop, Spark, and
>>>>>>>>>>>> Flink.
>>>>>>>>>>>>
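>>>>>>>>>>>> For illustration, a minimal sketch (the class name is made up, and it
>>>>>>>>>>>> assumes Flink's LocatableInputSplit): a split that reports the host where
>>>>>>>>>>>> its data lives, so the scheduler can prefer a worker on that machine.
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.flink.core.io.LocatableInputSplit;
>>>>>>>>>>>>
>>>>>>>>>>>> // Sketch only: a split carrying the hostname of the machine that stores its data.
>>>>>>>>>>>> public class HostAwareSplit extends LocatableInputSplit {
>>>>>>>>>>>>
>>>>>>>>>>>>   private static final long serialVersionUID = 1L;
>>>>>>>>>>>>
>>>>>>>>>>>>   public HostAwareSplit(int splitNumber, String hostname) {
>>>>>>>>>>>>     super(splitNumber, new String[] { hostname });
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>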
>>>>>>>>>>>> This issue is not related to serializability of input formats.
>>>>>>>>>>>> I assume that the wrapped MongoIF is also not capable of local
>>>>>>>>>>>> split assignment.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What do you mean by "might lack support for local split
>>>>>>>>>>>>> assignment"? Do you mean that the InputFormat is not serializable?
>>>>>>>>>>>>> And that this is not the case for Mongodb?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> There's a page about Hadoop Compatibility that shows how to
>>>>>>>>>>>>>> use the wrapper.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The HBase format should work as well, but might lack support
>>>>>>>>>>>>>> for local split assignment. In that case performance would suffer a lot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Should I start from
>>>>>>>>>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>>>>>>>>>> ? Is it ok?
>>>>>>>>>>>>>>> Thus, in principle, the TableInputFormat of HBase could also
>>>>>>>>>>>>>>> be used in a similar way.. couldn't it?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the blog post uses Flink's wrapper for Hadoop InputFormats.
>>>>>>>>>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So you just need to take Mongo's Hadoop IF and plug it into
>>>>>>>>>>>>>>>> the new IF wrapper. :-)
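>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Roughly like this (untested sketch along the lines of the old blog
>>>>>>>>>>>>>>>> post; the URI is just an example):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // uses Flink's HadoopInputFormat wrapper (hadoop-compatibility, mapred API),
>>>>>>>>>>>>>>>> // mongo-hadoop's mapred MongoInputFormat, MongoConfigUtil, BSONWritable
>>>>>>>>>>>>>>>> // and org.apache.hadoop.mapred.JobConf
>>>>>>>>>>>>>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JobConf conf = new JobConf();
>>>>>>>>>>>>>>>> MongoConfigUtil.setInputURI(conf, "mongodb://localhost:27017/test.test");
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> HadoopInputFormat<BSONWritable, BSONWritable> hdIf =
>>>>>>>>>>>>>>>>     new HadoopInputFormat<BSONWritable, BSONWritable>(
>>>>>>>>>>>>>>>>         new MongoInputFormat(), BSONWritable.class, BSONWritable.class, conf);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> // each record is a (key, document) pair of BSONWritable
>>>>>>>>>>>>>>>> DataSet<Tuple2<BSONWritable, BSONWritable>> input = env.createInput(hdIf);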
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Fabian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tuesday, November 4, 2014, Flavio Pompermaier wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi to all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I saw this post
>>>>>>>>>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>>>>>>>>>> but it uses the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>>>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Flavio
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  .
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
