flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Oliver Swoboda <oswobod...@gmail.com>
Subject Re: Using Flink with Accumulo
Date Mon, 07 Nov 2016 14:34:31 GMT
Hi Josh, thank you for your quick answer!

2016-11-03 17:03 GMT+01:00 Josh Elser <elserj@apache.org>:

> Hi Oliver,
>
> Cool stuff. I wish I knew more about Flink to make some better
> suggestions. Some points inline, and sorry in advance if I suggest
> something outright wrong. Hopefully someone from the Flink side can help
> give context where necessary :)
>
> Oliver Swoboda wrote:
>
>> Hello,
>>
>> I'm using Flink with Accumulo and wanted to read data from the database
>> by using the createHadoopInput function. Therefore I configure an
>> AccumuloInputFormat. The source code you can find here:
>> https://github.com/OSwoboda/masterthesis/blob/master/aggrega
>> tion.flink/src/main/java/de/oswoboda/aggregation/Main.java
>> <https://github.com/OSwoboda/masterthesis/blob/master/aggreg
>> ation.flink/src/main/java/de/oswoboda/aggregation/Main.java>
>>
>> I'm using a 5 Node Cluster (1 Master, 4 Worker).
>> Accumulo is installed with Ambari and has 1 Master Server on the Master
>> Node and 4 Tablet Servers (one on each Worker).
>> Flink is installed standalone with the Jobmanager on the Master Node and
>> 4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
>> so there are 32 in total.
>>
>> First problem I have:
>> If I start serveral Flink Jobs the client count for Zookeeper in the
>> Accumulo Overview is constantly increasing. I assume that the used
>> scanner isn't correctly closed. The client count only decreases to
>> normal values when I restart Flink.
>>
>
> Hrm, this does seem rather bad. Eventually, you'll saturate the
> connections to ZK and ZK itself will start limiting new connections (per
> the maxClientCnxns property).
>
> This sounds somewhat familiar to https://issues.apache.org/jira
> /browse/ACCUMULO-2113. The lack of a proper "close()" method on the
> Instance interface is a known deficiency. I'm not sure how Flink execution
> happens, so I am kind of just guessing.
>
> You might be able to try to use the CleanUp[1] utility to close out the
> thread pools/connections when your Flink "task" is done.


Unfortunately that didn't worked. I guess because Flink is starting the
tasks with the scanners by a TaskManager and I can't access those tasks
with my program. So after the task is done, I can't close the connections
with the utility, because the thread where I use it hasn't startet the
scanners.

Second problem I have:
>> I want to compare aggregations on time series data with Accumulo (with
>> Iterators) and with flink. Unfortunately, the results vary inexplicable
>> when I'm using Flink. I wanted to compare the results for a full table
>> scan (called baseline in the code), but sometimes it takes 17-18 minutes
>> and sometimes its between 30 and 60 minutes. In the longer case I can
>> see in the Accumulo Overview that after some time only one worker is
>> left with running scans and there are just a few entries/s sanned (4
>> million at the beginning when all workers are running to 200k when the
>> one worker is left). Because there are 2.5 billion records to scan and
>> almost 500 million left it takes really long.
>> This problem doesn't occur with Accumulo using Iterators and a batch
>> scanner on the master node, each scan has almost identical durations and
>> graphics in the Accumulo Overview for entries/s, MB/s scanned and seeks
>> are for each scan the same.
>>
>
> It sounds like maybe your partitioning was sub-optimal and caused one task
> to get a majority of the data? Having the autoAdjustRanges=true (as you do
> by default) should help get many batches of work based on the tablet
> boundaries in Accumulo. I'm not sure how Flink actually executes them
> though.
>

The problem was that half of the data was on one node after a restart of
accumulo. It seems that it has something to do with the problem described
here: https://issues.apache.org/jira/browse/ACCUMULO-4353. I stopped and
then startet accumulo instead of doing a restart and then the data is
distributed evenly across all nodes. For my tests I keep accumulo running
now, because after each restart the data distribution is changed and I
don't want to upgrade to 1.8.

Yours faithfully,
>> Oliver Swoboda
>>
>
>
> [1] https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c
> 5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/
> core/util/CleanUp.java#L36
>
>

Mime
View raw message