flink-user mailing list archives

From Hilmi Yildirim <hilmi.yildi...@neofonie.de>
Subject Re: Reading from HBase problem
Date Tue, 09 Jun 2015 12:36:08 GMT
Hi Ufuk,
I used the TableInputFormat from flink-addons.

Best Regards,
Hilmi

On 09.06.2015 at 13:17, Ufuk Celebi wrote:
> Hey Hilmi,
>
> thanks for reporting the issue. Sorry for the inconvenience this has caused. I'm not
familiar with HBase in combination with Flink. From what I've seen, there are two options:
either use Flink's TableInputFormat from flink-addons or the Hadoop TableInputFormat, right?
Which one are you using?
>
> – Ufuk
>
> On 09 Jun 2015, at 11:53, fhueske@gmail.com wrote:
>
>> Thank you very much!
>>
>> From: Hilmi Yildirim
>> Sent: Tuesday, 9 June 2015, 11:40
>> To: user@flink.apache.org
>>
>> Done
>> https://issues.apache.org/jira/browse/FLINK-2188
>>
>> On 09.06.2015 at 11:26, Fabian Hueske wrote:
>> Would you mind opening a JIRA for this issue?
>>
>> -> https://issues.apache.org/jira/browse/FLINK
>>
>> I can do it as well, but you know all the details.
>>
>> Thanks, Fabian
>>
>> 2015-06-09 11:03 GMT+02:00 Hilmi Yildirim <hilmi.yildirim@neofonie.de>:
>> I want to add that I run the Flink job on a cluster of 13 machines, each with 13 processing
slots, which results in 169 processing slots in total.
>>
>>
>> On 09.06.2015 at 10:59, Hilmi Yildirim wrote:
>> Correct.
>>
>> I also counted the rows with Spark and Hive. Both returned the same value, which is
nearly 100 million rows. But Flink returns 102 million rows.
>>
>> Best Regards,
>> Hilmi
>>
>> On 09.06.2015 at 10:47, Fabian Hueske wrote:
>> OK, so the problem seems to be with the HBase InputFormat.
>>
>> I guess this issue needs a bit of debugging.
>> We need to check whether records are emitted twice (or more often) and, if so, which
records.
>> Unfortunately, this issue only seems to occur with large tables :-(
>>
>> Did I get that right: the HBase format returns about 2M (~2%) more records than
are contained in the HBase table?
>>
>> Cheers, Fabian
>>
>> 2015-06-09 10:34 GMT+02:00 Hilmi Yildirim <hilmi.yildirim@neofonie.de>:
>> Hi,
>> Now I tested the "count" method. It returns the same result as the flatMap().groupBy(0).sum(1)
approach.
>>
>> Furthermore, the HBase table contains nearly 100 million rows, but the result is 102 million.
This means that the HBase input format reads more rows than the table contains.
>>
>> Best Regards,
>> Hilmi
>>
>>
>> On 08.06.2015 at 23:29, Fabian Hueske wrote:
>> Hi Hilmi,
>>
>> I see two possible reasons:
>>
>> 1) The data source / InputFormat is not working properly, so not all HBase records
are read/forwarded, or
>> 2) The aggregation / count is buggy.
>>
>> Robert's suggestion will use an alternative mechanism to do the count. In fact, you
can count with groupBy(0).sum() and accumulators at the same time.
>> If both counts are the same, this indicates that the aggregation is correct and
hints that the HBase format is faulty.
>>
>> In any case, it would be very good to know your findings. Please keep us updated.
>>
>> One more hint: if you want to do a full aggregate, you don't need a "dummy"
key like "a". Instead, you can work with Tuple1<Long> and call sum(0) directly, without
the groupBy().
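
[Editor's note: a plain-Java sketch of Fabian's tip, not from the thread. There is no Flink cluster here; the two methods below just mimic "groupBy(0).sum(1) with a dummy key" versus a keyless Tuple1-style full aggregate, and must agree.]

```java
import java.util.*;

public class FullAggregateSketch {
    // groupBy(0).sum(1) with dummy key "a": every row lands in one group,
    // whose per-key sum is the row count.
    static long withDummyKey(List<String> rows) {
        Map<String, Long> sums = new HashMap<>();
        for (String row : rows) sums.merge("a", 1L, Long::sum);
        return sums.getOrDefault("a", 0L);
    }

    // Tuple1<Long>-style full aggregate: just sum the 1s, no grouping step.
    static long withoutKey(List<String> rows) {
        long sum = 0;
        for (String row : rows) sum += 1L;
        return sum;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        System.out.println(withDummyKey(rows)); // 5
        System.out.println(withoutKey(rows));   // 5
    }
}
```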
>>
>> Best, Fabian
>>
>> 2015-06-08 17:36 GMT+02:00 Robert Metzger<rmetzger@apache.org>:
>> Hi Hilmi,
>>
>> if you just want to count the number of elements, you can also use accumulators,
as described here [1].
>> They are much more lightweight.
>>
>> So you need to make your flatMap function a RichFlatMapFunction, then call getRuntimeContext().
>> Use a long accumulator (LongCounter) to count the elements.
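
[Editor's note: a cluster-free sketch of Robert's accumulator idea, not from the thread. The tiny Counter class below merely stands in for Flink's LongCounter; in a real job you would register a LongCounter via getRuntimeContext().addAccumulator(...) inside open() of the RichFlatMapFunction.]

```java
import java.util.*;

public class AccumulatorSketch {
    // Stand-in for org.apache.flink.api.common.accumulators.LongCounter.
    static class Counter {
        private long value = 0;
        void add(long v) { value += v; }
        long getLocalValue() { return value; }
    }

    // Count rows the way a RichFlatMapFunction would: bump the counter once
    // per invocation, independent of what the flatMap actually emits.
    static long countRows(List<String> rows) {
        Counter counter = new Counter();
        for (String row : rows) {
            counter.add(1L); // one increment per flatMap call
        }
        return counter.getLocalValue();
    }

    public static void main(String[] args) {
        System.out.println(countRows(Arrays.asList("a", "b", "c", "d"))); // 4
    }
}
```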
>>
>> If the accumulator result is consistent (the exact element count), then
there is a severe bug in Flink. But I suspect that the accumulator will give you the same
result (off by ±5).
>>
>> Best,
>> Robert
>>
>>
>> [1]: http://slideshare.net/robertmetzger1/apache-flink-hands-on
>>
>> On Mon, Jun 8, 2015 at 3:04 PM, Hilmi Yildirim <hilmi.yildirim@neofonie.de>
wrote:
>> Hi,
>> I implemented a simple Flink batch job which reads from an HBase cluster of 13 machines
with nearly 100 million rows. The HBase version is 1.0.0-cdh5.4.1, so I imported hbase-client
1.0.0-cdh5.4.1.
>> I implemented a flatMap which creates a tuple ("a", 1L) for each row. Then I use
groupBy(0).sum(1).writeAsText. The result should be the number of rows, but the result is
not correct. I ran the job multiple times and the result fluctuates by ±5. I also ran the
job for a smaller table with 100,000 rows, and there the result is correct.
>>
>> Does anyone know the reason for that?
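
[Editor's note: a plain-Java illustration of the suspected failure mode, not from the thread. If an input format re-emits some rows (e.g. around region/split boundaries — a hypothesis, not a confirmed diagnosis), a plain count overshoots while the distinct count stays correct, matching the ~102M-vs-100M symptom.]

```java
import java.util.*;

public class DuplicateReadSketch {
    // Plain count: what the flatMap/groupBy/sum job measures.
    static long plainCount(List<String> rowKeys) {
        return rowKeys.size();
    }

    // Distinct count over row keys: what the table actually contains.
    static long distinctCount(List<String> rowKeys) {
        return new HashSet<>(rowKeys).size();
    }

    public static void main(String[] args) {
        // 5 real rows, two of them re-read at a (hypothetical) split boundary:
        List<String> emitted = Arrays.asList("k1", "k2", "k3", "k3", "k4", "k5", "k5");
        System.out.println(plainCount(emitted));    // 7
        System.out.println(distinctCount(emitted)); // 5
    }
}
```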
>>
>> Best Regards,
>> Hilmi
>>

-- 
Hilmi Yildirim
Software Developer R&D

T: +49 30 24627-281
Hilmi.Yildirim@neofonie.de

http://www.neofonie.de

Visit the Neo Tech Blog for users:
http://blog.neofonie.de/

Follow us:
https://plus.google.com/+neofonie
http://www.linkedin.com/company/neofonie-gmbh
https://www.xing.com/companies/neofoniegmbh

Neofonie GmbH | Robert-Koch-Platz 4 | 10115 Berlin
Commercial register Berlin-Charlottenburg: HRB 67460
Managing Director: Thomas Kitlitschko

