From: Oliver Swoboda
Date: Mon, 7 Nov 2016 15:34:31 +0100
Subject: Re: Using Flink with Accumulo
To: user@flink.apache.org
Cc: user@accumulo.apache.org

Hi Josh,

thank you for your quick answer!

2016-11-03 17:03 GMT+01:00 Josh Elser:

> Hi Oliver,
>
> Cool stuff. I wish I knew more about Flink to make some better
> suggestions. Some points inline, and sorry in advance if I suggest
> something outright wrong. Hopefully someone from the Flink side can help
> give context where necessary :)
>
> Oliver Swoboda wrote:
>
>> Hello,
>>
>> I'm using Flink with Accumulo and wanted to read data from the database
>> by using the createHadoopInput function. For that I configure an
>> AccumuloInputFormat. You can find the source code here:
>> https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
>>
>> I'm using a 5-node cluster (1 master, 4 workers).
>> Accumulo is installed with Ambari and has 1 master server on the master
>> node and 4 tablet servers (one on each worker).
>> Flink is installed standalone with the JobManager on the master node and
>> 4 TaskManagers (one on each worker). Every TaskManager can run 4 tasks,
>> so there are 32 in total.
>>
>> First problem I have:
>> If I start several Flink jobs, the client count for ZooKeeper in the
>> Accumulo overview increases constantly. I assume the scanner that is
>> used isn't closed correctly. The client count only drops back to
>> normal values when I restart Flink.
>
> Hrm, this does seem rather bad. Eventually, you'll saturate the
> connections to ZK and ZK itself will start limiting new connections (per
> the maxClientCnxns property).
>
> This sounds somewhat familiar to
> https://issues.apache.org/jira/browse/ACCUMULO-2113.
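[For reference, the connection cap Josh mentions is the maxClientCnxns setting in ZooKeeper's zoo.cfg; a sketch of the relevant line (the value shown is ZooKeeper's default, your deployment may differ):

```
# zoo.cfg: maximum number of concurrent connections ZooKeeper accepts
# from a single client IP. 60 is the default; 0 disables the limit.
maxClientCnxns=60
```

Once a leaking client reaches this limit, further connection attempts from that host are refused.]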
> The lack of a proper "close()" method on the
> Instance interface is a known deficiency. I'm not sure how Flink execution
> happens, so I am kind of just guessing.
>
> You might be able to try the CleanUp[1] utility to close out the
> thread pools/connections when your Flink "task" is done.

Unfortunately that didn't work. I guess that is because Flink starts the
tasks containing the scanners through a TaskManager, and I can't access
those tasks from my program. So after a task is done I can't close its
connections with the utility, because the thread where I would call it
isn't the one that started the scanners.

>> Second problem I have:
>> I want to compare aggregations on time series data with Accumulo (with
>> iterators) and with Flink. Unfortunately, the results vary inexplicably
>> when I'm using Flink. I wanted to compare the results for a full table
>> scan (called "baseline" in the code), but sometimes it takes 17-18 minutes
>> and sometimes it's between 30 and 60 minutes. In the longer case I can
>> see in the Accumulo overview that after some time only one worker is
>> left with running scans, and only a few entries/s are scanned (4
>> million at the beginning when all workers are running, down to 200k when
>> only one worker is left). Because there are 2.5 billion records to scan,
>> with almost 500 million still left at that point, it takes really long.
>> This problem doesn't occur with Accumulo using iterators and a batch
>> scanner on the master node: each scan has an almost identical duration,
>> and the graphs in the Accumulo overview for entries/s, MB/s scanned and
>> seeks are the same for each scan.
>
> It sounds like maybe your partitioning was sub-optimal and caused one task
> to get a majority of the data? Having autoAdjustRanges=true (as you do
> by default) should help get many batches of work based on the tablet
> boundaries in Accumulo. I'm not sure how Flink actually executes them
> though.

The problem was that half of the data was on one node after a restart of
Accumulo.
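[As an aside, a toy sketch of what splitting work on tablet boundaries means conceptually; this is an illustration only, not Accumulo's actual implementation. One large scan range is cut at the known split points, so each tablet contributes its own independent batch of work:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {

    // Cut the half-open range [start, end) at every tablet split point
    // that falls strictly inside it, yielding one batch per tablet.
    static List<String[]> splitOnTablets(String start, String end, List<String> splitPoints) {
        List<String[]> batches = new ArrayList<>();
        String current = start;
        for (String split : splitPoints) {
            if (split.compareTo(current) > 0 && split.compareTo(end) < 0) {
                batches.add(new String[] { current, split });
                current = split;
            }
        }
        batches.add(new String[] { current, end });
        return batches;
    }

    public static void main(String[] args) {
        // Tablet boundaries "m" and "t" turn one full-table scan
        // over [a, z) into three independent batches.
        for (String[] batch : splitOnTablets("a", "z", List.of("m", "t"))) {
            System.out.println(batch[0] + " .. " + batch[1]);
        }
    }
}
```

If one tablet server ends up hosting most of the tablets, most of those batches land on the same node, which matches the one-slow-worker behaviour described above.]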
It seems to have something to do with the problem described here:
https://issues.apache.org/jira/browse/ACCUMULO-4353. I stopped and then
started Accumulo instead of doing a restart, and now the data is
distributed evenly across all nodes. For my tests I keep Accumulo running,
because after each restart the data distribution changes, and I don't want
to upgrade to 1.8.

Yours faithfully,
Oliver Swoboda

> [1] https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
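[For anyone hitting the same connection leak: the CleanUp utility Josh links as [1] is a single static call. A sketch of how it is meant to be used; it needs accumulo-core on the classpath, and (as discussed above) it only helps when called in the same JVM and classloader that actually opened the scanners:

```java
import org.apache.accumulo.core.util.CleanUp;

public class ScannerCleanup {
    public static void main(String[] args) {
        // ... run scans, close scanners/batch writers ...

        // Tear down the shared ZooKeeper sessions and thread pools the
        // Accumulo client keeps alive for the lifetime of the JVM.
        CleanUp.shutdownNow();
    }
}
```

In a Flink standalone cluster the scanners live inside long-running TaskManager JVMs, not the client program, which is why this hook could not be applied from the job submitter.]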