Date: Wed, 13 Jun 2018 09:17:00 +0000 (UTC)
From: "Sihua Zhou (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

    [ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510828#comment-16510828 ]

Sihua Zhou commented on FLINK-9506:
-----------------------------------

Hi [~yow], I haven't seen the email you sent yet, but I just had a look at your code, and I think the "non-scalable" behavior might be caused by your test code itself. From your code we can see that the source's parallelism is always the same as that of the other operators, and in each sub-task of the source you use a loop to mock the source records. That means the QPS of the source grows whenever you increase the parallelism of the job, so in the end you didn't actually scale anything. I would suggest setting the parallelism of the source to a fixed value (e.g. 4) and then scaling up the job; let's see whether it's scalable then.
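Roughly, I mean something like this (just a sketch; `GeneratorSource`, `Record`, and `loop` are from your test code, and the concrete parallelism numbers are only examples):

{code}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Overall job parallelism: scale this up between runs (8, 16, 32, ...).
env.setParallelism(16);

DataStream<Record> stream = env
        .addSource(new GeneratorSource(loop))
        // Pin the source to a fixed parallelism so the total input rate
        // stays constant while the downstream operators are rescaled.
        .setParallelism(4);
{code}

With the source pinned like this, the input rate no longer grows with the job parallelism, so any remaining throughput plateau would point at the downstream operators rather than the generator.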
I haven't tested your code on a cluster yet; I will test it later. My email is "summerleafs@163.com"; if you have trouble sending email to "user@flink.apache.org", you can send it to me personally if you want. Thanks~

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, input_stop_when_timer_run.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when ReducingState.add is used in the source code. In the test, checkpointing is disabled and the filesystem (HDFS) is used as the state backend.
> It can be easily reproduced with a simple app: without checkpointing, just keep storing records into the state, with a simple reduce function (in fact, an empty function shows the same result). Any idea would be appreciated. What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how many records per second pass through "JsonTranslator", which is shown in the graph. The difference between the two runs is just one line: commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<Record> stream = env.addSource(new GeneratorSource(loop));
> DataStream<Record> convert = stream.map(new JsonTranslator())
>         .keyBy(r -> r.getKey()) // key selector was stripped from the original snippet
>         .process(new ProcessAggregation())
>         .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private transient ReducingState<Record> recStore;
>
>     public void open(Configuration parameters) {
>         // State registration, not shown in the original snippet;
>         // MyReducer is the (near-empty) reduce function mentioned above.
>         recStore = getRuntimeContext().getReducingState(
>                 new ReducingStateDescriptor<>("recStore", new MyReducer(), Record.class));
>     }
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)